jason810496 opened a new pull request, #67318: URL: https://github.com/apache/airflow/pull/67318
# go-sdk: Implement coordinator-mode runtime entry point and task runner

- **Depends on https://github.com/apache/airflow/pull/TBD being merged first** (refactor/go-sdk/coordinator-comms)
- **Diff for early review**: https://github.com/jason810496/airflow/compare/refactor/go-sdk/coordinator-comms...jason810496:refactor/go-sdk/coordinator-runtime-server
- Related: split out of https://github.com/apache/airflow/pull/67154 to land it in three reviewable PRs (protocol primitives -> comms layer -> runtime entry point).

## Why

This is the third and final PR in the stack carved out of #67154. With the protocol primitives merged in PR1 and the dispatcher / logger / client merged in PR2, this PR wires up the entry point. The same bundle binary that today serves go-plugin can now also be launched directly by the Python supervisor, dial the supervisor's comm and logs sockets, and run a single TaskInstance.

Coordinator mode lets the Python supervisor schedule Go tasks without standing up a separate worker process -- it launches the bundle binary as a child, hands it two socket addresses on the CLI, and talks the msgpack-over-IPC protocol directly -- so a Go-task DagRun looks operationally indistinguishable from a Python-task DagRun on the supervisor side.

This is the smallest PR in the stack (~650 LOC) because all the heavy lifting -- frame I/O, dispatcher, slog handler, `sdk.Client` re-implementation -- already landed in PR1 and PR2.

Dag-file parsing over the coordinator protocol is intentionally not part of this stack. It will land in a follow-up once that protocol settles.

## How

- `pkg/execution/server.go` -- `execution.Serve(bundle, commAddr, logsAddr)` does the following:
  - dials both supervisor sockets and defers a `Close` on each;
  - installs `SocketLogHandler` as the slog default before any user code runs;
  - constructs a `CoordinatorComm` over the comm socket;
  - reads the initial `StartupDetails` and dispatches to `task_runner.Run`.

  If `Serve` itself errors before the dispatcher spins up, the deferred closes still release the dialed sockets, so the supervisor doesn't see a stuck child.
- `pkg/execution/task_runner.go` -- runs a single task. It builds a context carrying the `CoordinatorClient` under `sdkcontext.SdkClientContextKey` (PR1 added the injection site in `bundlev1.taskFunction.Execute`). It then invokes `bundle.LookupTask(dag, task).Execute` and sends the resulting `TaskStateMsg` back through the dispatcher. Terminal-state delivery is `ctx.Err()`-aware, so a cancelled supervisor doesn't leave the runtime blocked on a send.
- `pkg/execution/integration_test.go` -- an end-to-end test that runs a fake supervisor against the real `Serve` over an in-memory socket pair. It exercises GetVariable / XCom push / deferral and asserts the emitted `TaskStateMsg`.
- `bundle/bundlev1/bundlev1server/server.go` -- splits `Serve` into a `decideMode` switch over `(--bundle-metadata | --comm/--logs | <none>)`, so the same binary still serves go-plugin when no coordinator flags are present. Passing only one of `--comm` / `--logs` is a hard error (`ErrCoordinatorFlagsIncomplete`). The error is returned to `main`, so the caller exits non-zero with usage rather than silently falling back to go-plugin.
- `example/bundle/main.go` -- propagates `bundlev1server.Serve`'s error via `log.Fatal`. It also tightens the example connection log to log only non-sensitive fields, matching the masker TODOs PR1 added on `sdk.Client.GetConnection`.

## What

- Add `go-sdk/pkg/execution/{server,task_runner,integration_test}.go`.
- Extend `go-sdk/bundle/bundlev1/bundlev1server/server.go` with coordinator-mode dispatch and `ErrCoordinatorFlagsIncomplete`.
- Update `go-sdk/example/bundle/main.go` to propagate `Serve`'s error and redact the connection log.

## Next

- (none -- last PR of the stack)

---

##### Was generative AI tooling used to co-author this PR?
- [x] Yes, with help of Claude Code Opus 4.7 following [the guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
