jason810496 commented on code in PR #65956:
URL: https://github.com/apache/airflow/pull/65956#discussion_r3199570199
########## java-sdk/adr/0003-workload-execution.md: ##########
@@ -0,0 +1,473 @@
<!--
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements. See the NOTICE file
 distributed with this work for additional information
 regarding copyright ownership. The ASF licenses this file
 to you under the Apache License, Version 2.0 (the
 "License"); you may not use this file except in compliance
 with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing,
 software distributed under the License is distributed on an
 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 KIND, either express or implied. See the License for the
 specific language governing permissions and limitations
 under the License.
 -->

# ADR-0003: Workload Execution — Language-Specific Task Execution

## Status

Accepted

## Context

Airflow's standard task runner executes Python callables. To support tasks written in other languages, the pipeline needs an extension point where a language-specific coordinator can intercept execution, delegate it to an external runtime process, and bridge the Task SDK protocol so that the external process can reach Airflow services (connections, variables, XCom) while it runs.

This ADR details the task-execution side of the coordinator architecture described in [ADR-0001](0001-java-sdk-airflow-integration.md). It first describes the generic model, meaning the abstract contracts and expected behavior that every language must implement, and then walks through Java as a concrete example.

## Decision

### Extension Point: `BaseCoordinator`

The same `BaseCoordinator` base class that handles DAG parsing also handles task execution. Language-specific subclasses are registered in `provider.yaml` under `coordinators`.
For task execution, a subclass must implement:

| Method | Signature | Responsibility |
|---|---|---|
| `task_execution_cmd` | `(what, dag_rel_path, bundle_info, comm_addr, logs_addr) -> list[str]` | Return the full command that launches the language runtime for task execution. `comm_addr` and `logs_addr` are `host:port` strings that the process must connect to. |

The base class provides `run_task_execution()` as a concrete method that handles all TCP and process plumbing automatically, following the same pattern as `run_dag_parsing()` on the DAG-parsing side.

**Parameters passed to `run_task_execution()`:**

| Parameter | Type | Description |
|---|---|---|
| `what` | `TaskInstance` | The task instance to execute (id, dag_id, task_id, run_id, try_number, etc.) |
| `dag_rel_path` | `str \| PathLike` | Path to the DAG file or bundle, relative to the bundle root |
| `bundle_info` | `BundleInfo` | Bundle name and version |
| `startup_details` | `StartupDetails` | Full startup context (task instance, DAG relative path, bundle info, run context, start date), already consumed from fd 0 |

### Registration

The same `coordinators` entry in `provider.yaml` covers both DAG parsing and task execution, so no separate registration is needed:

```yaml
coordinators:
  - airflow.providers.sdk.<lang>.coordinator.<LangCoordinator>
```

### Discovery: `_resolve_runtime_entrypoint()`

When `task_runner.main()` starts, and before any Python task code runs:

```
task_runner.main()
  → startup_details = get_startup_details()                      # reads from fd 0
  → runtime_entrypoint = _resolve_runtime_entrypoint(startup_details)
      for each class_path in ProvidersManagerTaskRuntime().process_coordinators:
          coordinator_cls = import_string(class_path)
          if not hasattr(coordinator_cls, "run_task_execution"):
              continue
          return functools.partial(coordinator_cls.run_task_execution,
                                   what=..., dag_rel_path=..., bundle_info=..., startup_details=...)
      return None                                                # fall back to default Python execution

  → if runtime_entrypoint is not None:
        runtime_entrypoint()                                     # language-specific execution
        return                                                   # short-circuit: skip Python execution entirely
```

> **Note:** Currently, the first coordinator that defines `run_task_execution` wins. `QueueToCoordinatorMapper` maps the task's `queue` to the correct coordinator via the `[sdk] queue_to_sdk` configuration.

### Expected E2E Flow

```
Airflow Executor (dispatches task)
    │
    ▼
WatchedSubprocess.start(target=task_runner.main)
    │
    [fork: child process gets fd 0 as a Unix domain socket to the supervisor]
    │
    ▼ (in child)
task_runner.main()
    │
    ├─ get_startup_details()          ← reads StartupDetails from fd 0
    │
    ├─ _resolve_runtime_entrypoint()
    │     └─ iterates coordinators from provider.yaml
    │          └─ first with run_task_execution wins
    │
    ▼
<Lang>Coordinator.run_task_execution(what, dag_rel_path, bundle_info, startup_details)
    │
    ▼
BaseCoordinator._runtime_subprocess_entrypoint(TaskExecutionInfo)
    │
    ├─ 1. Create TCP comm_server + logs_server on 127.0.0.1:random
    ├─ 2. Create stderr socketpair
    ├─ 3. Call task_execution_cmd() → get launch command
    ├─ 4. Popen(cmd, stdin=DEVNULL, stderr=child_stderr)
    ├─ 5. Accept TCP connections from the language runtime
    ├─ 6. _send_startup_details(runtime_comm, startup_details)
    │     └─ re-serializes with model_dump(mode="json") to avoid
    │        msgpack extension types that non-Python decoders can't handle
    ├─ 7. supervisor_comm = socket(fileno=os.dup(0))
    └─ 8. _bridge(): raw byte forwarding until the process exits
```

**Key difference from DAG parsing:** in task execution, `task_runner.main()` has already consumed `StartupDetails` from fd 0, so the language runtime cannot receive it through the byte-forwarding bridge. The coordinator therefore re-sends `StartupDetails` to the runtime over TCP before the bridge starts. `_send_startup_details()` does this, re-serializing in JSON mode to avoid msgpack extension types (such as `Timestamp`) that non-Python decoders may not support.
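
The effect of re-serializing in JSON mode can be illustrated with a short, stdlib-only sketch. It assumes nothing about Airflow's real models: `StartupDetailsSketch` and `to_json_mode()` are hypothetical stand-ins for `StartupDetails` and pydantic's `model_dump(mode="json")`, and the subsequent msgpack encoding is not shown. The point is that rich values such as datetimes, UUIDs, and paths become plain strings, so a non-Python decoder only ever sees core types.

```python
"""Sketch of the JSON-mode flattening applied before re-sending startup details.

Stdlib-only stand-in for pydantic's ``model_dump(mode="json")``; the payload class
and its fields are hypothetical, and the msgpack encoding step is not shown.
"""
from __future__ import annotations

import dataclasses
import json
import uuid
from datetime import datetime, timezone
from pathlib import PurePosixPath
from typing import Any


@dataclasses.dataclass
class StartupDetailsSketch:
    """Hypothetical, heavily simplified stand-in for Airflow's StartupDetails."""

    ti_id: uuid.UUID
    dag_rel_path: PurePosixPath
    start_date: datetime
    run_context: dict[str, Any]


def to_json_mode(value: Any) -> Any:
    """Recursively reduce a value to str/int/float/bool/None/list/dict."""
    if dataclasses.is_dataclass(value) and not isinstance(value, type):
        return {f.name: to_json_mode(getattr(value, f.name)) for f in dataclasses.fields(value)}
    if isinstance(value, datetime):
        # A plain ISO-8601 string instead of a msgpack Timestamp extension type.
        return value.isoformat()
    if isinstance(value, (uuid.UUID, PurePosixPath)):
        return str(value)
    if isinstance(value, dict):
        return {str(k): to_json_mode(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_json_mode(v) for v in value]
    return value  # int, float, str, bool and None are already portable


details = StartupDetailsSketch(
    ti_id=uuid.UUID("12345678-1234-5678-1234-567812345678"),
    dag_rel_path=PurePosixPath("dags/example.jar"),
    start_date=datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
    run_context={"dag_id": "example_dag", "try_number": 1},
)
payload = to_json_mode(details)
print(json.dumps(payload))  # succeeds only because every value is now a JSON primitive
```

With `start_date` delivered as an ISO-8601 string, the Java side can parse it with an ordinary date-time parser rather than implementing a decoder for the msgpack timestamp extension.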
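
Steps 1, 3, 4, and 5 of the flow can be sketched with the standard library alone. This is an illustration, not `BaseCoordinator`'s code: `task_execution_cmd()` here is a hypothetical stand-in that launches a tiny Python "runtime" instead of a JVM, `listen_loopback()` and `launch_runtime()` are illustrative names, and the stderr socketpair from step 2 is omitted. Binding to port 0 lets the OS pick a free port, which is then passed to the child as a `host:port` string, matching the `comm_addr`/`logs_addr` contract.

```python
"""Sketch of steps 1, 3, 4 and 5: listen on loopback, launch the runtime, accept it.

``task_execution_cmd`` is a hypothetical stand-in that starts a tiny Python "runtime"
rather than a JVM; stderr capture (step 2) is omitted for brevity.
"""
from __future__ import annotations

import socket
import subprocess
import sys

# Hypothetical child program that behaves like a language runtime: it connects back
# to both addresses it was given, writes a little data, and exits.
CHILD_SOURCE = """
import socket, sys
def connect(addr):
    host, port = addr.rsplit(":", 1)
    return socket.create_connection((host, int(port)))
comm, logs = connect(sys.argv[1]), connect(sys.argv[2])
logs.sendall(b"runtime started\\n")
comm.sendall(b"hello")
comm.close()
logs.close()
"""


def task_execution_cmd(what, dag_rel_path, bundle_info, comm_addr: str, logs_addr: str) -> list[str]:
    """Hypothetical: a real coordinator would return its runtime's launch command here."""
    return [sys.executable, "-c", CHILD_SOURCE, comm_addr, logs_addr]


def listen_loopback() -> tuple[socket.socket, str]:
    """Step 1: bind 127.0.0.1 on an OS-assigned port; return the server and 'host:port'."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    host, port = server.getsockname()
    return server, f"{host}:{port}"


def launch_runtime(timeout: float = 10.0) -> tuple[subprocess.Popen, socket.socket, socket.socket]:
    comm_server, comm_addr = listen_loopback()
    logs_server, logs_addr = listen_loopback()
    # Step 3: every argument except the two addresses is a placeholder here.
    cmd = task_execution_cmd(None, "dags/example", None, comm_addr, logs_addr)
    # Step 4: stdin=DEVNULL keeps the child from inheriting fd 0 (the supervisor socket).
    proc = subprocess.Popen(cmd, stdin=subprocess.DEVNULL)
    try:
        # Step 5: wait, with a bound, for the runtime to connect to both listeners.
        comm_server.settimeout(timeout)
        logs_server.settimeout(timeout)
        runtime_comm, _ = comm_server.accept()
        runtime_logs, _ = logs_server.accept()
    except BaseException:
        proc.kill()  # never leave an orphaned runtime behind
        raise
    finally:
        comm_server.close()
        logs_server.close()
    return proc, runtime_comm, runtime_logs


def read_until_eof(sock: socket.socket) -> bytes:
    chunks = []
    while chunk := sock.recv(4096):
        chunks.append(chunk)
    return b"".join(chunks)


proc, runtime_comm, runtime_logs = launch_runtime()
comm_bytes = read_until_eof(runtime_comm)
log_bytes = read_until_eof(runtime_logs)
exit_code = proc.wait(timeout=10)
runtime_comm.close()
runtime_logs.close()
```

Both listeners are created before the child starts, so the runtime's connection attempts queue in the listen backlog and the order of the two `accept()` calls does not matter.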
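
Step 8 needs no knowledge of the protocol: the bridge only copies bytes in each direction, so message framing stays between the supervisor and the runtime. The following minimal sketch uses `selectors`; `bridge()` is a hypothetical helper rather than Airflow's `_bridge()`, and it assumes the bridge should return once the runtime side reaches EOF, since the runtime exits after sending its terminal message. Two socketpairs stand in for fd 0 and the runtime's TCP connection.

```python
"""Sketch of step 8: relay raw bytes between the supervisor and runtime sockets.

Illustrative only (hypothetical ``bridge`` helper); it never decodes frames, so the
protocol stays entirely between the two peers.
"""
from __future__ import annotations

import selectors
import socket
import threading


def bridge(supervisor: socket.socket, runtime: socket.socket, bufsize: int = 65536) -> None:
    """Forward bytes both ways; return once the runtime side reaches EOF."""
    sel = selectors.DefaultSelector()
    # The registration data is the destination socket for bytes read from fileobj.
    sel.register(supervisor, selectors.EVENT_READ, runtime)
    sel.register(runtime, selectors.EVENT_READ, supervisor)
    try:
        while True:
            for key, _events in sel.select():
                src, dst = key.fileobj, key.data
                data = src.recv(bufsize)
                if data:
                    dst.sendall(data)  # blocking write preserves byte order per direction
                elif src is runtime:
                    return  # runtime closed (e.g. exited): its bytes were already relayed
                else:
                    # Supervisor closed: stop reading it and half-close towards the runtime.
                    sel.unregister(supervisor)
                    try:
                        runtime.shutdown(socket.SHUT_WR)
                    except OSError:
                        pass
    finally:
        sel.close()


def recv_exactly(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes; stream sockets may return partial chunks."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise EOFError("peer closed early")
        buf += chunk
    return buf


# Demo: socketpairs stand in for fd 0 (supervisor) and the runtime's TCP connection.
sup_outer, sup_inner = socket.socketpair()
rt_outer, rt_inner = socket.socketpair()
worker = threading.Thread(target=bridge, args=(sup_inner, rt_inner), daemon=True)
worker.start()

rt_outer.sendall(b"GetVariable")  # runtime -> supervisor
request = recv_exactly(sup_outer, 11)
sup_outer.sendall(b"VariableResult")  # supervisor -> runtime
response = recv_exactly(rt_outer, 14)
rt_outer.close()  # runtime "exits"; the bridge sees EOF and returns
worker.join(timeout=5)
```

Forwarding happens as bytes arrive, so by the time the runtime's EOF is observed, everything it sent has already been relayed. Blocking `sendall()` keeps the sketch simple at the cost of letting a slow reader on one side stall the other direction.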

### Expected Message Sequence

Unlike DAG parsing, which is a single request/response exchange, task execution is a multi-round conversation:

```
Airflow Supervisor                     Bridge             Language Runtime
        │                                 │                       │
        │        [StartupDetails sent by bridge directly]         │
        │                                 ├── StartupDetails ────►│
        │                                 │                       │
        │                                 │                       ├── Look up task
        │                                 │                       │   from bundle
        │                                 │                       │
        │                                 │   ┌───────────────────┤
        │                                 │   │ Task code runs    │
        │                                 │   │ and may request:  │
        │                                 │   │                   │
        │◄── GetConnection(conn_id) ──────┼───┤                   │
        │                                 │   │                   │
        ├── ConnectionResult ─────────────┼──►│                   │
        │                                 │   │                   │
        │◄── GetVariable(key) ────────────┼───┤                   │
        │                                 │   │                   │
        ├── VariableResult ───────────────┼──►│                   │
        │                                 │   │                   │
        │◄── GetXCom(key, dag_id, ...) ───┼───┤                   │
        │                                 │   │                   │
        ├── XComResult ───────────────────┼──►│                   │
        │                                 │   │                   │
        │◄── SetXCom(key, value, ...) ────┼───┤                   │
        │                                 │   │                   │
        ├── (empty response) ─────────────┼──►│                   │
        │                                 │   │                   │
        │                                 │   └───────────────────┤
        │                                 │                       │
        │◄── SucceedTask / TaskState ─────┼───────────────────────┤
        │    (terminal, no response)      │                       │
        │                                 │                       └── exit(0)
        │                                 │
        │                                 └── drain, close sockets
```

### Task SDK Protocol Messages

The language runtime exchanges these message types with the Airflow supervisor:

**Runtime → Supervisor (requests):**

| Message | Fields | Purpose |
|---|---|---|
| `GetConnection` | `conn_id` | Fetch an Airflow connection by ID |

Review Comment:
Handled both in https://github.com/apache/airflow/pull/65956/commits/584e0b1fdce951834e008969ce2533dd4d4249cc
