Copilot commented on code in PR #68221:
URL: https://github.com/apache/airflow/pull/68221#discussion_r3372758169
##########
go-sdk/sdk/sdk.go:
##########
@@ -23,6 +23,12 @@ import (
"github.com/apache/airflow/go-sdk/pkg/api"
)
+// Environment-variable prefixes used as a local fallback for object lookups.
+// GetVariable first checks the process environment for VariableEnvPrefix plus
+// the uppercased key (so key "my_var" is read from AIRFLOW_VAR_MY_VAR) before
+// asking the API server, mirroring the Python SDK and making local development
+// and tests easy. ConnectionEnvPrefix is the matching prefix for Connections;
Review Comment:
The doc comment above the exported VariableEnvPrefix/ConnectionEnvPrefix
const block doesn’t follow standard Go doc-comment conventions (comments for
exported identifiers should generally start with the identifier name). Since
this PR is focused on improving the Go SDK’s public docs (pkg.go.dev/pkgsite),
consider moving this description onto per-constant comments so it renders
cleanly and satisfies common linters (e.g. golint/revive).
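
A minimal sketch of what per-constant comments could look like. The prefix values are assumptions: `AIRFLOW_VAR_` is inferred from the `AIRFLOW_VAR_MY_VAR` example in the comment under review, `AIRFLOW_CONN_` is assumed from the Python SDK convention, and `variableEnvName` is a hypothetical helper used only to illustrate the lookup name, not part of the SDK.

```go
package main

import (
	"fmt"
	"strings"
)

const (
	// VariableEnvPrefix is the environment-variable prefix checked for a
	// Variable before the API server is asked. The uppercased key is
	// appended, so key "my_var" is read from AIRFLOW_VAR_MY_VAR.
	// (Value inferred from the example in the original comment.)
	VariableEnvPrefix = "AIRFLOW_VAR_"

	// ConnectionEnvPrefix is the matching environment-variable prefix for
	// Connections. (Value assumed from the Python SDK convention.)
	ConnectionEnvPrefix = "AIRFLOW_CONN_"
)

// variableEnvName is a hypothetical helper that builds the environment
// variable name consulted for a given Variable key.
func variableEnvName(key string) string {
	return VariableEnvPrefix + strings.ToUpper(key)
}

func main() {
	fmt.Println(variableEnvName("my_var"))
}
```

With each comment starting with the identifier name, pkgsite attaches the text to the individual constant rather than to the block as a whole.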
##########
go-sdk/README.md:
##########
@@ -19,38 +19,214 @@
# Apache Airflow Go Task SDK
-The Go SDK uses the Task Execution Interface (TEI or Task API) introduced in
AIP-72 with Airflow 3.0.0 to give
-Task functions written in Go full access to the Airflow "model", natively in
go.
+The Go SDK is a Go implementation of the Airflow Task SDK. It lets you write
task functions in Go that
+have native access to the Airflow "model" (Variables, Connections, and XCom),
instead of writing them in
+Python.
-The Task API however does not provide a means to get the `ExecuteTaskWorkload`
to the go worker itself. For
-that we use the Edge Executor API.
-Longer term we will likely need to stabilize the Edge Executor API and add
versioning to it.
+It is built on the Task Execution Interface (TEI, a.k.a. the Task API)
introduced by AIP-72 in Airflow
+3.0.0. AIP-72 standardised how a task runtime talks to Airflow over an HTTP
Execution API, which decoupled
+the language a task is written in from the Airflow core. The Go SDK is one
such runtime; the Java SDK is
+another.
-Since Go is a compiled language (putting aside projects such as
[YAEGI](https://github.com/traefik/yaegi) that allow Go to be interpreted), all
tasks must:
+> [!WARNING]
+> This is an **experimental** feature. The SDK is under active development and
its APIs, wire protocols,
+> and tooling may change between releases without notice.
-1. Be compiled into a binary ahead of time, and
-2. Be registered inside the worker process in order to be executed.
+## The compiled-language constraint
+Python tasks are imported and run in-process. Go is compiled, so the model is
different.
-> [!NOTE]
-> This Golang SDK is under active development and is not ready for prime-time
yet.
+A single binary that bundles one or more Dags' task functions is called a
**bundle**. You build one with
+the SDK's packer, `airflow-go-pack`, which compiles your code and appends a
metadata footer (the manifest
+of `dag_id`s and `task_id`s, plus the Dag source) to the executable. The
result is a **self-contained
+executable bundle**: a single runnable file that *is* the bundle, with no
separate manifest or archive to
+ship alongside it.
-## Quickstart
+## You still need a Python stub Dag (for now)
-- See [`example/bundle/main.go`](./example/bundle/main.go) for an example dag
bundle where you can define your task functions
+The Task API does not yet carry Dag *structure* for non-Python languages, so
the Dag's shape and task
+dependencies are still declared in a small Python file using `@task.stub`:
-- Compile this into a binary:
+```python
+from airflow.sdk import dag, task
+
+
+@task.stub(queue="golang")
+def extract(): ...
+
+
+@task.stub(queue="golang")
+def transform(): ...
+
+
+@dag()
+def simple_dag():
+ extract() >> transform()
+
+
+simple_dag()
+```
+
+`@task.stub` tells the Dag parser the "shape" of the Go tasks (their names and
dependencies) without any
+Python implementation. The `queue=` value routes the task to the Go runtime.
This Python requirement is a
+known limitation.
+
+
+## Authoring a bundle
+
+Implement `bundlev1.BundleProvider`, register your Dags and tasks, and `main`
is one line. From
+[`example/bundle/main.go`](./example/bundle/main.go):
+
+```go
+type myBundle struct{}
+
+var _ v1.BundleProvider = (*myBundle)(nil)
+
+func (m *myBundle) GetBundleVersion() v1.BundleInfo {
+ return v1.BundleInfo{Name: bundleName, Version: &bundleVersion}
+}
+
+func (m *myBundle) RegisterDags(dagbag v1.Registry) error {
+ simpleDag := dagbag.AddDag("simple_dag")
+ simpleDag.AddTask(extract)
+ simpleDag.AddTask(transform)
+ return nil
+}
+
+func main() {
+ if err := bundlev1server.Serve(&myBundle{}); err != nil {
+ log.Fatal(err)
+ }
+}
+```
+
+A task is an ordinary Go function. The runtime inspects its signature and
injects arguments by type:
+`context.Context`, `*slog.Logger`, and an `sdk.Client` (or a narrower
interface such as
+`sdk.VariableClient`). An optional `(any, error)` return becomes the task's
XCom; an `error` return marks
+the task failed.
+
+```go
+func extract(ctx context.Context, client sdk.Client, log *slog.Logger) (any,
error) {
+ conn, err := client.GetConnection(ctx, "test_http")
+ // ... do work, honour ctx cancellation ...
+ return map[string]any{"go_version": runtime.Version()}, nil
+}
+
+func transform(ctx context.Context, client sdk.VariableClient, log
*slog.Logger) error {
+ val, err := client.GetVariable(ctx, "my_variable")
+ if err != nil {
+ return err
+ }
+ log.Info("Obtained variable", "my_variable", val)
+ return nil
+}
+```
+
+Asking for the narrowest interface a task needs (e.g. `sdk.VariableClient`
instead of `sdk.Client`) makes
+unit testing easier and documents which Airflow features the task touches.
`RegisterDags` is the single
+source of truth for which `dag_id`s and `task_id`s a bundle can run.
+
+## Deployment modes
+
+A bundle can run in two ways. The same bundle binary works in both; you pick
one per deployment:
+
+1. **Coordinator** (recommended)
+2. **Edge Worker**
+
+For the protocol details behind each, see [How it works](#how-it-works).
+
+### Coordinator (recommended)
+
+A Python task runner executes the Go task directly, with no separate Go worker
process to run on the host.
+This is the same coordinator mechanism the Java SDK uses.
+
+**Why this is recommended:** the mature Python supervisor handles the
Airflow-facing concerns, so this path
+inherits its capabilities (remote task logs to S3/GCS, the full range of task
states, and alternate XCom
+backends) rather than reimplementing them in Go. These are exactly the
features the Edge Worker path is
+still missing (see [Known limitations](#known-limitations)).
+
+#### Quickstart
+
+- Build and pack your bundle with `airflow-go-pack`. The packer compiles the
bundle and appends an
+ embedded metadata footer so the coordinator can read its `dag_id`s without
executing the binary,
+ producing a single runnable file:
```bash
- go build -o ./bin/sample-dag-bundle ./example/bundle
+ go tool airflow-go-pack ./example/bundle -- -trimpath -tags=prod
```
- (or see the [`Justfile`](./example/bundle/Justfile) for how you can build it
and specify they bundle version number at build time.)
+ Use `--output <path>` to write the packed bundle straight into a directory
the coordinator scans
+ (`executables_root`), and pass extra `go build` flags after `--`.
+
+ For cross-compiling (e.g. deploy to a Linux host from an Apple-silicon
(darwin/arm64) machine), pass `--goos`/`--goarch` and the
+ packer cross-builds for you:
+
+ ```bash
+ go tool airflow-go-pack --goos linux --goarch amd64 \
+ --output ~/airflow/executable-bundles/sample-dag-bundle \
+ ./example/bundle
+ ```
-- Configure the go edge worker, by editing `$AIRFLOW_HOME/go-sdk.yaml`:
+ Alternatively, use `--executable`/`--source`. The packer normally execs the
binary to read
+ its metadata; a cross-compiled binary cannot run on the host, so generate
the metadata on a machine that
+ can run it and pass the file with `--airflow-metadata`:
- These config values need tweaking, especially the ports and secrets. The
ports are the default assuming
- airflow is running locally via `airflow standalone`.
+ ```bash
+ # on linux/amd64 machine:
+ go build -o my-bundle ./example/bundle
+ ./my-bundle --airflow-metadata > airflow-metadata.yaml
+
+ # on darwin/arm64 machine:
+ go tool airflow-go-pack --executable ./my-bundle --source main.go
--airflow-metadata airflow-metadata.yaml
+ ```
+
+ > [!NOTE]
+ > The packer ships via the Go 1.24 `tool` directive, so there is no global
install: add
+ > `tool github.com/apache/airflow/go-sdk/cmd/airflow-go-pack` to your bundle
module's `go.mod` and run
+ > it with `go tool airflow-go-pack`. This pins the packer version per
project.
+
+- Register the coordinator and route the queue to it, under `[sdk]` in
`airflow.cfg` (or the equivalent
+ `AIRFLOW__SDK__*` env vars):
+
+ ```ini
+ [sdk]
+ coordinators = {"go": {"classpath":
"airflow.sdk.coordinators.executable.ExecutableCoordinator", "kwargs":
{"executables_root": ["~/airflow/executable-bundles"]}}}
+ queue_to_coordinator = {"golang": "go"}
+ ```
+
+ `executables_root` is one or more directories the coordinator scans for
bundles; `queue_to_coordinator`
+ routes stub tasks with `queue="golang"` to this Go coordinator.
+
+ > [!IMPORTANT]
+ > The coordinator is part of the Airflow worker, so the `[sdk]` config (and
the bundle files in
+ > `executables_root`) only need to be present wherever tasks actually
execute. With `CeleryExecutor`,
+ > set it on the Celery workers only will be sufficient. With
`LocalExecutor`, tasks run inside the scheduler process, so it
+ > must be set where the scheduler can read it. The API server and Dag
processor do not need it.
Review Comment:
In this IMPORTANT note, the sentence “set it on the Celery workers only will
be sufficient” is ungrammatical, and the line break splits “so it must be set
…” mid-sentence. This is user-facing README content, so it’s worth tightening
the wording for clarity.
##########
go-sdk/Justfile:
##########
@@ -38,6 +38,11 @@ test:
@echo "Running unit tests..."
go run gotest.tools/[email protected] -f dots-v2 ./...
+# Build and serve Go package documentation locally
+docs port="6060":
+ @echo "Serving Go docs at
http://localhost:{{port}}/github.com/apache/airflow/go-sdk"
+ go run golang.org/x/pkgsite/cmd/pkgsite@latest -http=localhost:{{port}} .
Review Comment:
The `docs` recipe runs `pkgsite@latest`. Since `just docs` is part of the
PR’s documented verification path, using `@latest` can make doc previews
non-reproducible (upstream changes can break the command or alter output).
Consider pinning pkgsite to a known-good version (similar to how gotestsum is
pinned) or vendoring it via a Go `tool` directive.
##########
go-sdk/README.md:
##########
@@ -74,63 +250,92 @@ Since Go is a compiled language (putting aside projects
such as [YAEGI](https://
secret_key: "u0ZDb2ccINAbhzNmvYzclw=="
```
- You can also set these options via environment variables of
`AIRFLOW__${section}_${key}`, for example `AIRFLOW__API_AUTH__SECRET_KEY`.
+ You can also set these options via environment variables of
`AIRFLOW__${section}_${key}`, for example
+ `AIRFLOW__API_AUTH__SECRET_KEY`.
-- Install the worker
+- Install the worker:
```bash
go install github.com/apache/airflow/go-sdk/cmd/airflow-go-edge-worker@latest
```
-- Run it!
+- Run it:
```bash
airflow-go-edge-worker run --queues golang
```
-### Example Dag:
+- Deploy the matching Python stub Dag (above) into Airflow.
-You will need to create a python Dag and deploy it in to the Airflow
+## Known limitations
-```python
-from airflow.sdk import dag, task
+A non-exhaustive list of features the **Edge Worker (go-plugin)** path has yet
to implement. These are the
+main reason the coordinator-based path is recommended: in that mode the Python
supervisor handles these
+concerns, so they are not limitations there.
+- Putting tasks into states other than success or failed/up-for-retry
(deferred,
+ failed-without-retries, etc.).
+- Remote task logs (i.e. S3/GCS etc.).
+- XCom reading/writing through non-default XCom backends.
-@task.stub(queue="golang")
-def extract(): ...
+## How it works
+The same bundle binary speaks two different protocols; which one it uses is
decided at launch by the CLI
+flags it was invoked with. User code (`func main`) is identical either way.
-@task.stub(queue="golang")
-def transform(): ...
+### Coordinator protocol
+```
+Python supervisor / task runner
+ │ finds + validates the bundle, then forks it:
+ ▼
+ <bundle binary> --comm=127.0.0.1:P1 --logs=127.0.0.1:P2
+ │ binary dials BACK over TCP loopback (msgpack-over-IPC)
+ ▼
+ GetConnection / GetVariable / GetXCom / SetXCom ... → SucceedTask / TaskState
+```
-@dag()
-def simple_dag():
+- The Python `ExecutableCoordinator` forks the bundle binary with
`--comm`/`--logs` addresses it is already
+ listening on. The binary dials back (it never listens) and speaks a
length-prefixed msgpack-over-IPC wire
+ protocol on the comm socket, with structured JSON-line logs on the logs
socket.
+- The Python runtime is the worker. It proxies every `GetConnection` /
`GetVariable` / `GetXCom` /
+ `SetXCom` call through to the Execution API. The Go binary just runs the
task function.
- extract() >> transform()
+The Go side of the protocol is implemented in `pkg/execution/`. On the Python
side it is the
+`ExecutableCoordinator` in
`task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py`.
+### Edge Worker protocol
-simple_dag()
+```
+Airflow scheduler ──Edge Executor API──► airflow-go-edge-worker
──go-plugin/gRPC──► bundle binary
+ (ExecuteTaskWorkload) (long-running Go process)
(child process)
```
-Here we see the `@task.stub` which tells the Dag parser about the "shape" of
the go tasks, and lets us define
-the relationships between them
-
-> [!NOTE]
-> Yes, you still have to have a python Dag file for now. This is a known
limitation at the moment.
-
-## Known missing features
+- `airflow-go-edge-worker` is a long-running Go process. It registers with the
scheduler, polls the Edge
+ Executor API for `ExecuteTaskWorkload`s, and heartbeats.
+- For each workload it execs the bundle binary as a child and connects over
HashiCorp
+ [`go-plugin`](https://github.com/hashicorp/go-plugin) (gRPC over a
handshake-gated socket).
+- The Task API itself has no way to deliver an `ExecuteTaskWorkload` to a Go
worker, so the Edge Executor
+ API fills that gap. Longer term that API will likely need stabilising and
versioning.
-A non-exhaustive list of features we have yet to implement
+## Architectural decisions
-- Support for putting tasks into state other than success or
failed/up-for-retry (deferred, failed-without-retries etc.)
-- Remote task logs (i.e. S3/GCS etc)
-- XCom reading/writing from other XCom backends
+The [`adr/`](./adr) directory records the design decisions behind the SDK:
+- [ADR 0001](./adr/0001-bundle-packing-options.md): bundle-packing options.
+- [ADR 0002](./adr/0002-use-go-tool-directive-for-bundle-packer.md): deliver
the bundle packer via the
+ Go 1.24 `tool` directive.
+- [ADR 0003](./adr/0003-coordinator-protocol-msgpack-ipc.md): dual-mode
coordinator protocol, where one
+ binary speaks both go-plugin gRPC (Edge Worker) and msgpack-over-IPC (Python
coordinator).
+- [ADR 0004](./adr/0004-self-contained-executable-bundle.md): the
self-contained executable bundle, where
+ the executable *is* the bundle.
Review Comment:
The linked issue (#66939) explicitly calls for cross-linking the bundle
specification docs from the Go SDK landing material. This README currently
links ADRs but doesn’t link to `providers/sdk/executable/docs/bundle-spec.rst`,
so users won’t discover the formal bundle format/manifest details from here.
##########
go-sdk/README.md:
##########
@@ -74,63 +250,92 @@ Since Go is a compiled language (putting aside projects
such as [YAEGI](https://
secret_key: "u0ZDb2ccINAbhzNmvYzclw=="
```
- You can also set these options via environment variables of
`AIRFLOW__${section}_${key}`, for example `AIRFLOW__API_AUTH__SECRET_KEY`.
+ You can also set these options via environment variables of
`AIRFLOW__${section}_${key}`, for example
+ `AIRFLOW__API_AUTH__SECRET_KEY`.
Review Comment:
The environment-variable format in this sentence uses a single underscore
between section and key (`AIRFLOW__${section}_${key}`), but Airflow’s config
env vars use double-underscores as separators (as shown by the example
`AIRFLOW__API_AUTH__SECRET_KEY`). The template should match the actual format
to avoid misleading users.
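
To make the intended format concrete, here is a small sketch that builds the name with double underscores on both sides of the section. `configEnvName` is a hypothetical helper written for illustration only; it is not an SDK or Airflow function.

```go
package main

import (
	"fmt"
	"strings"
)

// configEnvName is a hypothetical helper that builds the environment
// variable name for a config option, using a double underscore between
// the prefix, the section, and the key.
func configEnvName(section, key string) string {
	return "AIRFLOW__" + strings.ToUpper(section) + "__" + strings.ToUpper(key)
}

func main() {
	// Matches the example already given in the README sentence.
	fmt.Println(configEnvName("api_auth", "secret_key"))
}
```

The corrected template in the README would then read `AIRFLOW__${section}__${key}`, which agrees with the `AIRFLOW__API_AUTH__SECRET_KEY` example.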