Copilot commented on code in PR #68221:
URL: https://github.com/apache/airflow/pull/68221#discussion_r3372758169


##########
go-sdk/sdk/sdk.go:
##########
@@ -23,6 +23,12 @@ import (
        "github.com/apache/airflow/go-sdk/pkg/api"
 )
 
+// Environment-variable prefixes used as a local fallback for object lookups.
+// GetVariable first checks the process environment for VariableEnvPrefix plus
+// the uppercased key (so key "my_var" is read from AIRFLOW_VAR_MY_VAR) before
+// asking the API server, mirroring the Python SDK and making local development
+// and tests easy. ConnectionEnvPrefix is the matching prefix for Connections;

Review Comment:
   The doc comment above the exported VariableEnvPrefix/ConnectionEnvPrefix 
const block doesn’t follow standard Go doc-comment conventions (comments for 
exported identifiers should generally start with the identifier name). Since 
this PR is focused on improving the Go SDK’s public docs (pkg.go.dev/pkgsite), 
consider moving this description onto per-constant comments so it renders 
cleanly and satisfies common linters (e.g. golint/revive).



##########
go-sdk/README.md:
##########
@@ -19,38 +19,214 @@
 
 # Apache Airflow Go Task SDK
 
-The Go SDK uses the Task Execution Interface (TEI or Task API) introduced in AIP-72 with Airflow 3.0.0 to give
-Task functions written in Go full access to the Airflow "model", natively in go.
+The Go SDK is a Go implementation of the Airflow Task SDK. It lets you write task functions in Go that
+have native access to the Airflow "model" (Variables, Connections, and XCom), instead of writing them in
+Python.
 
-The Task API however does not provide a means to get the `ExecuteTaskWorkload` to the go worker itself. For
-that we use the Edge Executor API.
-Longer term we will likely need to stabilize the Edge Executor API and add versioning to it.
+It is built on the Task Execution Interface (TEI, a.k.a. the Task API) introduced by AIP-72 in Airflow
+3.0.0. AIP-72 standardised how a task runtime talks to Airflow over an HTTP Execution API, which decoupled
+the language a task is written in from the Airflow core. The Go SDK is one such runtime; the Java SDK is
+another.
 
-Since Go is a compiled language (putting aside projects such as [YAEGI](https://github.com/traefik/yaegi) that allow Go to be interpreted), all tasks must:
+> [!WARNING]
+> This is an **experimental** feature. The SDK is under active development and its APIs, wire protocols,
+> and tooling may change between releases without notice.
 
-1. Be compiled into a binary ahead of time, and
-2. Be registered inside the worker process in order to be executed.
+## The compiled-language constraint
 
+Python tasks are imported and run in-process. Go is compiled, so the model is 
different.
 
-> [!NOTE]
-> This Golang SDK is under active development and is not ready for prime-time yet.
+A single binary that bundles one or more Dags' task functions is called a **bundle**. You build one with
+the SDK's packer, `airflow-go-pack`, which compiles your code and appends a metadata footer (the manifest
+of `dag_id`s and `task_id`s, plus the Dag source) to the executable. The result is a **self-contained
+executable bundle**: a single runnable file that *is* the bundle, with no separate manifest or archive to
+ship alongside it.
 
-## Quickstart
+## You still need a Python stub Dag (for now)
 
-- See [`example/bundle/main.go`](./example/bundle/main.go) for an example dag bundle where you can define your task functions
+The Task API does not yet carry Dag *structure* for non-Python languages, so the Dag's shape and task
+dependencies are still declared in a small Python file using `@task.stub`:
 
-- Compile this into a binary:
+```python
+from airflow.sdk import dag, task
+
+
+@task.stub(queue="golang")
+def extract(): ...
+
+
+@task.stub(queue="golang")
+def transform(): ...
+
+
+@dag()
+def simple_dag():
+    extract() >> transform()
+
+
+simple_dag()
+```
+
+`@task.stub` tells the Dag parser the "shape" of the Go tasks (their names and dependencies) without any
+Python implementation. The `queue=` value routes the task to the Go runtime. This Python requirement is a
+known limitation.
+
+
+## Authoring a bundle
+
+Implement `bundlev1.BundleProvider`, register your Dags and tasks, and `main` is one line. From
+[`example/bundle/main.go`](./example/bundle/main.go):
+
+```go
+type myBundle struct{}
+
+var _ v1.BundleProvider = (*myBundle)(nil)
+
+func (m *myBundle) GetBundleVersion() v1.BundleInfo {
+    return v1.BundleInfo{Name: bundleName, Version: &bundleVersion}
+}
+
+func (m *myBundle) RegisterDags(dagbag v1.Registry) error {
+    simpleDag := dagbag.AddDag("simple_dag")
+    simpleDag.AddTask(extract)
+    simpleDag.AddTask(transform)
+    return nil
+}
+
+func main() {
+    if err := bundlev1server.Serve(&myBundle{}); err != nil {
+        log.Fatal(err)
+    }
+}
+```
+
+A task is an ordinary Go function. The runtime inspects its signature and injects arguments by type:
+`context.Context`, `*slog.Logger`, and an `sdk.Client` (or a narrower interface such as
+`sdk.VariableClient`). An optional `(any, error)` return becomes the task's XCom; an `error` return marks
+the task failed.
+
+```go
+func extract(ctx context.Context, client sdk.Client, log *slog.Logger) (any, error) {
+    conn, err := client.GetConnection(ctx, "test_http")
+    // ... do work, honour ctx cancellation ...
+    return map[string]any{"go_version": runtime.Version()}, nil
+}
+
+func transform(ctx context.Context, client sdk.VariableClient, log *slog.Logger) error {
+    val, err := client.GetVariable(ctx, "my_variable")
+    if err != nil {
+        return err
+    }
+    log.Info("Obtained variable", "my_variable", val)
+    return nil
+}
+```
+
+Asking for the narrowest interface a task needs (e.g. `sdk.VariableClient` instead of `sdk.Client`) makes
+unit testing easier and documents which Airflow features the task touches. `RegisterDags` is the single
+source of truth for which `dag_id`s and `task_id`s a bundle can run.
+
+## Deployment modes
+
+A bundle can run in two ways. The same bundle binary works in both; you pick one per deployment:
+
+1. **Coordinator** (recommended)
+2. **Edge Worker**
+
+For the protocol details behind each, see [How it works](#how-it-works).
+
+### Coordinator (recommended)
+
+A Python task runner executes the Go task directly, with no separate Go worker process to run on the host.
+This is the same coordinator mechanism the Java SDK uses.
+
+**Why this is recommended:** the mature Python supervisor handles the Airflow-facing concerns, so this path
+inherits its capabilities (remote task logs to S3/GCS, the full range of task states, and alternate XCom
+backends) rather than reimplementing them in Go. These are exactly the features the Edge Worker path is
+still missing (see [Known limitations](#known-limitations)).
+
+#### Quickstart
+
+- Build and pack your bundle with `airflow-go-pack`. The packer compiles the bundle and appends an
+  embedded metadata footer so the coordinator can read its `dag_id`s without executing the binary,
+  producing a single runnable file:
 
   ```bash
-  go build -o ./bin/sample-dag-bundle ./example/bundle
+  go tool airflow-go-pack ./example/bundle -- -trimpath -tags=prod
   ```
 
-  (or see the [`Justfile`](./example/bundle/Justfile) for how you can build it and specify they bundle version number at build time.)
+  Use `--output <path>` to write the packed bundle straight into a directory the coordinator scans
+  (`executables_root`), and pass extra `go build` flags after `--`.
+
+  For cross-compiling (e.g. deploy to a Linux host from an Apple-silicon (darwin/arm64) machine), pass `--goos`/`--goarch` and the
+  packer cross-builds for you:
+
+  ```bash
+  go tool airflow-go-pack --goos linux --goarch amd64 \
+    --output ~/airflow/executable-bundles/sample-dag-bundle \
+    ./example/bundle
+  ```
 
-- Configure the go edge worker, by editing `$AIRFLOW_HOME/go-sdk.yaml`:
+  Alternatively, use `--executable`/`--source`. The packer normally execs the binary to read
+  its metadata; a cross-compiled binary cannot run on the host, so generate the metadata on a machine that
+  can run it and pass the file with `--airflow-metadata`:
 
-  These config values need tweaking, especially the ports and secrets. The ports are the default assuming
-  airflow is running locally via `airflow standalone`.
+  ```bash
+  # on linux/amd64 machine:
+  go build -o my-bundle ./example/bundle
+  ./my-bundle --airflow-metadata > airflow-metadata.yaml
+
+  # on darwin/arm64 machine:
+  go tool airflow-go-pack --executable ./my-bundle --source main.go --airflow-metadata airflow-metadata.yaml
+  ```
+
+  > [!NOTE]
+  > The packer ships via the Go 1.24 `tool` directive, so there is no global install: add
+  > `tool github.com/apache/airflow/go-sdk/cmd/airflow-go-pack` to your bundle module's `go.mod` and run
+  > it with `go tool airflow-go-pack`. This pins the packer version per project.
+
+- Register the coordinator and route the queue to it, under `[sdk]` in `airflow.cfg` (or the equivalent
+  `AIRFLOW__SDK__*` env vars):
+
+  ```ini
+  [sdk]
+  coordinators = {"go": {"classpath": "airflow.sdk.coordinators.executable.ExecutableCoordinator", "kwargs": {"executables_root": ["~/airflow/executable-bundles"]}}}
+  queue_to_coordinator = {"golang": "go"}
+  ```
+
+  `executables_root` is one or more directories the coordinator scans for bundles; `queue_to_coordinator`
+  routes stub tasks with `queue="golang"` to this Go coordinator.
+
+  > [!IMPORTANT]
+  > The coordinator is part of the Airflow worker, so the `[sdk]` config (and the bundle files in
+  > `executables_root`) only need to be present wherever tasks actually execute. With `CeleryExecutor`,
+  > set it on the Celery workers only will be sufficient. With `LocalExecutor`, tasks run inside the scheduler process, so it
+  > must be set where the scheduler can read it. The API server and Dag processor do not need it.

Review Comment:
   In this IMPORTANT note, the sentence “set it on the Celery workers only will 
be sufficient” is ungrammatical, and the line break splits “so it must be set 
…” mid-sentence. This is user-facing README content, so it’s worth tightening 
the wording for clarity.



##########
go-sdk/Justfile:
##########
@@ -38,6 +38,11 @@ test:
   @echo "Running unit tests..."
   go run gotest.tools/[email protected] -f dots-v2 ./...
 
+# Build and serve Go package documentation locally
+docs port="6060":
+  @echo "Serving Go docs at http://localhost:{{port}}/github.com/apache/airflow/go-sdk"
+  go run golang.org/x/pkgsite/cmd/pkgsite@latest -http=localhost:{{port}} .

Review Comment:
   The `docs` recipe runs `pkgsite@latest`. Since `just docs` is part of the 
PR’s documented verification path, using `@latest` can make doc previews 
non-reproducible (upstream changes can break the command or alter output). 
Consider pinning pkgsite to a known-good version (similar to how gotestsum is 
pinned) or vendoring it via a Go `tool` directive.



##########
go-sdk/README.md:
##########
@@ -74,63 +250,92 @@ Since Go is a compiled language (putting aside projects such as [YAEGI](https://
     secret_key: "u0ZDb2ccINAbhzNmvYzclw=="
   ```
 
-  You can also set these options via environment variables of `AIRFLOW__${section}_${key}`, for example `AIRFLOW__API_AUTH__SECRET_KEY`.
+  You can also set these options via environment variables of `AIRFLOW__${section}_${key}`, for example
+  `AIRFLOW__API_AUTH__SECRET_KEY`.
 
-- Install the worker
+- Install the worker:
 
   ```bash
   go install github.com/apache/airflow/go-sdk/cmd/airflow-go-edge-worker@latest
   ```
 
-- Run it!
+- Run it:
 
   ```bash
   airflow-go-edge-worker run --queues golang
   ```
 
-### Example Dag:
+- Deploy the matching Python stub Dag (above) into Airflow.
 
-You will need to create a python Dag and deploy it in to the Airflow
+## Known limitations
 
-```python
-from airflow.sdk import dag, task
+A non-exhaustive list of features the **Edge Worker (go-plugin)** path has yet to implement. These are the
+main reason the coordinator-based path is recommended: in that mode the Python supervisor handles these
+concerns, so they are not limitations there.
 
+- Putting tasks into states other than success or failed/up-for-retry (deferred,
+  failed-without-retries, etc.).
+- Remote task logs (i.e. S3/GCS etc.).
+- XCom reading/writing through non-default XCom backends.
 
-@task.stub(queue="golang")
-def extract(): ...
+## How it works
 
+The same bundle binary speaks two different protocols; which one it uses is decided at launch by the CLI
+flags it was invoked with. User code (`func main`) is identical either way.
 
-@task.stub(queue="golang")
-def transform(): ...
+### Coordinator protocol
 
+```
+Python supervisor / task runner
+        │  finds + validates the bundle, then forks it:
+        ▼
+  <bundle binary> --comm=127.0.0.1:P1 --logs=127.0.0.1:P2
+        │  binary dials BACK over TCP loopback (msgpack-over-IPC)
+        ▼
+  GetConnection / GetVariable / GetXCom / SetXCom ... → SucceedTask / TaskState
+```
 
-@dag()
-def simple_dag():
+- The Python `ExecutableCoordinator` forks the bundle binary with `--comm`/`--logs` addresses it is already
+  listening on. The binary dials back (it never listens) and speaks a length-prefixed msgpack-over-IPC wire
+  protocol on the comm socket, with structured JSON-line logs on the logs socket.
+- The Python runtime is the worker. It proxies every `GetConnection` / `GetVariable` / `GetXCom` /
+  `SetXCom` call through to the Execution API. The Go binary just runs the task function.
 
-    extract() >> transform()
+The Go side of the protocol is implemented in `pkg/execution/`. On the Python 
side it is the
+`ExecutableCoordinator` in `task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py`.
 
+### Edge Worker protocol
 
-simple_dag()
+```
+Airflow scheduler ──Edge Executor API──► airflow-go-edge-worker ──go-plugin/gRPC──► bundle binary
+   (ExecuteTaskWorkload)                  (long-running Go process)                  (child process)
 ```
 
-Here we see the `@task.stub` which tells the Dag parser about the "shape" of the go tasks, and lets us define
-the relationships between them
-
-> [!NOTE]
-> Yes, you still have to have a python Dag file for now. This is a known limitation at the moment.
-
-## Known missing features
+- `airflow-go-edge-worker` is a long-running Go process. It registers with the scheduler, polls the Edge
+  Executor API for `ExecuteTaskWorkload`s, and heartbeats.
+- For each workload it execs the bundle binary as a child and connects over HashiCorp
+  [`go-plugin`](https://github.com/hashicorp/go-plugin) (gRPC over a handshake-gated socket).
+- The Task API itself has no way to deliver an `ExecuteTaskWorkload` to a Go worker, so the Edge Executor
+  API fills that gap. Longer term that API will likely need stabilising and versioning.
 
-A non-exhaustive list of features we have yet to implement
+## Architectural decisions
 
-- Support for putting tasks into state other than success or failed/up-for-retry (deferred, failed-without-retries etc.)
-- Remote task logs (i.e. S3/GCS etc)
-- XCom reading/writing from other XCom backends
+The [`adr/`](./adr) directory records the design decisions behind the SDK:
 
+- [ADR 0001](./adr/0001-bundle-packing-options.md): bundle-packing options.
+- [ADR 0002](./adr/0002-use-go-tool-directive-for-bundle-packer.md): deliver the bundle packer via the
+  Go 1.24 `tool` directive.
+- [ADR 0003](./adr/0003-coordinator-protocol-msgpack-ipc.md): dual-mode coordinator protocol, where one
+  binary speaks both go-plugin gRPC (Edge Worker) and msgpack-over-IPC (Python coordinator).
+- [ADR 0004](./adr/0004-self-contained-executable-bundle.md): the self-contained executable bundle, where
+  the executable *is* the bundle.

Review Comment:
   The linked issue (#66939) explicitly calls for cross-linking the bundle 
specification docs from the Go SDK landing material. This README currently 
links ADRs but doesn’t link to `providers/sdk/executable/docs/bundle-spec.rst`, 
so users won’t discover the formal bundle format/manifest details from here.



##########
go-sdk/README.md:
##########
@@ -74,63 +250,92 @@ Since Go is a compiled language (putting aside projects such as [YAEGI](https://
     secret_key: "u0ZDb2ccINAbhzNmvYzclw=="
   ```
 
-  You can also set these options via environment variables of `AIRFLOW__${section}_${key}`, for example `AIRFLOW__API_AUTH__SECRET_KEY`.
+  You can also set these options via environment variables of `AIRFLOW__${section}_${key}`, for example
+  `AIRFLOW__API_AUTH__SECRET_KEY`.

Review Comment:
   The environment-variable format in this sentence uses a single underscore 
between section and key (`AIRFLOW__${section}_${key}`), but Airflow’s config 
env vars use double-underscores as separators (as shown by the example 
`AIRFLOW__API_AUTH__SECRET_KEY`). The template should match the actual format 
to avoid misleading users.
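The naming rule the comment refers to can be shown in a few lines: the section and key are uppercased and joined with double underscores on both sides of the section name. `configEnvVar` is a hypothetical helper. The second example uses the `[sdk]` section from the coordinator setup in the README hunk, which already documents `AIRFLOW__SDK__*`.

```go
package main

import (
	"fmt"
	"strings"
)

// configEnvVar is a hypothetical helper that builds the environment variable
// overriding option key in section: AIRFLOW__{SECTION}__{KEY}, with double
// underscores as the separators.
func configEnvVar(section, key string) string {
	return "AIRFLOW__" + strings.ToUpper(section) + "__" + strings.ToUpper(key)
}

func main() {
	fmt.Println(configEnvVar("api_auth", "secret_key"))      // AIRFLOW__API_AUTH__SECRET_KEY
	fmt.Println(configEnvVar("sdk", "queue_to_coordinator")) // AIRFLOW__SDK__QUEUE_TO_COORDINATOR
}
```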



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
