jason810496 opened a new pull request, #67317: URL: https://github.com/apache/airflow/pull/67317
# go-sdk: Add concurrent-safe coordinator comms, log handler, and client

- **Depends on https://github.com/apache/airflow/pull/TBD being merged first** (refactor/go-sdk/coordinator-protocol)
- **Diff for early review**: https://github.com/jason810496/airflow/compare/refactor/go-sdk/coordinator-protocol...jason810496:refactor/go-sdk/coordinator-comms
- related: split out of https://github.com/apache/airflow/pull/67154 to land it in three reviewable PRs (protocol primitives -> comms layer -> runtime entry point).

## Why

Second PR in a 3-PR stack carved out of #67154.

The coordinator protocol is fully duplex -- the supervisor can send a `StartupDetails` or an inbound XCom value at any time, and the runtime can send a `GetVariable` request and await a matching reply at any time -- so a naive read-one / write-one loop would either serialise everything (no concurrent task code) or require every caller to multiplex frames by hand. `CoordinatorComm` solves this once: a single dispatcher goroutine demuxes inbound frames to per-request reply channels keyed by a monotonic id, propagates `ctx.Err()` to outstanding requests on cancellation, and tears down pending requests cleanly on `SendRequest` failure.

`SocketLogHandler` exists so the supervisor demuxes task logs the same way it demuxes everything else -- structured JSON over the dedicated logs socket -- instead of having to parse stderr.

`CoordinatorClient` lets the task runner in the next PR reuse the existing `sdk.Client` API by re-implementing it on top of the dispatcher, so tasks written against `sdk.Client` run unchanged under coordinator mode.

The comm-layer dispatcher pattern is inspired by https://github.com/apache/airflow/pull/66412.

Still no entry point in this PR -- that arrives in the third PR of the stack -- so this PR has no user-visible effect on its own.

## How

- `pkg/execution/comms.go` -- `CoordinatorComm` owns the inbound read loop, an `atomic.Int64` request-id counter (wide enough to avoid wraparound on a long-running runtime), and a `map[int64]chan reply` guarded by a mutex. `SendRequest` registers the reply channel, writes the frame, then either receives the reply or `ctx.Err()` -- whichever happens first cleans up the channel. The dispatcher exits when the inbound stream returns `io.EOF` and fails any still-pending requests with the connection error.
- `pkg/execution/logger.go` -- `SocketLogHandler` implements `slog.Handler`, resolves `slog.Value`s before JSON marshaling so lazy / `LogValuer` values render correctly, and writes one JSON line per record onto the logs socket. Survives a writer that goes away by swallowing the error rather than panicking -- log loss is preferable to task failure here.
- `pkg/execution/client.go` -- `CoordinatorClient` implements `sdk.Client` by serialising each call into the matching `pkg/execution/messages.go` envelope, routing it through `CoordinatorComm.SendRequest`, and decoding the reply. `GetVariable` honours `AIRFLOW_VAR_*` environment overrides before hitting the supervisor (matching Python's `airflow.models.variable.Variable.get` order). `GetConnection` and `GetVariable` translate supervisor "not found" replies into the SDK's sentinel errors (`sdk.ErrConnectionNotFound`, `sdk.ErrVariableNotFound`) so callers can `errors.Is` on them.
- Tests cover the concurrent paths explicitly: race-free dispatch under N concurrent requests, cleanup on `SendRequest`-with-cancelled-context, EOF teardown, and round-tripping every message variant through the client.

## What

- Add `go-sdk/pkg/execution/{comms,logger,client}.go` plus matching unit tests.

## Next

- refactor/go-sdk/coordinator-runtime-server (PR #TBD, opened after this one merges)

---

##### Was generative AI tooling used to co-author this PR?

- [x] Yes, with help of Claude Code Opus 4.7 following [the guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
