ashb commented on code in PR #62645:
URL: https://github.com/apache/airflow/pull/62645#discussion_r2993795009


##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -2120,3 +2120,99 @@ def supervise(
         if close_client and client:
             with suppress(Exception):
                 client.close()
+
+
+def supervise_workload(
+    workload: ExecutorWorkload,
+    *,
+    server: str | None = None,
+    dry_run: bool = False,
+    client: Client | None = None,
+    subprocess_logs_to_stdout: bool = False,
+    proctitle: str | None = None,
+) -> int:
+    """
+    Run any workload type to completion in a supervised subprocess.
+
+    Dispatch to the appropriate supervisor based on workload type. Workload-specific
+    attributes (log_path, sentry_integration, bundle_info, etc.) are read from the
+    workload object itself.
+
+    :param workload: The ``ExecutorWorkload`` to execute.
+    :param server: Base URL of the API server (used by task workloads).
+    :param dry_run: If True, simulate the run without actually executing the task.
+    :param client: Optional preconfigured client for communication with the server.
+    :param subprocess_logs_to_stdout: Whether task logs should also be sent to stdout via the main logger.
+    :param proctitle: Process title to set for this workload. If not provided, defaults to
+        ``"airflow supervisor: <workload.display_name>"``. Executors may pass a custom title
+        that includes executor-specific context (e.g. team name).
+    :return: Exit code of the process.
+    # Imports deferred to avoid an SDK/core dependency at module load time.
+    from airflow.executors.workloads.callback import ExecuteCallback
+    from airflow.executors.workloads.task import ExecuteTask

Review Comment:
   Argh. This has just made me realise something.
   
   Previously, the `supervise` function deliberately didn't take a workload object: the definition of the workload structure must be kept in sync between the Executor and the worker code that picks up tasks (Celery worker, edge worker, etc.) but _specifically_ not the Task SDK.
   
   One of the design goals with the SDK is that v1.0.0 of the SDK must be "wire compatible" with future versions of Airflow core. To put it another way, I should be able to continue to run a worker on Airflow Task SDK 1.0.0 against an Airflow 3.2.0 API server + scheduler without changing a single thing about the worker deployment/pod/venv etc.
   
   By using workload definitions here, this rule could very easily be broken in a point release, defeating one of the key goals of the Core/Task SDK separation.
   
   I'm wondering if instead this generic supervise function should live in BaseExecutor? It's not really right either, but it is at least already in the shared path between the Executor/scheduler and the code that runs on the worker. (At various points in the past year we have talked about having a new `apache-airflow-base-executor` dist.)
   
   I'm really sorry I missed this on the earlier review. That's on me. 
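
   To illustrate the coupling being discussed, here is a minimal sketch of the kind of `isinstance`-based dispatch that ties `supervise_workload` to the workload definitions. The classes below are hypothetical stand-ins, not the real ones from `airflow.executors.workloads`; the point is only that any change to those classes' wire shape would have to move in lockstep with the code doing this dispatch:

   ```python
   # Hypothetical stand-ins for the real workload classes in
   # airflow.executors.workloads -- used here only to show the shape
   # of the dispatch, not the actual Airflow API.
   from dataclasses import dataclass


   @dataclass
   class ExecuteTask:
       display_name: str


   @dataclass
   class ExecuteCallback:
       display_name: str


   def dispatch_workload(workload) -> str:
       """Pick a workload-specific supervisor; returns which path was taken."""
       if isinstance(workload, ExecuteTask):
           return "task-supervisor"
       if isinstance(workload, ExecuteCallback):
           return "callback-supervisor"
       raise TypeError(f"Unsupported workload type: {type(workload).__name__}")
   ```

   Because the dispatcher imports and type-checks the workload classes directly, the side that defines those classes (Executor/core) and the side that runs this function must agree on their definitions, which is exactly the versioning constraint the comment above is concerned about.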



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
