This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 03d26f86c5b Go SDK: airflow-go-pack tool for self-contained bundle
binaries (#67156)
03d26f86c5b is described below
commit 03d26f86c5b1f5a6435c3b43c59168cc5ade4d4e
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Mon Jun 8 12:06:55 2026 +0800
Go SDK: airflow-go-pack tool for self-contained bundle binaries (#67156)
* Go-SDK: Add EnumerableBundle/OrderedDags registry enumeration
Record dag and task identity in registration order and expose it via a
new EnumerableBundle interface (OrderedDags). This lets tooling read a
bundle's dag/task ids without executing any task. AddDag/AddTask
signatures are unchanged.
* Add airflow-go-pack for building self-contained Airflow bundles
Introduce the airflow-go-pack tool (delivered via the go.mod tool
directive) that builds a Go bundle binary, queries it for its dag/task
identity, and appends the source plus an airflow-metadata.yaml manifest
plus an AFBNDL01 trailer into a single self-contained executable bundle
(ADR 0002/0004).
The bundle binary exposes a single --airflow-metadata flag that prints
the manifest as JSON conforming to airflow-metadata.schema.json
(airflow_bundle_metadata_version, sdk.{language,version,
supervisor_schema_version}, dags). The manifest wire types live once in
internal/airflowmetadata, shared by the producer (pkg/execution) and the
packer so the two cannot drift. The footer reader/writer lives in
internal/bundlefooter and backs the `inspect` subcommand.
* Go-SDK: align airflow-go-pack bundle footer with spec and support
cross-arch packing
Match the 64-byte AFBNDL01 trailer (with binary_sha256 over the binary
region) that the executable spec and the Python ExecutableCoordinator
already require; the Go packer previously wrote an incompatible 32-byte
trailer that the consumer rejected.
Read the manifest from a host-native build so packing works when the
deployable artefact is cross-compiled, and recover from an unrunnable
foreign-arch --executable by building the --source package on the host
solely to read its metadata. Adds a cross-architecture end-to-end test.
* self-review: Fix nits
* self-review: prevent airflow-go-pack from truncating the input executable
When --output (or the default output derived from the package directory
name) resolved to the same file as the pre-built --executable, the copy
step truncated that file before reading it, destroying the binary and
appending a footer to an empty region.
Resolve the output path before introspection and reject when it aliases
the executable or source (cleaned-abs comparison plus os.SameFile for
inode-level links). Assemble the bundle through a temp file and atomically
rename it into place so a failed pack never leaves a truncated or
half-written artefact at the output path.
* self-review: harden airflow-go-pack flag validation and output checks
- Reject --executable together with --goos/--goarch: --executable packs
the binary as-is and never builds, so it cannot cross-compile. Silently
ignoring the flags contradicted the fail-fast guidance that points
unrunnable-binary users at the cross-build path.
- Resolve the source path and reject a directory/colliding --output before
any go build, so a misconfigured output fails fast instead of after a
(cross) compile.
- Fix stale docs: the manifest is YAML-by-default (not "the JSON shape"),
and the bundle output is a self-contained executable with an appended
AFBNDL01 trailer, not a ZIP.
* Go-SDK: correct airflow-go-pack default-output-name docs
The default bundle output filename is derived from the main package
directory name (matching `go build`), resolved before the build so a bad
`--output` fails fast. It never came from `BundleInfo.Name`, and the
manifest carries no name field. Fix the stale renderManifest comment and
ADR 0002 claim that said otherwise.
* Go-SDK: guard airflow-go-pack output against the --airflow-metadata file
rejectOutputAlias already refused an --output that aliased the executable
or source; extend it to the supplied --airflow-metadata file so
`--output X --airflow-metadata X` fails fast instead of clobbering the
manifest when the bundle is renamed into place.
* Go-SDK: add airflow-go-pack inspect command test
Cover the inspect command's formatting and the bundlefooter.Read happy
path with a table test exercising output with and without --source.
---
...0002-use-go-tool-directive-for-bundle-packer.md | 110 ++--
go-sdk/bundle/bundlev1/bundlev1server/server.go | 63 +-
go-sdk/bundle/bundlev1/registry.go | 51 +-
go-sdk/bundle/bundlev1/registry_test.go | 35 ++
go-sdk/cmd/airflow-go-pack/inspect.go | 57 ++
go-sdk/cmd/airflow-go-pack/inspect_test.go | 74 +++
go-sdk/cmd/airflow-go-pack/main.go | 146 +++++
go-sdk/cmd/airflow-go-pack/pack.go | 672 +++++++++++++++++++++
.../cmd/airflow-go-pack/pack_integration_test.go | 357 +++++++++++
go-sdk/cmd/airflow-go-pack/pack_test.go | 506 ++++++++++++++++
go-sdk/example/bundle/Justfile | 25 +-
go-sdk/go.mod | 4 +-
go-sdk/internal/airflowmetadata/airflowmetadata.go | 51 ++
go-sdk/internal/bundlefooter/footer.go | 280 +++++++++
go-sdk/internal/bundlefooter/footer_test.go | 158 +++++
go-sdk/pkg/execution/messages.go | 7 +
go-sdk/pkg/execution/metadata.go | 157 +++++
go-sdk/pkg/execution/metadata_test.go | 119 ++++
18 files changed, 2803 insertions(+), 69 deletions(-)
diff --git a/go-sdk/adr/0002-use-go-tool-directive-for-bundle-packer.md
b/go-sdk/adr/0002-use-go-tool-directive-for-bundle-packer.md
index 08bd89a8b5f..3f5efde805c 100644
--- a/go-sdk/adr/0002-use-go-tool-directive-for-bundle-packer.md
+++ b/go-sdk/adr/0002-use-go-tool-directive-for-bundle-packer.md
@@ -113,35 +113,48 @@ convention so authors can forward arbitrary flags to the
underlying
to produce the *target artifact*.
3. Executes a *host-runnable introspection binary* with
`--airflow-metadata` to obtain
`sdk.{language,version,supervisor_schema_version}`
- and the `dags` mapping. The packer first tries to exec the target
- artifact directly; if that fails with "exec format error" (the
- target is built for a different OS/arch than the host), the
- packer builds a host-arch sidecar from the same package
- (`go build` with `GOOS`/`GOARCH` unset, written to
- `<tmpdir>/<binname>.introspect`) and execs that instead. Both
- binaries are produced from the same source package against the
- same module graph, so `RegisterDags` records identical dag/task
- identity regardless of which one is execed.
- 4. Writes a deterministic ZIP next to the working directory at
- `<bundleName>.zip`, where `<bundleName>` comes from the
- binary's `BundleInfo.Name` (part of the same `--airflow-metadata`
- introspection output).
+ and the `dags` mapping. When cross-compiling, the packer builds a
+ host-arch sidecar from the same package and forwarded build flags
+ and execs that instead; both come from the same sources and flags,
+ so `RegisterDags` records identical identity either way. The sidecar
+ is skipped when `--airflow-metadata` supplies the manifest directly
+ (see the overrides below).
+ 4. Writes a self-contained bundle: the deployable executable with
+ the source bytes, the manifest bytes, and the fixed 64-byte
+ `AFBNDL01` trailer appended (the format from ADR 0004 /
+ `task-sdk/docs/executable-bundle-spec.rst`). By default it is
+ written next to the working directory under the bundle's main
+ package directory name; `--output` overrides the path.
Optional overrides, for advanced or pre-built workflows:
- - `--source <path>`: override the auto-detected source file.
- - `--executable <path>`: skip the internal `go build` and pack a
- pre-built binary. Mutually exclusive with `--` build-flag
- passthrough. If the supplied binary is not host-runnable (e.g.
- the user cross-built a `linux/amd64` binary from a `darwin/arm64`
- host), the packer still needs to introspect it: it builds a
- host-arch sidecar from the positional package and execs that for
- `--airflow-metadata`, then appends the resulting footer to the
- user-supplied binary. If no positional package was passed and
- the supplied binary is not host-runnable, the packer errors with
- a message asking for the source package so the sidecar can be
- built.
- - `--output <path>`: override the default output ZIP path.
+ - `--source <path>`: override the auto-detected source file, and the
+ escape hatch when discovery can't pick it. It skips discovery
+ entirely (a plain `go list` that ignores build tags and
+ `GOOS`/`GOARCH`, so it can fail or pick the wrong file).
+ - `--executable <path>`: skip `go build` and pack a pre-built binary.
+ Mutually exclusive with `--` build flags and with `--goos`/`--goarch`
+ (it never builds). The binary must run on the host so the packer can
+ exec it for `--airflow-metadata`. A non-host-runnable binary is a hard
+ error: the packer won't rebuild a host sidecar, because the original
+ build inputs (tags, `-ldflags`, `GOOS`/`GOARCH` files) are unknown and
+ a rebuild could advertise a different dag/task set than shipped. To
+ pack for another platform, use the build path (`--goos`/`--goarch`,
+ below) or supply `--airflow-metadata`.
+ - `--airflow-metadata <path>`: supply a captured manifest instead of
+ introspecting the binary. Accepts the binary's YAML default, its
+ `--format json` output, or a bundle's embedded `airflow-metadata.yaml`
+ (one YAML decoder reads all three). Short-circuits introspection in
+ every mode — the deterministic way to pack a pre-built cross binary
+ with `--executable`.
+ - `--goos <os>` / `--goarch <arch>`: cross-compile the deployable bundle.
+ Prefer these over the `GOOS`/`GOARCH` env vars: under `go tool` those
+ env vars cross-build the packer itself, which then can't exec on the
+ host. The flags target only the internal `go build`, leaving the
+ packer build host-native; they fall back to the env vars, then the
+ host. Mutually exclusive with `--executable`.
+ - `--output <path>`: override the default output path. Its parent
+ directory is created if missing.
Examples:
@@ -152,14 +165,20 @@ convention so authors can forward arbitrary flags to the
underlying
# Pack a different package, with extra go build flags.
go tool airflow-go-pack ./cmd/my-bundle -- -trimpath -tags=prod
- # Pack an already-built binary (skips go build).
+ # Pack an already-built binary for THIS host (skips go build).
go tool airflow-go-pack --executable ./build/example --source main.go
+
+ # Pack a bundle for a different platform: cross-build via the build
+ # path (no --executable), forwarding go build flags after "--".
+ # Use --goos/--goarch, NOT the GOOS/GOARCH env vars, under `go tool`.
+ go tool airflow-go-pack --goos linux --goarch amd64 ./cmd/my-bundle -- -trimpath
```
2. **Extend the existing `--airflow-metadata` flag in
`bundlev1server.Serve` to print the full spec.** Rather than adding a
second introspection flag, `--airflow-metadata` is the single flag the
- packer relies on; it prints a JSON document of the form:
+ packer relies on; it prints a manifest document (YAML by default, JSON
+ under `--format json`) of the form (shown here as JSON for structure):
```json
{
@@ -183,9 +202,18 @@ convention so authors can forward arbitrary flags to the
underlying
recording registry, then enumerating the recorded dags and their
tasks. `--airflow-metadata` today prints only `BundleInfo`
(`server.go`); it is extended to emit this full document, so the
- shipped `decideMode` switch needs only one metadata mode. The bundle's
- `BundleInfo.Name` (used by the packer for the default output
- filename) is carried in the same output.
+ shipped `decideMode` switch needs only one metadata mode. The packer
+ derives the default output filename from the bundle's main package
+ directory name (what `go build` itself names the binary), resolved
+ before the build so a bad `--output` fails fast; it does not come from
+ `BundleInfo.Name`, and no name field is carried in this output.
+
+ A `--format yaml|json` flag selects the encoding and is only valid with
+ `--airflow-metadata` (misuse is a hard error). The default is YAML,
+ matching a bundle's embedded `airflow-metadata.yaml`, so
+ `mybundle --airflow-metadata > airflow-metadata.yaml` is ready to use.
+ The packer never sets `--format`; its YAML decoder reads both the YAML
+ default and `--format json` output (JSON is a subset of YAML).
3. **Bundle authors register the packer in their own `go.mod`:**
@@ -251,12 +279,14 @@ convention so authors can forward arbitrary flags to the
underlying
- `go build` flag passthrough uses the standard `--` separator
convention so the packer's own flag set stays small and stable.
- Host-runnable detection is by attempted exec, not by parsing the
- binary's exec format. The packer runs the candidate introspection
- binary with `--airflow-metadata` and treats the OS's "exec format
- error" (and the Windows equivalent surfaced by `os/exec`) as the
- signal to fall back to building a host-arch sidecar. Other exec
- failures (non-zero exit, malformed JSON, missing flag) are real
- errors and are surfaced to the user as-is. The Go build cache
- amortises the sidecar to a link step when host arch is already
- involved, so there is no measurable overhead when no cross-compile
- is in play.
+ binary's format: the packer runs the candidate with `--airflow-metadata`
+ and treats an "exec format error" (and its Windows equivalent) as
+ not-startable. Other failures (non-zero exit, malformed output, missing
+ flag) are surfaced as-is.
+- A not-startable binary is handled differently per mode. The **build**
+ path owns the build, so it builds a host-arch sidecar from the same
+ package and `--` flags (the build cache amortises this to a link step,
+ so no measurable overhead without a cross-compile). In **`--executable`**
+ mode the build inputs are unknown, so the packer must not synthesise a
+ sidecar that could diverge from the shipped binary: it fails fast and
+ points the user at the cross-build path or `--airflow-metadata`.
diff --git a/go-sdk/bundle/bundlev1/bundlev1server/server.go
b/go-sdk/bundle/bundlev1/bundlev1server/server.go
index 02036742d09..fde5314d332 100644
--- a/go-sdk/bundle/bundlev1/bundlev1server/server.go
+++ b/go-sdk/bundle/bundlev1/bundlev1server/server.go
@@ -18,9 +18,7 @@
package bundlev1server
import (
- "encoding/json"
"errors"
- "fmt"
"log/slog"
"os"
@@ -44,13 +42,28 @@ var ErrCoordinatorFlagsIncomplete = errors.New(
"--comm and --logs must be supplied together",
)
-// CLI Flags.
-// The --bundle-metadata flag is used for showing the embedded bundle info in
airflow-metadata.yaml spec format.
-// The --comm and --logs select the coordinator-mode protocol
-// All three are read by Serve to choose a server mode below.
+// ErrFormatRequiresMetadata is returned by [Serve] when --format is supplied
+// without --airflow-metadata, the only mode whose encoding it selects.
+var ErrFormatRequiresMetadata = errors.New(
+ "--format is only valid together with --airflow-metadata",
+)
+
+// CLI Flags, all read by Serve to choose a server mode below.
+// --airflow-metadata prints the bundle's manifest and exits (airflow-go-pack
+// consumes it to build the embedded airflow-metadata.yaml); --format selects
+// its encoding. --comm and --logs select coordinator mode.
var (
- versionInfo = flag.Bool("bundle-metadata", false, "show the embedded
bundle info")
- commAddr = flag.String(
+ printMetadata = flag.Bool(
+ "airflow-metadata",
+ false,
+ "print the bundle's airflow-metadata manifest and exit",
+ )
+ metadataFormat = flag.String(
+ "format",
+ string(execution.MetadataFormatYAML),
+ "encoding for --airflow-metadata: yaml (default) or json; only
valid with --airflow-metadata",
+ )
+ commAddr = flag.String(
"comm",
"",
"host:port of the supervisor's coordinator comm channel
(selects coordinator mode)",
@@ -83,7 +96,7 @@ type serveMode int
const (
modePlugin serveMode = iota // go-plugin gRPC (existing
Edge Worker path)
- modeMetadataDump // --bundle-metadata: print
BundleInfo JSON
+ modeAirflowMetadata // --airflow-metadata: print
the manifest JSON (ADR 0002/0004)
modeCoordinator // --comm/--logs:
msgpack-over-IPC (ADR 0003)
modeCoordinatorUsageError // misuse: print usage and
exit non-zero
)
@@ -114,9 +127,21 @@ func Serve(bundle bundlev1.BundleProvider, opts
...ServeOpt) error {
c.ApplyServeOpt(serveConfig)
}
- switch decideMode() {
- case modeMetadataDump:
- return dumpBundleMetadata(bundle)
+ mode := decideMode()
+
+ // --format applies only to --airflow-metadata; reject it elsewhere
instead
+ // of silently ignoring it.
+ if mode != modeAirflowMetadata && flag.CommandLine.Changed("format") {
+ return ErrFormatRequiresMetadata
+ }
+
+ switch mode {
+ case modeAirflowMetadata:
+ format, err := execution.ParseMetadataFormat(*metadataFormat)
+ if err != nil {
+ return err
+ }
+ return execution.DumpAirflowMetadata(bundle, format)
case modeCoordinator:
// In coordinator mode the supervisor reads the logs channel for
// structured records, so configuring the hclog/stderr default
@@ -133,8 +158,8 @@ func Serve(bundle bundlev1.BundleProvider, opts
...ServeOpt) error {
}
func decideMode() serveMode {
- if *versionInfo {
- return modeMetadataDump
+ if *printMetadata {
+ return modeAirflowMetadata
}
commSet := *commAddr != ""
logsSet := *logsAddr != ""
@@ -148,16 +173,6 @@ func decideMode() serveMode {
return modePlugin
}
-func dumpBundleMetadata(bundle bundlev1.BundleProvider) error {
- meta := bundle.GetBundleVersion()
- data, err := json.MarshalIndent(meta, "", " ")
- if err != nil {
- return err
- }
- fmt.Println(string(data))
- return nil
-}
-
func installPluginLogger() {
hcLogger := hclog.New(&hclog.LoggerOptions{
Level: hclog.Trace,
diff --git a/go-sdk/bundle/bundlev1/registry.go
b/go-sdk/bundle/bundlev1/registry.go
index 8d902efa081..59cad631125 100644
--- a/go-sdk/bundle/bundlev1/registry.go
+++ b/go-sdk/bundle/bundlev1/registry.go
@@ -43,9 +43,30 @@ type (
AddDag(dagId string) Dag
}
+ // TaskInfo describes a registered task by its user-visible id.
+ TaskInfo struct {
+ ID string
+ }
+
+ // DagInfo describes a registered dag together with its tasks in
+ // registration order.
+ DagInfo struct {
+ DagID string
+ Tasks []TaskInfo
+ }
+
+ // EnumerableBundle exposes the dag/task identity recorded by
RegisterDags.
+ // The default registry implements it; airflow-go-pack relies on it to
read
+ // a bundle's dag/task ids without executing any task.
+ EnumerableBundle interface {
+ OrderedDags() []DagInfo
+ }
+
registry struct {
sync.RWMutex
taskFuncMap map[string]map[string]Task
+ dagOrder []string
+ taskOrder map[string][]string
}
)
@@ -64,7 +85,10 @@ func (d dagShim) AddTaskWithName(taskId string, fn any) {
// Function New creates a new bundle on which Dag and Tasks can be registered
func New() Registry {
- return &registry{taskFuncMap: make(map[string]map[string]Task)}
+ return &registry{
+ taskFuncMap: make(map[string]map[string]Task),
+ taskOrder: make(map[string][]string),
+ }
}
func getFnName(fn reflect.Value) string {
@@ -76,10 +100,14 @@ func getFnName(fn reflect.Value) string {
}
func (r *registry) AddDag(dagId string) Dag {
+ r.Lock()
+ defer r.Unlock()
+
if _, exists := r.taskFuncMap[dagId]; exists {
panic(fmt.Errorf("Dag %q already exists in bundle", dagId))
}
r.taskFuncMap[dagId] = make(map[string]Task)
+ r.dagOrder = append(r.dagOrder, dagId)
return dagShim{dagId, r}
}
@@ -109,6 +137,7 @@ func (r *registry) registerTaskWithName(dagId, taskId
string, fn any) {
if !exists {
dagTasks = make(map[string]Task)
r.taskFuncMap[dagId] = dagTasks
+ r.dagOrder = append(r.dagOrder, dagId)
}
_, exists = dagTasks[taskId]
@@ -116,6 +145,7 @@ func (r *registry) registerTaskWithName(dagId, taskId
string, fn any) {
panic(fmt.Errorf("taskId %q is already registered for DAG %q",
taskId, dagId))
}
dagTasks[taskId] = task
+ r.taskOrder[dagId] = append(r.taskOrder[dagId], taskId)
}
func (r *registry) LookupTask(dagId, taskId string) (task Task, exists bool) {
@@ -129,3 +159,22 @@ func (r *registry) LookupTask(dagId, taskId string) (task
Task, exists bool) {
task, exists = dagTasks[taskId]
return task, exists
}
+
+// OrderedDags returns the registered dags in AddDag order, each with its tasks
+// in registration order. The returned slice is freshly allocated; callers may
+// mutate it freely.
+func (r *registry) OrderedDags() []DagInfo {
+ r.RLock()
+ defer r.RUnlock()
+
+ out := make([]DagInfo, 0, len(r.dagOrder))
+ for _, dagID := range r.dagOrder {
+ taskIDs := r.taskOrder[dagID]
+ tasks := make([]TaskInfo, 0, len(taskIDs))
+ for _, tid := range taskIDs {
+ tasks = append(tasks, TaskInfo{ID: tid})
+ }
+ out = append(out, DagInfo{DagID: dagID, Tasks: tasks})
+ }
+ return out
+}
diff --git a/go-sdk/bundle/bundlev1/registry_test.go
b/go-sdk/bundle/bundlev1/registry_test.go
index 25105cd1455..f405c8f5315 100644
--- a/go-sdk/bundle/bundlev1/registry_test.go
+++ b/go-sdk/bundle/bundlev1/registry_test.go
@@ -118,3 +118,38 @@ func (s *RegistrySuite) TestAddTask_ErrorReturnType() {
_, exists := s.reg.LookupTask("dag1", "errorTask")
s.True(exists)
}
+
+func (s *RegistrySuite) TestOrderedDags_Empty() {
+ enum, ok := New().(EnumerableBundle)
+ s.Require().True(ok)
+ s.Empty(enum.OrderedDags())
+}
+
+func (s *RegistrySuite) TestOrderedDags_PreservesRegistrationOrder() {
+ reg := New()
+ // Register dags out of alphabetical order so the test fails if
OrderedDags
+ // ever sorts instead of preserving AddDag order.
+ zeta := reg.AddDag("zeta")
+ zeta.AddTaskWithName("z1", myTask)
+ zeta.AddTaskWithName("z2", myTask)
+
+ alpha := reg.AddDag("alpha")
+ alpha.AddTaskWithName("a1", myTask)
+
+ reg.AddDag("mid") // dag with no tasks
+
+ enum, ok := reg.(EnumerableBundle)
+ s.Require().True(ok)
+
+ got := enum.OrderedDags()
+ s.Require().Len(got, 3)
+
+ s.Equal("zeta", got[0].DagID)
+ s.Equal([]TaskInfo{{ID: "z1"}, {ID: "z2"}}, got[0].Tasks)
+
+ s.Equal("alpha", got[1].DagID)
+ s.Equal([]TaskInfo{{ID: "a1"}}, got[1].Tasks)
+
+ s.Equal("mid", got[2].DagID)
+ s.Empty(got[2].Tasks)
+}
diff --git a/go-sdk/cmd/airflow-go-pack/inspect.go
b/go-sdk/cmd/airflow-go-pack/inspect.go
new file mode 100644
index 00000000000..f9cca6d488b
--- /dev/null
+++ b/go-sdk/cmd/airflow-go-pack/inspect.go
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "fmt"
+
+ "github.com/spf13/cobra"
+
+ "github.com/apache/airflow/go-sdk/internal/bundlefooter"
+)
+
+func newInspectCmd() *cobra.Command {
+ var showSource bool
+ cmd := &cobra.Command{
+ Use: "inspect <bundle>",
+ Short: "Print the manifest (and optionally source) embedded in
a bundle",
+ Args: cobra.ExactArgs(1),
+ RunE: func(cmd *cobra.Command, args []string) error {
+ source, manifest, err := bundlefooter.Read(args[0])
+ if err != nil {
+ return err
+ }
+ out := cmd.OutOrStdout()
+ if showSource {
+ fmt.Fprintln(out, "# --- source ---")
+ out.Write(source)
+ if len(source) > 0 && source[len(source)-1] !=
'\n' {
+ fmt.Fprintln(out)
+ }
+ fmt.Fprintln(out, "# --- manifest ---")
+ }
+ out.Write(manifest)
+ if len(manifest) > 0 && manifest[len(manifest)-1] !=
'\n' {
+ fmt.Fprintln(out)
+ }
+ return nil
+ },
+ }
+ cmd.Flags().BoolVar(&showSource, "source", false, "also print the
embedded source file")
+ return cmd
+}
diff --git a/go-sdk/cmd/airflow-go-pack/inspect_test.go
b/go-sdk/cmd/airflow-go-pack/inspect_test.go
new file mode 100644
index 00000000000..8208136e801
--- /dev/null
+++ b/go-sdk/cmd/airflow-go-pack/inspect_test.go
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "bytes"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// inspect reads a bundle through bundlefooter.Read and prints the embedded
+// manifest, prefixing the source too under --source.
+func TestInspectCmd(t *testing.T) {
+ dir := t.TempDir()
+ exe := filepath.Join(dir, "input-bin")
+ require.NoError(t, os.WriteFile(exe, []byte("binary-bytes"), 0o755))
+ source := []byte("package main\n\nfunc main() {}\n")
+ manifest := []byte(
+ "airflow_bundle_metadata_version: \"1.0\"\n" +
+ "dags:\n" +
+ " my_dag:\n" +
+ " tasks:\n" +
+ " - \"t1\"\n",
+ )
+ bundle := filepath.Join(dir, "bundle")
+ require.NoError(t, writeBundle(exe, bundle, source, manifest))
+
+ for _, tc := range []struct {
+ name string
+ args []string
+ expect string
+ }{
+ {
+ name: "manifest only",
+ args: []string{bundle},
+ expect: string(manifest),
+ },
+ {
+ name: "with source",
+ args: []string{"--source", bundle},
+ expect: "# --- source ---\n" + string(source) +
+ "# --- manifest ---\n" + string(manifest),
+ },
+ } {
+ t.Run(tc.name, func(t *testing.T) {
+ cmd := newInspectCmd()
+ var out bytes.Buffer
+ cmd.SetOut(&out)
+ cmd.SetErr(&out)
+ cmd.SetArgs(tc.args)
+ require.NoError(t, cmd.Execute())
+ assert.Equal(t, tc.expect, out.String())
+ })
+ }
+}
diff --git a/go-sdk/cmd/airflow-go-pack/main.go
b/go-sdk/cmd/airflow-go-pack/main.go
new file mode 100644
index 00000000000..cc04b315ac4
--- /dev/null
+++ b/go-sdk/cmd/airflow-go-pack/main.go
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Command airflow-go-pack builds a self-contained Airflow bundle from a Go
+// package. It runs `go build`, exec's the freshly built binary with
+// `--airflow-metadata` to obtain the manifest, and appends the source plus
+// manifest plus AFBNDL01 trailer to the executable as specified by ADR 0004.
+//
+// Usage:
+//
+// go tool airflow-go-pack [./path/to/pkg] [-- <go build flags>...]
+// go tool airflow-go-pack --executable ./build/example --source main.go
+// go tool airflow-go-pack inspect ./mybundle
+//
+// See go-sdk/adr/0002-use-go-tool-directive-for-bundle-packer.md and
+// go-sdk/adr/0004-self-contained-executable-bundle.md.
+package main
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/spf13/cobra"
+)
+
+func main() {
+ if err := newRootCmd().Execute(); err != nil {
+ fmt.Fprintln(os.Stderr, "error:", err)
+ os.Exit(1)
+ }
+}
+
+func newRootCmd() *cobra.Command {
+ opts := &packOptions{}
+
+ root := &cobra.Command{
+ Use: "airflow-go-pack [package]",
+ Short: "Build a self-contained Airflow bundle from a Go
package",
+ Long: `airflow-go-pack builds a Go bundle binary, queries it
for its DAG/task
+identity via --airflow-metadata, and appends the source plus an
+airflow-metadata.yaml manifest plus an AFBNDL01 trailer to the
+executable. The result is a single self-contained file that drops into
+[executable] bundles_folder.
+
+By default the packer builds the package in the current directory. Pass
+a different package as the positional argument; pass extra go build
+flags after a "--" separator.
+
+--executable expects a binary that runs on this host (same OS/arch). To
+build a bundle for a different platform you have two options: run the
+packer with --goos/--goarch so it cross-builds the deployable artefact
+while building a host-arch binary (forwarding your -- build flags) solely
+to read the manifest; or pack a pre-built cross binary with --executable
+and supply its manifest via --airflow-metadata, captured by running the
+binary on its native platform (mybundle --airflow-metadata > meta.yaml).
+
+Use --goos/--goarch rather than the GOOS/GOARCH env vars: under
+"go tool airflow-go-pack" those env vars cross-build the packer itself,
+which then cannot exec on the host.
+
+Examples:
+ go tool airflow-go-pack
+ go tool airflow-go-pack ./cmd/my-bundle -- -trimpath -tags=prod
+ go tool airflow-go-pack --executable ./build/example --source main.go
+
+ # Cross-platform via the build path: cross-build + host introspection.
+ go tool airflow-go-pack --goos linux --goarch amd64 ./cmd/my-bundle -- -trimpath
+
+ # Cross-platform via a pre-built binary: pack it with its captured manifest.
+ GOOS=linux GOARCH=arm64 go build -o ./build/example-arm64 ./cmd/my-bundle
+ go build -o ./build/example-on-native-host ./cmd/my-bundle
+ ./build/example-on-native-host --airflow-metadata > meta.yaml
+ go tool airflow-go-pack --executable ./build/example-arm64 \
+ --source ./cmd/my-bundle/main.go --airflow-metadata meta.yaml
+`,
+ // Only count args BEFORE "--" toward the positional limit; args
+ // after "--" are forwarded verbatim to `go build` and must not
+ // inflate the count (e.g. `-- -ldflags "-X main.foo=bar"`).
+ Args: func(cmd *cobra.Command, args []string) error {
+ dashAt := cmd.ArgsLenAtDash()
+ pkgArgs := args
+ if dashAt >= 0 {
+ pkgArgs = args[:dashAt]
+ }
+ return cobra.MaximumNArgs(1)(cmd, pkgArgs)
+ },
+ RunE: func(cmd *cobra.Command, args []string) error {
+ // Anything after "--" is forwarded to the internal `go
build`
+ // invocation. ArgsLenAtDash() returns the count of
args before
+ // the dash, or -1 if the dash isn't present.
+ dashAt := cmd.ArgsLenAtDash()
+ var pkgArgs, buildArgs []string
+ if dashAt < 0 {
+ pkgArgs = args
+ } else {
+ pkgArgs = args[:dashAt]
+ buildArgs = args[dashAt:]
+ }
+ opts.pkg = "."
+ if len(pkgArgs) == 1 {
+ opts.pkg = pkgArgs[0]
+ }
+ opts.buildArgs = buildArgs
+ return runPack(cmd.OutOrStdout(), cmd.ErrOrStderr(),
opts)
+ },
+ }
+
+ root.Flags().StringVar(&opts.source, "source",
+ "",
+ "path to the DAG source file (defaults to the file in the
target package containing func main)")
+ root.Flags().StringVar(&opts.executable, "executable",
+ "",
+ "pack a pre-built executable instead of running go build")
+ root.Flags().StringVar(&opts.output, "output",
+ "",
+ "output bundle path (defaults to ./<package-dir-name>)")
+ root.Flags().StringVar(&opts.airflowMetadata, "airflow-metadata",
+ "",
+ "path to a pre-captured --airflow-metadata manifest (JSON or
YAML); skips "+
+ "introspecting the binary")
+ root.Flags().StringVar(&opts.goos, "goos",
+ "",
+ "target GOOS for the bundle (cross-compile); prefer this over
the GOOS env "+
+ "var, which `go tool` would use to cross-build the
packer itself")
+ root.Flags().StringVar(&opts.goarch, "goarch",
+ "",
+ "target GOARCH for the bundle (cross-compile); prefer this over
the GOARCH env "+
+ "var, which `go tool` would use to cross-build the
packer itself")
+
+ root.AddCommand(newInspectCmd())
+ return root
+}
diff --git a/go-sdk/cmd/airflow-go-pack/pack.go
b/go-sdk/cmd/airflow-go-pack/pack.go
new file mode 100644
index 00000000000..8441b23569b
--- /dev/null
+++ b/go-sdk/cmd/airflow-go-pack/pack.go
@@ -0,0 +1,672 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "go/ast"
+ "go/parser"
+ "go/token"
+ "io"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "runtime"
+ "sort"
+ "strings"
+
+ "gopkg.in/yaml.v3"
+
+ "github.com/apache/airflow/go-sdk/internal/airflowmetadata"
+ "github.com/apache/airflow/go-sdk/internal/bundlefooter"
+)
+
// packOptions carries everything the root pack command needs for one run: the
// positional package argument, the values of its flags, and the arguments that
// followed the "--" separator on the command line.
type packOptions struct {
	// pkg is the target package; the root command sets it to "." when no
	// positional argument is given.
	pkg string
	// source overrides the auto-detected DAG source file (--source).
	source string
	// executable packs a pre-built binary instead of running go build
	// (--executable).
	executable string
	// output overrides the default bundle output path (--output).
	output string
	// airflowMetadata is the path to a pre-captured --airflow-metadata
	// manifest, JSON or YAML (--airflow-metadata).
	airflowMetadata string
	// goos is the target GOOS for the deployable build (--goos); when empty
	// the GOOS env var is used, then the host.
	goos string
	// goarch is the target GOARCH for the deployable build (--goarch); when
	// empty the GOARCH env var is used, then the host.
	goarch string
	// buildArgs are forwarded to `go build`. buildPackage drops any literal
	// "--" element before forwarding, so a leading separator is harmless.
	buildArgs []string
}
+
+func runPack(stdout, stderr io.Writer, opts *packOptions) error {
+ if opts.executable != "" && len(opts.buildArgs) > 0 {
+ return fmt.Errorf("--executable is mutually exclusive with go
build flags after \"--\"")
+ }
+ if opts.executable != "" && (opts.goos != "" || opts.goarch != "") {
+ return fmt.Errorf(
+ "--executable is mutually exclusive with
--goos/--goarch: --executable packs the " +
+ "binary as-is and never builds, so it cannot
cross-compile. To cross-build a " +
+ "bundle, drop --executable and pass
--goos/--goarch with the package path",
+ )
+ }
+
+ // Resolve the DAG source file for both modes up front. --executable
requires
+ // it explicitly; the build path falls back to discovery.
+ sourcePath := opts.source
+ if opts.executable != "" {
+ if sourcePath == "" {
+ return fmt.Errorf(
+ "--executable requires --source: cannot infer
the DAG source for a pre-built binary",
+ )
+ }
+ } else if sourcePath == "" {
+ // --source is the documented escape hatch for packages whose
main file
+ // cannot be auto-detected: it may be selected by build tags or
GOOS, or
+ // the package may have several files with func main().
Discovery runs a
+ // plain `go list` without the forwarded build flags, so it can
fail or
+ // pick the wrong file for such packages. Only fall back to
discovery
+ // when --source was not supplied, so an explicit --source
always wins.
+ discovered, err := discoverMainSource(opts.pkg)
+ if err != nil {
+ return fmt.Errorf("locating DAG source file: %w", err)
+ }
+ sourcePath = discovered
+ }
+ if _, err := os.Stat(sourcePath); err != nil {
+ return fmt.Errorf("source file %s: %w", sourcePath, err)
+ }
+
+ output := opts.output
+ if output == "" {
+ defaultPath, err := defaultOutputPath(sourcePath)
+ if err != nil {
+ return fmt.Errorf("determining default output path:
%w", err)
+ }
+ output = defaultPath
+ }
+
+ // The bundle is finalised by renaming a temp file onto output, which
fails
+ // if output is an existing directory. Catch that here: the default
output is
+ // the package directory's name, so packing ./foo from a dir that
already has
+ // a ./foo directory collides. Report it with the fix instead of a bare
+ // "rename ...: file exists" from os.Rename. Done before any build so a
+ // misconfigured output fails fast rather than after a (cross) go build.
+ if info, err := os.Stat(output); err == nil && info.IsDir() {
+ return fmt.Errorf(
+ "output path %q is an existing directory (the bundle
output defaults to the "+
+ "package directory's name); pass --output to
write the bundle to a file path",
+ output,
+ )
+ }
+
+ // execPath is the binary that receives the footer (the deployable
artefact,
+ // which MAY be cross-compiled). introspectPath is the binary
obtainMetadata
+ // reads --airflow-metadata from. By default that means exec'ing it on
the
+ // host (so it must be host-runnable, hence the cross-compile sidecar
below),
+ // but --airflow-metadata bypasses it entirely.
+ var execPath, introspectPath string
+ cleanupExec := func() {}
+ defer func() { cleanupExec() }()
+
+ if opts.executable != "" {
+ execPath = opts.executable
+ introspectPath = opts.executable
+ } else {
+ targetGOOS, targetGOARCH := targetPlatform(opts)
+ artifact, cleanup, err := buildPackage(stderr, opts.pkg,
opts.buildArgs, targetGOOS, targetGOARCH)
+ if err != nil {
+ return err
+ }
+ execPath = artifact
+ cleanupExec = cleanup
+ introspectPath = artifact
+
+ // Reading the manifest means exec'ing the binary, so it must
be a
+ // host-native build. When cross-compiling, the artefact cannot
run
+ // here; build a throwaway host binary from the same sources
and the
+ // same forwarded `--` build flags (DAG/task identity is
arch-independent)
+ // solely to introspect. This sidecar is unnecessary when
+ // --airflow-metadata supplies the manifest directly.
+ crossCompiling := targetGOOS != runtime.GOOS || targetGOARCH !=
runtime.GOARCH
+ if crossCompiling && opts.airflowMetadata == "" {
+ hostBin, cleanupHost, err := buildPackage(stderr,
opts.pkg, opts.buildArgs, runtime.GOOS, runtime.GOARCH)
+ if err != nil {
+ return fmt.Errorf("building host binary for
metadata introspection: %w", err)
+ }
+ prevCleanup := cleanupExec
+ cleanupExec = func() { cleanupHost(); prevCleanup() }
+ introspectPath = hostBin
+ }
+ }
+
+ if _, err := os.Stat(execPath); err != nil {
+ return fmt.Errorf("executable %s: %w", execPath, err)
+ }
+
+ if err := rejectOutputAlias(output, execPath, sourcePath,
opts.airflowMetadata); err != nil {
+ return err
+ }
+
+ meta, err := obtainMetadata(opts, introspectPath)
+ if err != nil {
+ return err
+ }
+ if len(meta.Dags) == 0 {
+ return fmt.Errorf("bundle exposes no dags: nothing to pack")
+ }
+ for dagID, dag := range meta.Dags {
+ if len(dag.Tasks) == 0 {
+ fmt.Fprintf(stderr, "warning: dag %q has no tasks\n",
dagID)
+ }
+ }
+
+ manifest, err := renderManifest(meta, filepath.Base(sourcePath))
+ if err != nil {
+ return fmt.Errorf("rendering manifest: %w", err)
+ }
+ sourceBytes, err := os.ReadFile(sourcePath)
+ if err != nil {
+ return fmt.Errorf("reading source file: %w", err)
+ }
+
+ // Assemble the bundle through a temp file and atomically move it into
+ // place: we never mutate the build artefact or the user-supplied
+ // --executable, and a failed pack never leaves a truncated or
half-written
+ // file at output.
+ if err := writeBundle(execPath, output, sourceBytes, manifest); err !=
nil {
+ return err
+ }
+
+ fmt.Fprintf(stdout, "Wrote bundle %s (sdk=%s/%s, dags=%d)\n",
+ output, meta.SDK.Language, meta.SDK.Version, len(meta.Dags))
+ return nil
+}
+
// defaultOutputPath returns the bundle file name used when --output is not
// given: the base name of the directory containing sourcePath, with ".exe"
// appended when the packer itself runs on Windows. The result is a bare file
// name, so the bundle lands in the current working directory.
func defaultOutputPath(sourcePath string) (string, error) {
	absSource, err := filepath.Abs(sourcePath)
	if err != nil {
		return "", err
	}

	suffix := ""
	if runtime.GOOS == "windows" {
		suffix = ".exe"
	}
	return filepath.Base(filepath.Dir(absSource)) + suffix, nil
}
+
+// discoverMainSource locates the file in the given package whose AST contains
+// a top-level `func main()`. Returns an error if the package has zero or
+// more than one such file, mirroring ADR 0002's discovery contract.
+func discoverMainSource(pkg string) (string, error) {
+ cmd := exec.Command("go", "list", "-f", "{{.Dir}}\n{{range
.GoFiles}}{{.}}\n{{end}}", pkg)
+ var stdout, stderr bytes.Buffer
+ cmd.Stdout = &stdout
+ cmd.Stderr = &stderr
+ if err := cmd.Run(); err != nil {
+ return "", fmt.Errorf(
+ "go list %s: %w: %s\n"+
+ "airflow-go-pack packs the Go package in the
current directory by default; "+
+ "pass your bundle's package path (e.g.
`airflow-go-pack ./path/to/bundle`) "+
+ "or --source to point at the DAG source file
directly",
+ pkg, err, strings.TrimSpace(stderr.String()),
+ )
+ }
+
+ lines := splitNonEmpty(stdout.String())
+ if len(lines) < 2 {
+ return "", fmt.Errorf("package %s has no Go source files", pkg)
+ }
+ dir := lines[0]
+ files := lines[1:]
+
+ fset := token.NewFileSet()
+ var matches []string
+ for _, name := range files {
+ full := filepath.Join(dir, name)
+ f, err := parser.ParseFile(fset, full, nil,
parser.SkipObjectResolution)
+ if err != nil {
+ return "", fmt.Errorf("parsing %s: %w", full, err)
+ }
+ if hasMainFunc(f) {
+ matches = append(matches, full)
+ }
+ }
+ switch len(matches) {
+ case 0:
+ return "", fmt.Errorf("no file in package %s defines func
main()", pkg)
+ case 1:
+ return matches[0], nil
+ default:
+ return "", fmt.Errorf(
+ "multiple files in package %s define func main(): %v;
use --source to disambiguate",
+ pkg,
+ matches,
+ )
+ }
+}
+
// hasMainFunc reports whether f declares a top-level function named "main"
// that has no receiver and no parameters.
func hasMainFunc(f *ast.File) bool {
	for _, decl := range f.Decls {
		fn, isFunc := decl.(*ast.FuncDecl)
		// Methods and differently named functions can never be the entry point.
		if !isFunc || fn.Recv != nil || fn.Name.Name != "main" {
			continue
		}
		if params := fn.Type.Params; params == nil || len(params.List) == 0 {
			return true
		}
	}
	return false
}
+
// splitNonEmpty splits s on newlines, trims surrounding whitespace from each
// line, and returns the lines that are non-empty after trimming. It returns a
// nil slice when no such line exists.
func splitNonEmpty(s string) []string {
	var kept []string
	isNewline := func(r rune) bool { return r == '\n' }
	for _, line := range strings.FieldsFunc(s, isNewline) {
		trimmed := strings.TrimSpace(line)
		if trimmed == "" {
			continue
		}
		kept = append(kept, trimmed)
	}
	return kept
}
+
+// targetPlatform resolves the GOOS/GOARCH the deployable bundle is built for:
+// the --goos/--goarch flags win, then the ambient GOOS/GOARCH env, then the
+// host. The flags exist because `go tool airflow-go-pack` builds the packer
+// using the ambient GOOS/GOARCH — setting those in the env to cross-compile a
+// bundle would instead cross-build the packer itself and fail to exec it on
the
+// host. Passing the target via flags keeps the env (and the packer build)
+// host-native while still cross-building the bundle.
+func targetPlatform(opts *packOptions) (goos, goarch string) {
+ goos = runtime.GOOS
+ if env := os.Getenv("GOOS"); env != "" {
+ goos = env
+ }
+ if opts.goos != "" {
+ goos = opts.goos
+ }
+ goarch = runtime.GOARCH
+ if env := os.Getenv("GOARCH"); env != "" {
+ goarch = env
+ }
+ if opts.goarch != "" {
+ goarch = opts.goarch
+ }
+ return goos, goarch
+}
+
// buildPackage runs `go build [extraArgs...] -o <tmp>/bundle <pkg>` for the
// given GOOS/GOARCH inside a fresh temporary directory. On success it returns
// the path of the built executable and a cleanup function that removes the
// temporary directory; on failure the directory is removed before returning
// and the cleanup function is nil.
//
// extraArgs are the arguments that followed "--" on the airflow-go-pack
// command line; every element equal to "--" is dropped before forwarding.
// GOOS and GOARCH are appended to the inherited environment so the caller's
// values override any ambient ones. Both stdout and stderr of `go build` are
// written to stderr.
func buildPackage(
	stderr io.Writer,
	pkg string,
	extraArgs []string,
	goos, goarch string,
) (string, func(), error) {
	workDir, err := os.MkdirTemp("", "airflow-go-pack-*")
	if err != nil {
		return "", nil, fmt.Errorf("creating temp dir: %w", err)
	}
	removeWorkDir := func() { _ = os.RemoveAll(workDir) }

	artifact := filepath.Join(workDir, "bundle")
	if goos == "windows" {
		artifact += ".exe"
	}

	goArgs := make([]string, 0, len(extraArgs)+4)
	goArgs = append(goArgs, "build")
	for _, arg := range extraArgs {
		if arg != "--" {
			goArgs = append(goArgs, arg)
		}
	}
	goArgs = append(goArgs, "-o", artifact, pkg)

	build := exec.Command("go", goArgs...)
	// os/exec keeps the last value of a duplicated key, so these entries win
	// over any GOOS/GOARCH already present in the inherited environment.
	build.Env = append(os.Environ(), "GOOS="+goos, "GOARCH="+goarch)
	build.Stdout, build.Stderr = stderr, stderr
	if err := build.Run(); err != nil {
		removeWorkDir()
		return "", nil, fmt.Errorf("go build failed: %w", err)
	}
	return artifact, removeWorkDir, nil
}
+
+func readAirflowMetadata(execPath string) (airflowmetadata.Manifest, error) {
+ out, err := runIntrospect(execPath, "--airflow-metadata")
+ if err != nil {
+ return airflowmetadata.Manifest{}, err
+ }
+ // Decode with a YAML decoder: it reads the binary's YAML default and
its
+ // --format json output alike (JSON is a subset of YAML).
+ var meta airflowmetadata.Manifest
+ if err := yaml.Unmarshal(out, &meta); err != nil {
+ return airflowmetadata.Manifest{}, fmt.Errorf(
+ "decoding --airflow-metadata output (YAML/JSON): %w",
+ err,
+ )
+ }
+ return meta, nil
+}
+
+// obtainMetadata resolves the bundle manifest either from an explicit
+// --airflow-metadata file or by exec'ing a host-runnable introspection binary.
+// In --executable mode a binary that cannot be exec'd on the host is a hard
+// error with remediation guidance: --executable expects a same-platform
binary,
+// and the packer never silently rebuilds a host binary, because a rebuild from
+// unknown inputs (the original build tags, ldflags, and GOOS/GOARCH-specific
+// files are not known here, and build flags are rejected in --executable mode)
+// can advertise a different DAG/task set than the artefact actually shipped.
+func obtainMetadata(opts *packOptions, introspectPath string)
(airflowmetadata.Manifest, error) {
+ if opts.airflowMetadata != "" {
+ meta, err := readMetadataFile(opts.airflowMetadata)
+ if err != nil {
+ return airflowmetadata.Manifest{}, fmt.Errorf(
+ "--airflow-metadata %s: %w",
+ opts.airflowMetadata,
+ err,
+ )
+ }
+ return meta, nil
+ }
+
+ meta, err := readAirflowMetadata(introspectPath)
+ if err == nil {
+ return meta, nil
+ }
+ if opts.executable != "" && errors.Is(err, errExecNotStartable) {
+ return airflowmetadata.Manifest{}, fmt.Errorf(
+ "cannot exec --executable %q on %s/%s to read its
--airflow-metadata: %w\n"+
+ "--executable expects a binary that runs on
this host. To pack a binary for a\n"+
+ "different platform, drop --executable and let
the packer cross-build instead:\n"+
+ " airflow-go-pack --goos <os> --goarch
<arch> ./path/to/pkg [-- <go build flags>]\n"+
+ "(the packer builds a host-arch binary,
forwarding your -- build flags, solely to\n"+
+ "read the manifest). Alternatively pass
--airflow-metadata with the manifest captured\n"+
+ "from the binary on its native platform: %s
--airflow-metadata > airflow-metadata.yaml",
+ opts.executable, runtime.GOOS, runtime.GOARCH, err,
opts.executable,
+ )
+ }
+ return airflowmetadata.Manifest{}, fmt.Errorf("--airflow-metadata: %w",
err)
+}
+
+// readMetadataFile parses a manifest from a pre-captured --airflow-metadata
+// file. It accepts both the JSON a bundle binary prints (via
+// `mybundle --airflow-metadata`) and the airflow-metadata.yaml embedded in an
+// existing bundle: YAML is a superset of JSON, so a YAML decoder reads either.
+func readMetadataFile(path string) (airflowmetadata.Manifest, error) {
+ data, err := os.ReadFile(path)
+ if err != nil {
+ return airflowmetadata.Manifest{}, err
+ }
+ var meta airflowmetadata.Manifest
+ if err := yaml.Unmarshal(data, &meta); err != nil {
+ return airflowmetadata.Manifest{}, fmt.Errorf("decoding
metadata (YAML/JSON): %w", err)
+ }
+ return meta, nil
+}
+
// errExecNotStartable marks an introspection failure in which the process
// never ran, typically because the binary targets a different CPU architecture
// or OS and the host refused to exec it (e.g. "exec format error", "bad CPU
// type"). It is distinct from the binary running and exiting non-zero (an
// *exec.ExitError), which signals a genuine --airflow-metadata failure rather
// than an unrunnable binary.
var errExecNotStartable = errors.New("introspection binary could not be exec'd")

// runIntrospect executes execPath with the single argument flag and returns
// what the process wrote to stdout. If the process ran but exited non-zero,
// the returned error wraps the *exec.ExitError and appends the captured
// stderr. Any other run failure is wrapped with errExecNotStartable so callers
// can tell "could not start" apart from "ran and failed".
func runIntrospect(execPath string, flag string) ([]byte, error) {
	var outBuf, errBuf bytes.Buffer
	child := exec.Command(execPath, flag)
	child.Stdout, child.Stderr = &outBuf, &errBuf

	runErr := child.Run()
	if runErr == nil {
		return outBuf.Bytes(), nil
	}

	var exited *exec.ExitError
	if errors.As(runErr, &exited) {
		return nil, fmt.Errorf("%s %s: %w: %s", execPath, flag, runErr, errBuf.String())
	}
	// No exit status means the process never started.
	return nil, fmt.Errorf("%w: %s %s: %v", errExecNotStartable, execPath, flag, runErr)
}
+
+// renderManifest serialises the airflow-metadata manifest as deterministic,
+// sorted-key YAML matching airflow-metadata.schema.json. It injects the
schema's
+// source field (the filename the manifest is built from), which the producer's
+// Manifest omits because only the packer knows it; every other field is copied
+// from the introspected manifest verbatim.
+func renderManifest(meta airflowmetadata.Manifest, sourceName string) ([]byte,
error) {
+ version := meta.AirflowBundleMetadataVersion
+ if version == "" {
+ version = airflowmetadata.FormatVersion
+ }
+
+ dagIDs := make([]string, 0, len(meta.Dags))
+ for id := range meta.Dags {
+ dagIDs = append(dagIDs, id)
+ }
+ sort.Strings(dagIDs)
+
+ dagsNode := &yaml.Node{Kind: yaml.MappingNode}
+ for _, id := range dagIDs {
+ tasks := meta.Dags[id].Tasks
+ taskItems := make([]*yaml.Node, 0, len(tasks))
+ for _, t := range tasks {
+ taskItems = append(taskItems, quotedScalar(t))
+ }
+ dagsNode.Content = append(dagsNode.Content,
+ scalar(id),
+ &yaml.Node{
+ Kind: yaml.MappingNode,
+ Content: []*yaml.Node{
+ scalar("tasks"),
+ {Kind: yaml.SequenceNode, Content:
taskItems},
+ },
+ },
+ )
+ }
+
+ root := &yaml.Node{Kind: yaml.DocumentNode}
+ manifest := &yaml.Node{
+ Kind: yaml.MappingNode,
+ Content: []*yaml.Node{
+ scalar("airflow_bundle_metadata_version"),
quotedScalar(version),
+ scalar("sdk"),
+ {
+ Kind: yaml.MappingNode,
+ Content: []*yaml.Node{
+ scalar("language"),
quotedScalar(meta.SDK.Language),
+ scalar("version"),
quotedScalar(meta.SDK.Version),
+ scalar("supervisor_schema_version"),
+
quotedScalar(meta.SDK.SupervisorSchemaVersion),
+ },
+ },
+ scalar("source"), quotedScalar(sourceName),
+ scalar("dags"), dagsNode,
+ },
+ }
+ root.Content = []*yaml.Node{manifest}
+
+ var buf bytes.Buffer
+ enc := yaml.NewEncoder(&buf)
+ enc.SetIndent(2)
+ if err := enc.Encode(root); err != nil {
+ return nil, err
+ }
+ if err := enc.Close(); err != nil {
+ return nil, err
+ }
+ return buf.Bytes(), nil
+}
+
+// scalar emits a plain (unquoted) node. It is used for structural keys
+// (e.g. "sdk", "tasks") and for the Dag ID mapping keys.
+func scalar(value string) *yaml.Node {
+ return &yaml.Node{Kind: yaml.ScalarNode, Value: value}
+}
+
+// quotedScalar emits a double-quoted node. Data-bearing string *values* — task
+// IDs, the source filename, and the SDK fields — go through this so a value
+// that looks like a number, bool, or date (e.g. a task named "123" or "true")
+// round-trips as a string rather than being retyped by the YAML parser.
+func quotedScalar(value string) *yaml.Node {
+ return &yaml.Node{Kind: yaml.ScalarNode, Value: value, Style:
yaml.DoubleQuotedStyle}
+}
+
+// rejectOutputAlias fails if output resolves to the same file as any pack
+// input: the executable, the source, or a supplied --airflow-metadata file.
+// Packing copies the executable to output with O_TRUNC and renames it into
+// place, so an aliased output would clobber the input. metadataPath is empty
+// when --airflow-metadata is not used and is skipped in that case.
+func rejectOutputAlias(output, execPath, sourcePath, metadataPath string)
error {
+ for _, in := range []struct {
+ path string
+ kind string
+ }{
+ {execPath, "executable"},
+ {sourcePath, "source"},
+ {metadataPath, "--airflow-metadata file"},
+ } {
+ if in.path == "" {
+ continue
+ }
+ alias, err := sameFile(output, in.path)
+ if err != nil {
+ return fmt.Errorf("resolving output path %s: %w",
output, err)
+ }
+ if alias {
+ return fmt.Errorf(
+ "output path %s is the same file as the %s %s;
pass --output to write the bundle elsewhere",
+ output,
+ in.kind,
+ in.path,
+ )
+ }
+ }
+ return nil
+}
+
// sameFile reports whether a and b name the same file. Equal absolute paths
// count as the same file even when nothing exists there yet (so "./bundle" and
// "bundle" match before the bundle is written). Otherwise both paths are
// stat'ed and compared with os.SameFile, which also catches links to one
// underlying file; if either path does not exist the answer is false.
func sameFile(a, b string) (bool, error) {
	absA, err := filepath.Abs(a)
	if err != nil {
		return false, err
	}
	absB, err := filepath.Abs(b)
	if err != nil {
		return false, err
	}
	if absA == absB {
		return true, nil
	}

	var infos [2]os.FileInfo
	for i, path := range [2]string{absA, absB} {
		info, statErr := os.Stat(path)
		switch {
		case statErr == nil:
			infos[i] = info
		case os.IsNotExist(statErr):
			// A path that does not exist cannot alias anything.
			return false, nil
		default:
			return false, statErr
		}
	}
	return os.SameFile(infos[0], infos[1]), nil
}
+
+// writeBundle assembles the bundle at output by copying the executable to a
+// temporary file in output's directory, appending the source+manifest footer
+// to that copy, then atomically renaming it into place. Writing through a
+// temp file keeps a failed pack from leaving a truncated or half-written
+// artefact at output, and guarantees the file being copied is never the same
+// open file as the destination.
+func writeBundle(execPath, output string, source, metadata []byte) error {
+ outDir := filepath.Dir(output)
+ // The temp file and the atomic rename both live in output's directory,
so it
+ // must exist. Create it for the user (e.g. --output ./bin/bundle with
no
+ // ./bin) instead of failing with an opaque temp-file "no such file"
error.
+ if err := os.MkdirAll(outDir, 0o755); err != nil {
+ return fmt.Errorf("creating output directory %s: %w", outDir,
err)
+ }
+ tmp, err := os.CreateTemp(outDir, ".airflow-go-pack-*")
+ if err != nil {
+ return fmt.Errorf("creating temp file for %s: %w", output, err)
+ }
+ tmpPath := tmp.Name()
+ _ = tmp.Close()
+ committed := false
+ defer func() {
+ if !committed {
+ _ = os.Remove(tmpPath)
+ }
+ }()
+
+ if err := copyFile(execPath, tmpPath, 0o755); err != nil {
+ return fmt.Errorf("writing %s: %w", output, err)
+ }
+ if err := bundlefooter.Append(tmpPath, source, metadata); err != nil {
+ return err
+ }
+ if err := os.Rename(tmpPath, output); err != nil {
+ return fmt.Errorf("finalising %s: %w", output, err)
+ }
+ committed = true
+ return nil
+}
+
// copyFile copies the contents of src to dst, creating dst if needed and
// truncating it if it already exists, then sets dst's permissions to mode. A
// copy error takes precedence over an error from closing dst.
func copyFile(src, dst string, mode os.FileMode) error {
	reader, err := os.Open(src)
	if err != nil {
		return err
	}
	defer reader.Close()

	writer, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, mode)
	if err != nil {
		return err
	}

	_, copyErr := io.Copy(writer, reader)
	closeErr := writer.Close()
	if copyErr != nil {
		return copyErr
	}
	if closeErr != nil {
		return closeErr
	}
	// The mode given to OpenFile applies only to a newly created file (and is
	// subject to the umask), so set it explicitly.
	return os.Chmod(dst, mode)
}
diff --git a/go-sdk/cmd/airflow-go-pack/pack_integration_test.go
b/go-sdk/cmd/airflow-go-pack/pack_integration_test.go
new file mode 100644
index 00000000000..edb51824c4c
--- /dev/null
+++ b/go-sdk/cmd/airflow-go-pack/pack_integration_test.go
@@ -0,0 +1,357 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "io"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "regexp"
+ "runtime"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gopkg.in/yaml.v3"
+
+ "github.com/apache/airflow/go-sdk/internal/airflowmetadata"
+ "github.com/apache/airflow/go-sdk/internal/bundlefooter"
+)
+
// crossArchFor maps the host architecture to a different architecture for the
// cross-compile tests to target: amd64 and arm64 map to each other. It returns
// "" for any other host, for which the tests have no mapping.
func crossArchFor(hostArch string) string {
	counterpart := map[string]string{
		"amd64": "arm64",
		"arm64": "amd64",
	}
	return counterpart[hostArch]
}
+
+// End-to-end cross-arch --executable test: a binary built for an arch the host
+// cannot run is packed into a spec-conforming bundle with its binary region
+// preserved byte-for-byte. The caller supplies the artefact's own
+// --airflow-metadata output (captured here from a host build of the same
+// sources) rather than the packer rebuilding to guess the metadata.
+func TestPack_CrossArchExecutableWithMetadataFile(t *testing.T) {
+ if testing.Short() {
+ t.Skip("cross-arch pack test shells out to `go build` twice")
+ }
+ if _, err := exec.LookPath("go"); err != nil {
+ t.Skip("go toolchain not on PATH")
+ }
+ crossArch := crossArchFor(runtime.GOARCH)
+ if crossArch == "" {
+ t.Skipf("no cross-arch mapping for host arch %q",
runtime.GOARCH)
+ }
+
+ // The example bundle is a real BundleProvider that answers
+ // --airflow-metadata, so it exercises the genuine metadata path.
+ exampleDir, err := filepath.Abs(filepath.Join("..", "..", "example",
"bundle"))
+ require.NoError(t, err)
+ sourceFile := filepath.Join(exampleDir, "main.go")
+ if _, err := os.Stat(sourceFile); err != nil {
+ t.Skipf("example bundle source not found: %v", err)
+ }
+
+ tmp := t.TempDir()
+ crossBin := filepath.Join(tmp, "prebuilt_cross")
+ hostBin := filepath.Join(tmp, "prebuilt_host")
+
+ // Build the example for a foreign arch (the --executable input) and for
+ // the host. CGO is disabled so the cross build needs no C toolchain.
+ goBuild(t, exampleDir, crossBin, runtime.GOOS, crossArch)
+ goBuild(t, exampleDir, hostBin, runtime.GOOS, runtime.GOARCH)
+
+ crossBytes, err := os.ReadFile(crossBin)
+ require.NoError(t, err)
+ hostBytes, err := os.ReadFile(hostBin)
+ require.NoError(t, err)
+ require.False(t, bytes.Equal(crossBytes, hostBytes),
+ "cross and host builds should differ; cross-compile may not
have taken effect")
+
+ // Capture the artefact's own --airflow-metadata JSON from the host
build,
+ // standing in for the author running the binary on its native platform.
+ metaJSON := filepath.Join(tmp, "airflow-metadata.json")
+ captureMetadata(t, hostBin, metaJSON)
+
+ // Pack the foreign-arch executable through the real CLI command,
feeding
+ // the captured metadata so no host rebuild is needed.
+ outPath := filepath.Join(tmp, "bundle")
+ cmd := newRootCmd()
+ cmd.SetArgs([]string{
+ "--executable", crossBin,
+ "--source", sourceFile,
+ "--airflow-metadata", metaJSON,
+ "--output", outPath,
+ })
+ cmd.SetOut(&bytes.Buffer{})
+ cmd.SetErr(&bytes.Buffer{})
+ require.NoError(t, cmd.Execute())
+
+ bundleBytes, err := os.ReadFile(outPath)
+ require.NoError(t, err)
+
+ // Read parses the trailer and verifies binary_sha256 over the binary
+ // region; success means the bundle is spec-valid.
+ source, metadata, err := bundlefooter.Read(outPath)
+ require.NoError(t, err)
+
+ srcBytes, err := os.ReadFile(sourceFile)
+ require.NoError(t, err)
+ assert.Equal(t, srcBytes, source, "embedded source must match --source
bytes")
+
+ // sdk.version is environment-dependent and not asserted verbatim: a
plain
+ // `go build` from a local module tree leaves Main.Version unset and
yields
+ // "(devel)", while Go 1.24's VCS stamping (e.g. in CI, building from
the
+ // git checkout) yields a pseudo-version like
v0.0.0-<timestamp>-<commit>,
+ // and a tagged-release build yields a semver tag. Assert the version
line
+ // matches an accepted form, then fold the observed value into the
expected
+ // manifest so the remaining fields and ordering are checked exactly.
+ // supervisor_schema_version and the format version come from SDK
constants
+ // and change only when those constants do.
+ versionLine := regexp.MustCompile(`(?m)^ version: "([^"]*)"$`)
+ m := versionLine.FindStringSubmatch(string(metadata))
+ require.NotNil(t, m, "manifest must contain an sdk.version line:\n%s",
metadata)
+ sdkVersion := m[1]
+ assert.Regexp(t, `^(\(devel\)|v[0-9].*)$`, sdkVersion,
+ `sdk.version must be "(devel)" or a v-prefixed module version`)
+
+ expectedManifest := `airflow_bundle_metadata_version: "1.0"
+sdk:
+ language: "go"
+ version: "` + sdkVersion + `"
+ supervisor_schema_version: "2026-06-16"
+source: "main.go"
+dags:
+ simple_dag:
+ tasks:
+ - "extract"
+ - "transform"
+ - "load"
+`
+ assert.Equal(t, expectedManifest, string(metadata))
+
+ // The packed binary region must be exactly the foreign-arch executable:
+ // the captured metadata describes it, and the binary is never rebuilt.
+ binaryRegion :=
bundleBytes[:len(bundleBytes)-len(source)-len(metadata)-bundlefooter.TrailerSize]
+ assert.Equal(t, crossBytes, binaryRegion,
+ "packed binary region must be the foreign-arch --executable,
not a host rebuild")
+}
+
+// End-to-end build-mode cross-compile (no --executable): with GOOS/GOARCH set,
+// the packer builds the target-arch artefact and a host-arch binary (solely to
+// read the manifest), forwarding the `--` go build flags. The packed binary
+// must be the target-arch artefact built with the forwarded flags.
+func TestPack_CrossCompileBuildModeForwardsFlags(t *testing.T) {
+ if testing.Short() {
+ t.Skip("cross-arch pack test shells out to `go build` twice")
+ }
+ if _, err := exec.LookPath("go"); err != nil {
+ t.Skip("go toolchain not on PATH")
+ }
+ crossArch := crossArchFor(runtime.GOARCH)
+ if crossArch == "" {
+ t.Skipf("no cross-arch mapping for host arch %q",
runtime.GOARCH)
+ }
+
+ exampleDir, err := filepath.Abs(filepath.Join("..", "..", "example",
"bundle"))
+ require.NoError(t, err)
+ if _, err := os.Stat(filepath.Join(exampleDir, "main.go")); err != nil {
+ t.Skipf("example bundle source not found: %v", err)
+ }
+
+ // Cross-compile via the environment, exactly as a user would. CGO is
+ // disabled so the cross build needs no C toolchain.
+ t.Setenv("GOOS", runtime.GOOS)
+ t.Setenv("GOARCH", crossArch)
+ t.Setenv("CGO_ENABLED", "0")
+
+ tmp := t.TempDir()
+ outPath := filepath.Join(tmp, "bundle")
+
+ // Pack the example package (no --executable, no --source), forwarding
+ // -trimpath after the "--" separator to the internal go build.
+ cmd := newRootCmd()
+ cmd.SetArgs([]string{exampleDir, "--output", outPath, "--",
"-trimpath"})
+ cmd.SetOut(&bytes.Buffer{})
+ cmd.SetErr(&bytes.Buffer{})
+ require.NoError(t, cmd.Execute())
+
+ source, metadata, err := bundlefooter.Read(outPath)
+ require.NoError(t, err)
+ assert.Contains(t, string(metadata), "simple_dag:",
+ "manifest must be read from the host introspection build")
+
+ // Independently build the target-arch artefact with the same forwarded
+ // flag; the packed binary region must match it byte-for-byte, proving
the
+ // deployable artefact is the cross build (not the host introspection
one)
+ // and that -trimpath was forwarded.
+ wantBin := filepath.Join(tmp, "want_cross")
+ build := exec.Command("go", "build", "-trimpath", "-o", wantBin,
exampleDir)
+ build.Env = append(os.Environ(), "GOOS="+runtime.GOOS,
"GOARCH="+crossArch, "CGO_ENABLED=0")
+ if combined, berr := build.CombinedOutput(); berr != nil {
+ t.Fatalf("reference cross build failed: %v\n%s", berr, combined)
+ }
+ wantBytes, err := os.ReadFile(wantBin)
+ require.NoError(t, err)
+
+ bundleBytes, err := os.ReadFile(outPath)
+ require.NoError(t, err)
+ binaryRegion :=
bundleBytes[:len(bundleBytes)-len(source)-len(metadata)-bundlefooter.TrailerSize]
+ assert.Equal(t, wantBytes, binaryRegion,
+ "packed binary must be the cross-built artefact with -trimpath
forwarded")
+}
+
+// When --source is supplied, the packer skips source discovery, so a package
+// whose main file cannot be auto-detected (two files with func main, which
+// discovery rejects as ambiguous) is still packable. The failure is the
+// downstream `go build` error, not the discovery error — proving discovery was
+// skipped.
+func TestRunPack_SourceBypassesDiscovery(t *testing.T) {
+ if testing.Short() {
+ t.Skip("shells out to the go toolchain")
+ }
+ if _, err := exec.LookPath("go"); err != nil {
+ t.Skip("go toolchain not on PATH")
+ }
+
+ // Two files both define func main(), which discoverMainSource rejects
as
+ // ambiguous. Building them together is also a compile error, so
reaching
+ // `go build` (rather than failing in discovery) is the observable
signal
+ // that --source bypassed discovery.
+ pkgDir := t.TempDir()
+ require.NoError(t, os.WriteFile(filepath.Join(pkgDir, "go.mod"),
+ []byte("module ambiguousmain\n\ngo 1.24\n"), 0o644))
+ require.NoError(t, os.WriteFile(filepath.Join(pkgDir, "a.go"),
+ []byte("package main\n\nfunc main() {}\n"), 0o644))
+ require.NoError(t, os.WriteFile(filepath.Join(pkgDir, "b.go"),
+ []byte("package main\n\nfunc main() {}\n"), 0o644))
+
+ source := filepath.Join(pkgDir, "a.go")
+ err := runPack(io.Discard, io.Discard, &packOptions{
+ pkg: pkgDir,
+ source: source,
+ output: filepath.Join(t.TempDir(), "bundle"),
+ })
+ require.Error(t, err)
+ // With --source honoured, discovery is skipped; the build still runs
and
+ // fails on the duplicate main.
+ assert.NotContains(t, err.Error(), "locating DAG source file",
+ "--source must bypass source discovery")
+ assert.Contains(t, err.Error(), "go build failed",
+ "packing should proceed to the build step when --source is
given")
+}
+
+// Checks the bundle binary's --airflow-metadata encodings: default is YAML,
+// --format json emits JSON, both decode to the same manifest, and --format
+// without --airflow-metadata is a hard error.
+func TestBundleBinary_AirflowMetadataFormats(t *testing.T) {
+ if testing.Short() {
+ t.Skip("shells out to `go build`")
+ }
+ if _, err := exec.LookPath("go"); err != nil {
+ t.Skip("go toolchain not on PATH")
+ }
+
+ exampleDir, err := filepath.Abs(filepath.Join("..", "..", "example",
"bundle"))
+ require.NoError(t, err)
+ if _, err := os.Stat(filepath.Join(exampleDir, "main.go")); err != nil {
+ t.Skipf("example bundle source not found: %v", err)
+ }
+
+ hostBin := filepath.Join(t.TempDir(), "bundle")
+ goBuild(t, exampleDir, hostBin, runtime.GOOS, runtime.GOARCH)
+
+ assertManifest := func(t *testing.T, meta airflowmetadata.Manifest) {
+ t.Helper()
+ assert.Equal(t, "go", meta.SDK.Language)
+ require.Contains(t, meta.Dags, "simple_dag")
+ assert.Equal(t, []string{"extract", "transform", "load"},
meta.Dags["simple_dag"].Tasks)
+ }
+
+ // Default: YAML. A JSON document would start with '{'.
+ yamlOut, err := exec.Command(hostBin, "--airflow-metadata").Output()
+ require.NoError(t, err, "running %s --airflow-metadata", hostBin)
+ assert.False(t, bytes.HasPrefix(bytes.TrimSpace(yamlOut), []byte("{")),
+ "default --airflow-metadata must emit YAML, not JSON")
+ assert.Contains(t, string(yamlOut), "airflow_bundle_metadata_version:")
+ var fromYAML airflowmetadata.Manifest
+ require.NoError(t, yaml.Unmarshal(yamlOut, &fromYAML))
+ assertManifest(t, fromYAML)
+
+ // --format json: JSON output.
+ jsonOut, err := exec.Command(hostBin, "--airflow-metadata", "--format",
"json").Output()
+ require.NoError(t, err, "running %s --airflow-metadata --format json",
hostBin)
+ assert.True(t, bytes.HasPrefix(bytes.TrimSpace(jsonOut), []byte("{")),
+ "--format json must emit JSON")
+ var fromJSON airflowmetadata.Manifest
+ require.NoError(t, json.Unmarshal(jsonOut, &fromJSON))
+ assertManifest(t, fromJSON)
+
+ assert.Equal(t, fromYAML, fromJSON, "both encodings must decode to the
same manifest")
+
+ // --format without --airflow-metadata is a usage error.
+ bad, err := exec.Command(hostBin, "--format", "json").CombinedOutput()
+ require.Error(t, err, "--format without --airflow-metadata must exit
non-zero")
+ assert.Contains(t, string(bad), "--format is only valid together with
--airflow-metadata")
+}
+
+// Running the packer from a directory that is not a bundle main package must
+// turn the bare `go list` failure into an actionable error pointing at a
+// package path or --source.
+func TestDiscoverMainSource_NoGoFilesGivesGuidance(t *testing.T) {
+ if _, err := exec.LookPath("go"); err != nil {
+ t.Skip("go toolchain not on PATH")
+ }
+ dir := t.TempDir()
+ require.NoError(t, os.WriteFile(filepath.Join(dir, "go.mod"),
+ []byte("module testempty\n\ngo 1.24\n"), 0o644))
+ t.Chdir(dir)
+
+ _, err := discoverMainSource(".")
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "airflow-go-pack ./path/to/bundle",
+ "discovery failure should point the user at a package path")
+ assert.Contains(t, err.Error(), "--source")
+}
+
+func goBuild(t *testing.T, pkgDir, out, goos, goarch string) {
+ t.Helper()
+ cmd := exec.Command("go", "build", "-o", out, pkgDir)
+ cmd.Env = append(os.Environ(), "GOOS="+goos, "GOARCH="+goarch,
"CGO_ENABLED=0")
+ if combined, err := cmd.CombinedOutput(); err != nil {
+ t.Fatalf("go build %s for %s/%s failed: %v\n%s", pkgDir, goos,
goarch, err, combined)
+ }
+}
+
+// captureMetadata runs a host-runnable bundle binary with --airflow-metadata
+// and writes its stdout (the manifest, YAML by default) to outPath.
+func captureMetadata(t *testing.T, hostBin, outPath string) {
+ t.Helper()
+ cmd := exec.Command(hostBin, "--airflow-metadata")
+ out, err := cmd.Output()
+ require.NoError(t, err, "running %s --airflow-metadata", hostBin)
+ require.NoError(t, os.WriteFile(outPath, out, 0o644))
+}
diff --git a/go-sdk/cmd/airflow-go-pack/pack_test.go
b/go-sdk/cmd/airflow-go-pack/pack_test.go
new file mode 100644
index 00000000000..999d5c0f017
--- /dev/null
+++ b/go-sdk/cmd/airflow-go-pack/pack_test.go
@@ -0,0 +1,506 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "errors"
+ "io"
+ "os"
+ "path/filepath"
+ "runtime"
+ "testing"
+
+ "github.com/spf13/cobra"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/airflow/go-sdk/internal/airflowmetadata"
+ "github.com/apache/airflow/go-sdk/internal/bundlefooter"
+)
+
+func TestRenderManifest_DeterministicDagOrdering(t *testing.T) {
+ meta := airflowmetadata.Manifest{
+ AirflowBundleMetadataVersion: "1.0",
+ SDK: airflowmetadata.SDK{
+ Language: "go",
+ Version: "0.1.0",
+ SupervisorSchemaVersion: "2026-06-16",
+ },
+ Dags: map[string]airflowmetadata.Dag{
+ "zeta_dag": {Tasks: []string{"a", "b"}},
+ "alpha_dag": {Tasks: []string{"x"}},
+ },
+ }
+
+ got1, err := renderManifest(meta, "main.go")
+ require.NoError(t, err)
+ got2, err := renderManifest(meta, "main.go")
+ require.NoError(t, err)
+
+ assert.Equal(t, got1, got2, "manifest should be byte-identical for
identical input")
+
+ expected := `airflow_bundle_metadata_version: "1.0"
+sdk:
+ language: "go"
+ version: "0.1.0"
+ supervisor_schema_version: "2026-06-16"
+source: "main.go"
+dags:
+ alpha_dag:
+ tasks:
+ - "x"
+ zeta_dag:
+ tasks:
+ - "a"
+ - "b"
+`
+ assert.Equal(t, expected, string(got1))
+}
+
+// Values (task IDs, source, SDK fields) are quoted so a scalar-looking value
+// stays a string; Dag ID keys stay plain scalars.
+func TestRenderManifest_QuotesValuesNotKeys(t *testing.T) {
+ meta := airflowmetadata.Manifest{
+ AirflowBundleMetadataVersion: "1.0",
+ SDK: airflowmetadata.SDK{
+ Language: "go",
+ Version: "0.1.0",
+ SupervisorSchemaVersion: "2026-06-16",
+ },
+ Dags: map[string]airflowmetadata.Dag{
+ "my_dag": {Tasks: []string{"123", "true"}},
+ },
+ }
+
+ got, err := renderManifest(meta, "main.go")
+ require.NoError(t, err)
+
+ // Task values that look like scalars are quoted.
+ assert.Contains(t, string(got), `- "123"`)
+ assert.Contains(t, string(got), `- "true"`)
+ // The Dag ID key is a plain scalar, not quoted.
+ assert.Contains(t, string(got), "\n my_dag:\n")
+ assert.NotContains(t, string(got), `"my_dag"`)
+}
+
+func TestRenderManifest_EmptyDags(t *testing.T) {
+ meta := airflowmetadata.Manifest{
+ AirflowBundleMetadataVersion: "1.0",
+ SDK: airflowmetadata.SDK{
+ Language: "go",
+ Version: "0.1.0",
+ SupervisorSchemaVersion: "2026-06-16",
+ },
+ Dags: map[string]airflowmetadata.Dag{},
+ }
+ got, err := renderManifest(meta, "main.go")
+ require.NoError(t, err)
+ assert.Contains(t, string(got), "dags: {}")
+}
+
+// Forwarded `go build` flags after "--" must not count against
MaximumNArgs(1).
+func TestRootArgs_AllowsBuildFlagsAfterDoubleDash(t *testing.T) {
+ cases := [][]string{
+ {"--", "-ldflags", "-X main.dagId=foo"},
+ {"./pkg", "--", "-ldflags", "-X main.dagId=foo"},
+ {"--", "-trimpath", "-tags=prod"},
+ }
+ for _, argv := range cases {
+ cmd := newRootCmd()
+ // Stop the command from actually running; we only want arg
validation.
+ cmd.RunE = func(*cobra.Command, []string) error { return nil }
+ cmd.SetArgs(argv)
+ assert.NoError(t, cmd.Execute(), "args=%v should validate",
argv)
+ }
+}
+
+// A file the OS refuses to exec is errExecNotStartable; a binary that runs and
+// exits non-zero is not.
+func TestRunIntrospect_ClassifiesExecFailure(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("exec-format semantics differ on Windows")
+ }
+ dir := t.TempDir()
+
+ // A non-binary file with the exec bit set: execve rejects it with an
+ // exec-format error, standing in for a foreign-arch executable.
+ garbage := filepath.Join(dir, "garbage")
+ require.NoError(t, os.WriteFile(garbage, []byte("not a real
executable\n"), 0o755))
+ _, err := runIntrospect(garbage, "--airflow-metadata")
+ require.Error(t, err)
+ assert.ErrorIs(t, err, errExecNotStartable)
+
+ // A script that starts and exits non-zero is a genuine run failure, not
+ // an unrunnable binary, so it must NOT be classified as not-startable.
+ failing := filepath.Join(dir, "failing")
+ require.NoError(t, os.WriteFile(failing, []byte("#!/bin/sh\nexit 3\n"),
0o755))
+ _, err = runIntrospect(failing, "--airflow-metadata")
+ require.Error(t, err)
+ assert.False(
+ t,
+ errors.Is(err, errExecNotStartable),
+ "non-zero exit should not be errExecNotStartable",
+ )
+}
+
+func TestRootArgs_RejectsExtraPositionalBeforeDash(t *testing.T) {
+ cmd := newRootCmd()
+ cmd.RunE = func(*cobra.Command, []string) error { return nil }
+ cmd.SetArgs([]string{"./pkg1", "./pkg2", "--", "-ldflags", "-X
main.dagId=foo"})
+ err := cmd.Execute()
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "accepts at most 1 arg")
+}
+
+func TestSameFile(t *testing.T) {
+ dir := t.TempDir()
+ a := filepath.Join(dir, "a")
+ b := filepath.Join(dir, "b")
+ require.NoError(t, os.WriteFile(a, []byte("a"), 0o644))
+ require.NoError(t, os.WriteFile(b, []byte("b"), 0o644))
+
+ // Distinct existing files do not alias.
+ same, err := sameFile(a, b)
+ require.NoError(t, err)
+ assert.False(t, same)
+
+ // Two spellings of the same non-existent path alias via cleaned-absolute
+ // path equality ("./x" vs "x" relative to the same dir).
+ same, err = sameFile(filepath.Join(dir, "x"), filepath.Join(dir, ".",
"x"))
+ require.NoError(t, err)
+ assert.True(t, same, "cleaned-abs equality should treat ./x and x as
the same file")
+
+ // A non-existent output never aliases an existing input by inode.
+ same, err = sameFile(filepath.Join(dir, "does-not-exist"), a)
+ require.NoError(t, err)
+ assert.False(t, same)
+
+ // A symlink that points at the input shares its inode.
+ link := filepath.Join(dir, "link-to-a")
+ if err := os.Symlink(a, link); err != nil {
+ t.Skipf("symlinks unsupported: %v", err)
+ }
+ same, err = sameFile(link, a)
+ require.NoError(t, err)
+ assert.True(t, same, "a symlink to the file should alias it")
+}
+
+// When --output resolves to the same file as --executable, runPack must refuse
+// before copyFile truncates the input; the executable's bytes survive.
+func TestRunPack_RejectsOutputAliasingExecutable(t *testing.T) {
+ dir := t.TempDir()
+ exec := filepath.Join(dir, "bundle")
+ source := filepath.Join(dir, "main.go")
+ original := []byte("prebuilt-binary-bytes")
+ require.NoError(t, os.WriteFile(exec, original, 0o755))
+ require.NoError(t, os.WriteFile(source, []byte("package main\nfunc
main() {}\n"), 0o644))
+
+ err := runPack(io.Discard, io.Discard, &packOptions{
+ executable: exec,
+ source: source,
+ output: exec,
+ })
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "same file as the executable")
+
+ // The guard must fire before any write: the executable is intact.
+ got, readErr := os.ReadFile(exec)
+ require.NoError(t, readErr)
+ assert.Equal(t, original, got, "executable must not be truncated when
output aliases it")
+}
+
+// When --output resolves to the same file as --airflow-metadata, runPack must
+// refuse before the bundle is renamed onto it; the manifest file survives.
+func TestRunPack_RejectsOutputAliasingMetadataFile(t *testing.T) {
+ dir := t.TempDir()
+ exe := filepath.Join(dir, "foreign")
+ require.NoError(t, os.WriteFile(exe,
[]byte("foreign-arch-binary-bytes"), 0o755))
+ source := filepath.Join(dir, "main.go")
+ require.NoError(t, os.WriteFile(source, []byte("package main\nfunc
main() {}\n"), 0o644))
+ meta := filepath.Join(dir, "airflow-metadata.json")
+ original := []byte(
+ `{"airflow_bundle_metadata_version":"1.0",` +
+
`"sdk":{"language":"go","version":"0.1.0","supervisor_schema_version":"2026-06-16"},`
+
+ `"dags":{"my_dag":{"tasks":["t1"]}}}`,
+ )
+ require.NoError(t, os.WriteFile(meta, original, 0o644))
+
+ err := runPack(io.Discard, io.Discard, &packOptions{
+ executable: exe,
+ source: source,
+ airflowMetadata: meta,
+ output: meta,
+ })
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "same file as the --airflow-metadata
file")
+
+ // The guard must fire before any write: the manifest file is intact.
+ got, readErr := os.ReadFile(meta)
+ require.NoError(t, readErr)
+ assert.Equal(t, original, got, "metadata file must not be clobbered
when output aliases it")
+}
+
+// When the default output path names an existing directory, the packer must
+// reject it with --output guidance, not a bare os.Rename "file exists".
+func TestRunPack_RejectsDirectoryOutput(t *testing.T) {
+ dir := t.TempDir()
+ exe := filepath.Join(dir, "prebuilt")
+ require.NoError(t, os.WriteFile(exe, []byte("prebuilt-binary-bytes"),
0o755))
+ source := filepath.Join(dir, "main.go")
+ require.NoError(t, os.WriteFile(source, []byte("package main\nfunc
main() {}\n"), 0o644))
+
+ outDir := filepath.Join(dir, "bundle")
+ require.NoError(t, os.Mkdir(outDir, 0o755))
+
+ err := runPack(io.Discard, io.Discard, &packOptions{
+ executable: exe,
+ source: source,
+ output: outDir,
+ })
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "is an existing directory")
+ assert.Contains(t, err.Error(), "--output", "error must point the user
at --output")
+}
+
+// --executable and --goos/--goarch are mutually exclusive: --executable packs
+// the binary as-is and never builds, so it cannot cross-compile.
+func TestRunPack_RejectsExecutableWithCrossFlags(t *testing.T) {
+ for _, tc := range []struct {
+ name string
+ opts packOptions
+ }{
+ {name: "goos", opts: packOptions{executable: "bin", goos:
"linux"}},
+ {name: "goarch", opts: packOptions{executable: "bin", goarch:
"amd64"}},
+ } {
+ t.Run(tc.name, func(t *testing.T) {
+ err := runPack(io.Discard, io.Discard, &tc.opts)
+ require.Error(t, err)
+ assert.Contains(
+ t,
+ err.Error(),
+ "--executable is mutually exclusive with
--goos/--goarch",
+ )
+ })
+ }
+}
+
+// A foreign-arch --executable that cannot be exec'd on the host is a hard
error
+// with remediation guidance, never a silent host rebuild that could describe a
+// different DAG/task set than the shipped binary.
+func TestRunPack_FailsFastWhenExecutableUnrunnableAndNoMetadata(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("exec-format semantics differ on Windows")
+ }
+ dir := t.TempDir()
+ // A non-binary file with the exec bit set stands in for a foreign-arch
+ // executable: execve rejects it with an exec-format error.
+ exe := filepath.Join(dir, "foreign")
+ require.NoError(t, os.WriteFile(exe, []byte("not a runnable binary\n"),
0o755))
+ source := filepath.Join(dir, "main.go")
+ require.NoError(t, os.WriteFile(source, []byte("package main\nfunc
main() {}\n"), 0o644))
+ out := filepath.Join(dir, "bundle")
+
+ err := runPack(io.Discard, io.Discard, &packOptions{
+ executable: exe,
+ source: source,
+ output: out,
+ })
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "cannot exec --executable")
+ assert.Contains(t, err.Error(), "--goarch", "error must point at the
cross-build workflow")
+ assert.Contains(
+ t,
+ err.Error(),
+ "--airflow-metadata",
+ "error must mention the --airflow-metadata escape hatch",
+ )
+
+ _, statErr := os.Stat(out)
+ assert.True(t, os.IsNotExist(statErr), "no bundle should be written on
a fail-fast")
+}
+
+// --airflow-metadata short-circuits introspection: a binary that cannot run on
+// the host is packed using the supplied JSON manifest, and its bytes survive.
+func TestRunPack_UsesMetadataFile(t *testing.T) {
+ dir := t.TempDir()
+ exe := filepath.Join(dir, "foreign")
+ exeBytes := []byte("foreign-arch-binary-bytes")
+ require.NoError(t, os.WriteFile(exe, exeBytes, 0o755))
+ source := filepath.Join(dir, "main.go")
+ require.NoError(t, os.WriteFile(source, []byte("package main\nfunc
main() {}\n"), 0o644))
+ meta := filepath.Join(dir, "airflow-metadata.json")
+ require.NoError(t, os.WriteFile(meta, []byte(
+ `{"airflow_bundle_metadata_version":"1.0",`+
+
`"sdk":{"language":"go","version":"0.1.0","supervisor_schema_version":"2026-06-16"},`+
+ `"dags":{"my_dag":{"tasks":["t1"]}}}`,
+ ), 0o644))
+ out := filepath.Join(dir, "bundle")
+
+ err := runPack(io.Discard, io.Discard, &packOptions{
+ executable: exe,
+ source: source,
+ airflowMetadata: meta,
+ output: out,
+ })
+ require.NoError(t, err)
+
+ gotSource, gotMeta, err := bundlefooter.Read(out)
+ require.NoError(t, err)
+ srcBytes, err := os.ReadFile(source)
+ require.NoError(t, err)
+ assert.Equal(t, srcBytes, gotSource)
+ assert.Contains(
+ t,
+ string(gotMeta),
+ "my_dag:",
+ "Dag from --airflow-metadata must appear in the manifest",
+ )
+
+ bundleBytes, err := os.ReadFile(out)
+ require.NoError(t, err)
+ binaryRegion :=
bundleBytes[:len(bundleBytes)-len(gotSource)-len(gotMeta)-bundlefooter.TrailerSize]
+ assert.Equal(t, exeBytes, binaryRegion, "the supplied --executable must
be packed verbatim")
+}
+
+// --airflow-metadata also accepts a YAML manifest (the binary's default
+// output), not only the JSON it prints under --format json.
+func TestRunPack_AcceptsYAMLMetadataFile(t *testing.T) {
+ dir := t.TempDir()
+ exe := filepath.Join(dir, "foreign")
+ require.NoError(t, os.WriteFile(exe,
[]byte("foreign-arch-binary-bytes"), 0o755))
+ source := filepath.Join(dir, "main.go")
+ require.NoError(t, os.WriteFile(source, []byte("package main\nfunc
main() {}\n"), 0o644))
+ meta := filepath.Join(dir, "airflow-metadata.yaml")
+ require.NoError(t, os.WriteFile(meta, []byte(
+ "airflow_bundle_metadata_version: \"1.0\"\n"+
+ "sdk:\n"+
+ " language: \"go\"\n"+
+ " version: \"0.1.0\"\n"+
+ " supervisor_schema_version: \"2026-06-16\"\n"+
+ "dags:\n"+
+ " yaml_dag:\n"+
+ " tasks:\n"+
+ " - \"t1\"\n",
+ ), 0o644))
+ out := filepath.Join(dir, "bundle")
+
+ err := runPack(io.Discard, io.Discard, &packOptions{
+ executable: exe,
+ source: source,
+ airflowMetadata: meta,
+ output: out,
+ })
+ require.NoError(t, err)
+
+ _, gotMeta, err := bundlefooter.Read(out)
+ require.NoError(t, err)
+ assert.Contains(
+ t,
+ string(gotMeta),
+ "yaml_dag:",
+ "Dag from a YAML --airflow-metadata file must appear in the
manifest",
+ )
+}
+
+// With no explicit --output, packing "./bundle" from a package dir named
+// "bundle" derives a default output that collides with the pre-built binary.
+func TestRunPack_RejectsDefaultOutputAliasingExecutable(t *testing.T) {
+ parent := t.TempDir()
+ pkgDir := filepath.Join(parent, "bundle")
+ require.NoError(t, os.Mkdir(pkgDir, 0o755))
+ exec := filepath.Join(pkgDir, "bundle")
+ source := filepath.Join(pkgDir, "main.go")
+ original := []byte("prebuilt-binary-bytes")
+ require.NoError(t, os.WriteFile(exec, original, 0o755))
+ require.NoError(t, os.WriteFile(source, []byte("package main\nfunc
main() {}\n"), 0o644))
+
+ // defaultOutputPath derives the bundle name from the package dir base
+ // ("bundle") relative to cwd, so run from pkgDir to reproduce the
collision.
+ t.Chdir(pkgDir)
+
+ err := runPack(io.Discard, io.Discard, &packOptions{
+ executable: "./bundle",
+ source: "main.go",
+ })
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "same file as the executable")
+
+ got, readErr := os.ReadFile(exec)
+ require.NoError(t, readErr)
+ assert.Equal(
+ t,
+ original,
+ got,
+ "executable must not be truncated by the default output
collision",
+ )
+}
+
+// --goos/--goarch > env > host precedence. The flags let `go tool
+// airflow-go-pack` cross-compile without GOOS/GOARCH in the env (which would
+// cross-build the packer itself).
+func TestTargetPlatform(t *testing.T) {
+ // An empty value reads like an unset var to os.Getenv, so this clears
any
+ // ambient cross-compile setting for the default case.
+ t.Setenv("GOOS", "")
+ t.Setenv("GOARCH", "")
+
+ t.Run("defaults to host", func(t *testing.T) {
+ goos, goarch := targetPlatform(&packOptions{})
+ assert.Equal(t, runtime.GOOS, goos)
+ assert.Equal(t, runtime.GOARCH, goarch)
+ })
+
+ t.Run("env overrides host", func(t *testing.T) {
+ t.Setenv("GOOS", "linux")
+ t.Setenv("GOARCH", "arm64")
+ goos, goarch := targetPlatform(&packOptions{})
+ assert.Equal(t, "linux", goos)
+ assert.Equal(t, "arm64", goarch)
+ })
+
+ t.Run("flags override env", func(t *testing.T) {
+ t.Setenv("GOOS", "linux")
+ t.Setenv("GOARCH", "arm64")
+ goos, goarch := targetPlatform(&packOptions{goos: "windows",
goarch: "amd64"})
+ assert.Equal(t, "windows", goos)
+ assert.Equal(t, "amd64", goarch)
+ })
+
+ t.Run("resolves each axis independently", func(t *testing.T) {
+ t.Setenv("GOOS", "")
+ t.Setenv("GOARCH", "arm64")
+ goos, goarch := targetPlatform(&packOptions{goos: "windows"})
+ assert.Equal(t, "windows", goos, "goos from flag")
+ assert.Equal(t, "arm64", goarch, "goarch from env")
+ })
+}
+
+// When the --output parent directory does not exist, the packer must create it
+// instead of failing with an opaque temp-file error.
+func TestWriteBundle_CreatesMissingOutputDir(t *testing.T) {
+ dir := t.TempDir()
+ exe := filepath.Join(dir, "input-bin")
+ require.NoError(t, os.WriteFile(exe, []byte("binary-bytes"), 0o755))
+
+ output := filepath.Join(dir, "bin", "nested", "bundle")
+ require.NoError(t, writeBundle(exe, output, []byte("source"),
[]byte("manifest")))
+
+ info, err := os.Stat(output)
+ require.NoError(t, err, "bundle should be written into the created
directory")
+ assert.False(t, info.IsDir())
+}
diff --git a/go-sdk/example/bundle/Justfile b/go-sdk/example/bundle/Justfile
index ca211b304b1..5432aa34a9d 100644
--- a/go-sdk/example/bundle/Justfile
+++ b/go-sdk/example/bundle/Justfile
@@ -21,13 +21,32 @@
default:
@just --list
-# Build the example bundle
+# Build the example bundle (raw go build, no footer; for Edge Worker testing)
build:
@echo "Building example DAG bundle..."
go build -o ../../bin/example-dag-bundle .
-
-# Build with specific name and version
+# Build with specific name and version (raw go build, no footer)
build-with name="data_processing_example" version="1.0.0":
@echo "Building example DAG bundle with name={{name}}
version={{version}}..."
go build -ldflags="-X 'main.bundleName={{name}}' -X
'main.bundleVersion={{version}}'" -o ../../bin/{{name}}-{{version}} .
+
+# One-step build + pack. The single `go tool airflow-go-pack`
+# invocation runs `go build` internally, queries the binary for its
+# DAG/task identity via --airflow-metadata, and appends the source plus
+# airflow-metadata.yaml plus AFBNDL01 trailer. The output is a single
+# self-contained executable bundle, written here to ../../bin/example_dags
+# (by default: named after the package directory, in the current directory).
+# Drop it into [executable] bundles_folder to deploy.
+pack:
+ @echo "Packing example DAG bundle..."
+ go tool airflow-go-pack --output ../../bin/example_dags
+
+# Pack with extra go build flags forwarded after "--".
+pack-release:
+ @echo "Packing example DAG bundle (release flags)..."
+ go tool airflow-go-pack --output ../../bin/example_dags -- -trimpath
-ldflags="-s -w"
+
+# Inspect a packed bundle's embedded manifest.
+inspect bundle="../../bin/example_dags":
+ go tool airflow-go-pack inspect {{bundle}}
diff --git a/go-sdk/go.mod b/go-sdk/go.mod
index f3bfcd4b0f6..cc738e4c3f8 100644
--- a/go-sdk/go.mod
+++ b/go-sdk/go.mod
@@ -4,6 +4,8 @@ go 1.24.0
toolchain go1.24.6
+tool github.com/apache/airflow/go-sdk/cmd/airflow-go-pack
+
require (
github.com/cappuccinotm/slogx v1.4.2
github.com/golang-jwt/jwt/v5 v5.3.0
@@ -60,5 +62,5 @@ require (
github.com/samber/slog-http v1.8.2
golang.org/x/sys v0.39.0 // indirect
golang.org/x/text v0.32.0 // indirect
- gopkg.in/yaml.v3 v3.0.1 // indirect
+ gopkg.in/yaml.v3 v3.0.1
)
diff --git a/go-sdk/internal/airflowmetadata/airflowmetadata.go
b/go-sdk/internal/airflowmetadata/airflowmetadata.go
new file mode 100644
index 00000000000..d11412d722c
--- /dev/null
+++ b/go-sdk/internal/airflowmetadata/airflowmetadata.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package airflowmetadata defines the airflow-metadata manifest wire shape
+// shared between the producer (a bundle binary's --airflow-metadata flag,
+// emitted from pkg/execution) and the consumer (airflow-go-pack, which decodes
+// it and renders the embedded airflow-metadata.yaml). Keeping the definition
in
+// one place stops the two sides from drifting. The canonical schema is
+// airflow-metadata.schema.json in the Task SDK docs.
+package airflowmetadata
+
+// FormatVersion is the bundle-spec version emitted manifests conform to.
+const FormatVersion = "1.0"
+
+// Manifest is the shape printed by a bundle binary's --airflow-metadata flag
+// (YAML by default, JSON under --format json). It mirrors
+// airflow-metadata.schema.json minus the source field, which only the packer
+// can resolve from the build inputs. The yaml tags let the packer's
+// --airflow-metadata flag decode a captured manifest in either JSON or YAML
+// (the airflow-metadata.yaml in a bundle).
+type Manifest struct {
+ AirflowBundleMetadataVersion string
`json:"airflow_bundle_metadata_version" yaml:"airflow_bundle_metadata_version"`
+ SDK SDK `json:"sdk"
yaml:"sdk"`
+ Dags map[string]Dag `json:"dags"
yaml:"dags"`
+}
+
+// SDK identifies the SDK that produced the bundle.
+type SDK struct {
+ Language string `json:"language"
yaml:"language"`
+ Version string `json:"version"
yaml:"version"`
+ SupervisorSchemaVersion string `json:"supervisor_schema_version"
yaml:"supervisor_schema_version"`
+}
+
+// Dag is the static description of a single DAG declared in the bundle.
+type Dag struct {
+ Tasks []string `json:"tasks" yaml:"tasks"`
+}
diff --git a/go-sdk/internal/bundlefooter/footer.go
b/go-sdk/internal/bundlefooter/footer.go
new file mode 100644
index 00000000000..6641ab3af08
--- /dev/null
+++ b/go-sdk/internal/bundlefooter/footer.go
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package bundlefooter implements the AFBNDL01 trailer described in
+// ADR 0004 (and task-sdk/docs/executable-bundle-spec.rst). A bundle file is
+// the compiled executable with three appended regions: the source bytes, the
+// manifest bytes, and a fixed 64-byte trailer that locates them and carries a
+// SHA-256 over the binary region for integrity.
+//
+// The trailer layout (all little-endian) is:
+//
+// bytes 0..3 source_len uint32
+// bytes 4..7 metadata_len uint32
+// bytes 8..11 footer_ver uint32 (= 1)
+// bytes 12..43 binary_sha256 32 bytes (SHA-256 of the binary region)
+// bytes 44..55 reserved 12 bytes, zero
+// bytes 56..63 magic 8 bytes ASCII "AFBNDL01"
+package bundlefooter
+
+import (
+ "crypto/sha256"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
+ "math"
+ "os"
+)
+
+const (
+ // TrailerSize is the fixed length of the trailer, in bytes.
+ TrailerSize = 64
+
+ // FooterVersion is the currently defined trailer-format version.
+ FooterVersion = 1
+
+ // MaxRegionSize is the largest source or metadata region this footer
+ // format can address (uint32 length field).
+ MaxRegionSize = math.MaxUint32
+)
+
+// Magic is the 8-byte ASCII tag that identifies a file as a bundle.
+var Magic = [8]byte{'A', 'F', 'B', 'N', 'D', 'L', '0', '1'}
+
+// ErrNotBundle is returned by Read when the file does not end with the
+// AFBNDL01 magic.
+var ErrNotBundle = errors.New("bundlefooter: not a bundle (magic mismatch)")
+
+// ErrUnknownVersion is returned by Read when the trailer's footer_ver field
+// is something other than FooterVersion.
+var ErrUnknownVersion = errors.New("bundlefooter: unknown footer version")
+
+// ErrHashMismatch is returned by Read when the SHA-256 recomputed over the
+// binary region does not match the binary_sha256 recorded in the trailer.
+var ErrHashMismatch = errors.New("bundlefooter: binary_sha256 mismatch")
+
+// Trailer carries the parsed contents of a bundle's 64-byte trailer.
+type Trailer struct {
+ SourceLen uint32
+ MetadataLen uint32
+ FooterVersion uint32
+ BinarySHA256 [sha256.Size]byte
+}
+
+// Append writes the source bytes, metadata bytes, and trailer to the end of
+// the file at execPath. The file's existing contents (the executable) are
+// left intact and its mode bits are preserved. source MAY be nil/empty.
+func Append(execPath string, source, metadata []byte) error {
+ if int64(len(source)) > MaxRegionSize {
+ return fmt.Errorf(
+ "bundlefooter: source region too large (%d bytes, max %d)",
+ len(source),
+ MaxRegionSize,
+ )
+ }
+ if int64(len(metadata)) > MaxRegionSize {
+ return fmt.Errorf(
+ "bundlefooter: metadata region too large (%d bytes, max %d)",
+ len(metadata),
+ MaxRegionSize,
+ )
+ }
+
+ // Compute binary_sha256 over the file as it stands now: at this point the
+ // whole file is the binary region, so the digest matches what a reader
+ // recomputes over [0, source_start) after the append.
+ binaryHash, err := hashFile(execPath)
+ if err != nil {
+ return fmt.Errorf("bundlefooter: hashing binary region of %s: %w", execPath, err)
+ }
+
+ f, err := os.OpenFile(execPath, os.O_RDWR|os.O_APPEND, 0)
+ if err != nil {
+ return fmt.Errorf("bundlefooter: opening %s: %w", execPath, err)
+ }
+ defer f.Close()
+
+ if len(source) > 0 {
+ if _, err := f.Write(source); err != nil {
+ return fmt.Errorf("bundlefooter: writing source region: %w", err)
+ }
+ }
+ if len(metadata) > 0 {
+ if _, err := f.Write(metadata); err != nil {
+ return fmt.Errorf("bundlefooter: writing metadata region: %w", err)
+ }
+ }
+
+ trailer := encodeTrailer(uint32(len(source)), uint32(len(metadata)), binaryHash)
+ if _, err := f.Write(trailer[:]); err != nil {
+ return fmt.Errorf("bundlefooter: writing trailer: %w", err)
+ }
+ return nil
+}
+
+// Read parses the trailer of the file at path and returns the embedded
+// source and metadata regions. Returns ErrNotBundle if the magic does not
+// match (so callers may silently ignore non-bundle files).
+func Read(path string) (source, metadata []byte, err error) {
+ f, err := os.Open(path)
+ if err != nil {
+ return nil, nil, fmt.Errorf("bundlefooter: opening %s: %w", path, err)
+ }
+ defer f.Close()
+
+ stat, err := f.Stat()
+ if err != nil {
+ return nil, nil, fmt.Errorf("bundlefooter: stat %s: %w", path, err)
+ }
+ size := stat.Size()
+ if size < TrailerSize {
+ return nil, nil, ErrNotBundle
+ }
+
+ var trailer [TrailerSize]byte
+ if _, err := f.ReadAt(trailer[:], size-TrailerSize); err != nil {
+ return nil, nil, fmt.Errorf("bundlefooter: reading trailer: %w", err)
+ }
+
+ t, err := decodeTrailer(trailer)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ metadataStart := size - TrailerSize - int64(t.MetadataLen)
+ sourceStart := metadataStart - int64(t.SourceLen)
+ if sourceStart < 0 {
+ return nil, nil, fmt.Errorf(
+ "bundlefooter: trailer reports regions larger than file (source_len=%d metadata_len=%d size=%d)",
+ t.SourceLen,
+ t.MetadataLen,
+ size,
+ )
+ }
+ if sourceStart == 0 {
+ return nil, nil, fmt.Errorf("bundlefooter: empty binary region")
+ }
+
+ binaryHash, err := hashRegion(f, sourceStart)
+ if err != nil {
+ return nil, nil, fmt.Errorf("bundlefooter: hashing binary region of %s: %w", path, err)
+ }
+ if binaryHash != t.BinarySHA256 {
+ return nil, nil, fmt.Errorf("%w: %s", ErrHashMismatch, path)
+ }
+
+ if t.SourceLen > 0 {
+ source = make([]byte, t.SourceLen)
+ if _, err := f.ReadAt(source, sourceStart); err != nil && !errors.Is(err, io.EOF) {
+ return nil, nil, fmt.Errorf("bundlefooter: reading source region: %w", err)
+ }
+ }
+ if t.MetadataLen > 0 {
+ metadata = make([]byte, t.MetadataLen)
+ if _, err := f.ReadAt(metadata, metadataStart); err != nil && !errors.Is(err, io.EOF) {
+ return nil, nil, fmt.Errorf("bundlefooter: reading metadata region: %w", err)
+ }
+ }
+ return source, metadata, nil
+}
+
+// IsBundle reports whether the file at path ends with the AFBNDL01 magic.
+// It does not validate the trailer beyond the magic check, so a file with a
+// matching magic but a corrupt trailer body still returns true.
+func IsBundle(path string) (bool, error) {
+ f, err := os.Open(path)
+ if err != nil {
+ return false, err
+ }
+ defer f.Close()
+
+ stat, err := f.Stat()
+ if err != nil {
+ return false, err
+ }
+ if stat.Size() < TrailerSize {
+ return false, nil
+ }
+
+ var tail [8]byte
+ if _, err := f.ReadAt(tail[:], stat.Size()-int64(len(tail))); err != nil {
+ return false, err
+ }
+ return tail == Magic, nil
+}
+
+func encodeTrailer(sourceLen, metadataLen uint32, binaryHash [sha256.Size]byte) [TrailerSize]byte {
+ var t [TrailerSize]byte
+ binary.LittleEndian.PutUint32(t[0:4], sourceLen)
+ binary.LittleEndian.PutUint32(t[4:8], metadataLen)
+ binary.LittleEndian.PutUint32(t[8:12], FooterVersion)
+ copy(t[12:44], binaryHash[:])
+ // bytes 44..55 are reserved, zero
+ copy(t[56:64], Magic[:])
+ return t
+}
+
+func decodeTrailer(b [TrailerSize]byte) (Trailer, error) {
+ var magic [8]byte
+ copy(magic[:], b[56:64])
+ if magic != Magic {
+ return Trailer{}, ErrNotBundle
+ }
+ t := Trailer{
+ SourceLen: binary.LittleEndian.Uint32(b[0:4]),
+ MetadataLen: binary.LittleEndian.Uint32(b[4:8]),
+ FooterVersion: binary.LittleEndian.Uint32(b[8:12]),
+ }
+ copy(t.BinarySHA256[:], b[12:44])
+ if t.FooterVersion != FooterVersion {
+ return Trailer{}, fmt.Errorf("%w: %d", ErrUnknownVersion, t.FooterVersion)
+ }
+ return t, nil
+}
+
+// hashFile computes SHA-256 over the entire contents of the file at path.
+func hashFile(path string) ([sha256.Size]byte, error) {
+ var sum [sha256.Size]byte
+ f, err := os.Open(path)
+ if err != nil {
+ return sum, err
+ }
+ defer f.Close()
+ h := sha256.New()
+ if _, err := io.Copy(h, f); err != nil {
+ return sum, err
+ }
+ copy(sum[:], h.Sum(nil))
+ return sum, nil
+}
+
+// hashRegion computes SHA-256 over the first length bytes of f, seeking to the
+// start first. It is used to recompute binary_sha256 over the binary region
+// [0, source_start). It does not disturb subsequent ReadAt calls.
+func hashRegion(f *os.File, length int64) ([sha256.Size]byte, error) {
+ var sum [sha256.Size]byte
+ if _, err := f.Seek(0, io.SeekStart); err != nil {
+ return sum, err
+ }
+ h := sha256.New()
+ if _, err := io.CopyN(h, f, length); err != nil {
+ return sum, err
+ }
+ copy(sum[:], h.Sum(nil))
+ return sum, nil
+}
diff --git a/go-sdk/internal/bundlefooter/footer_test.go b/go-sdk/internal/bundlefooter/footer_test.go
new file mode 100644
index 00000000000..33136017455
--- /dev/null
+++ b/go-sdk/internal/bundlefooter/footer_test.go
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bundlefooter
+
+import (
+ "errors"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func writeTempBinary(t *testing.T, contents []byte) string {
+ t.Helper()
+ dir := t.TempDir()
+ path := filepath.Join(dir, "fake-binary")
+ require.NoError(t, os.WriteFile(path, contents, 0o755))
+ return path
+}
+
+func TestAppendAndRead_RoundTrip(t *testing.T) {
+ binary := []byte("\x7FELFnot-really-an-elf-but-good-enough")
+ source := []byte("package main\n\nfunc main() {}\n")
+ metadata := []byte("airflow_bundle_metadata_version: \"1.0\"\nsdk:\n language: go\n")
+
+ path := writeTempBinary(t, binary)
+ require.NoError(t, Append(path, source, metadata))
+
+ got := mustRead(t, path)
+ assert.Equal(t, len(binary)+len(source)+len(metadata)+TrailerSize, got.size)
+
+ gotSource, gotMetadata, err := Read(path)
+ require.NoError(t, err)
+ assert.Equal(t, source, gotSource)
+ assert.Equal(t, metadata, gotMetadata)
+
+ ok, err := IsBundle(path)
+ require.NoError(t, err)
+ assert.True(t, ok)
+}
+
+func TestAppend_ZeroLengthSource(t *testing.T) {
+ binary := []byte("\x7FELFstub")
+ metadata := []byte("manifest")
+
+ path := writeTempBinary(t, binary)
+ require.NoError(t, Append(path, nil, metadata))
+
+ source, gotMetadata, err := Read(path)
+ require.NoError(t, err)
+ assert.Empty(t, source)
+ assert.Equal(t, metadata, gotMetadata)
+}
+
+func TestAppend_DeterministicOutput(t *testing.T) {
+ binary := []byte("\x7FELFstub-binary-bytes")
+ source := []byte("source")
+ metadata := []byte("manifest")
+
+ pathA := writeTempBinary(t, binary)
+ pathB := writeTempBinary(t, binary)
+ require.NoError(t, Append(pathA, source, metadata))
+ require.NoError(t, Append(pathB, source, metadata))
+
+ a, err := os.ReadFile(pathA)
+ require.NoError(t, err)
+ b, err := os.ReadFile(pathB)
+ require.NoError(t, err)
+ assert.Equal(t, a, b)
+}
+
+func TestRead_NotBundle(t *testing.T) {
+ path := writeTempBinary(t, []byte("just a regular file with no footer"))
+
+ _, _, err := Read(path)
+ require.ErrorIs(t, err, ErrNotBundle)
+
+ ok, err := IsBundle(path)
+ require.NoError(t, err)
+ assert.False(t, ok)
+}
+
+func TestRead_TooShort(t *testing.T) {
+ path := writeTempBinary(t, []byte("hi"))
+
+ _, _, err := Read(path)
+ require.ErrorIs(t, err, ErrNotBundle)
+}
+
+func TestRead_UnknownVersion(t *testing.T) {
+ binary := []byte("\x7FELFstub")
+ source := []byte("src")
+ metadata := []byte("md")
+ path := writeTempBinary(t, binary)
+ require.NoError(t, Append(path, source, metadata))
+
+ // Mutate the version byte in the trailer.
+ f, err := os.OpenFile(path, os.O_RDWR, 0)
+ require.NoError(t, err)
+ stat, err := f.Stat()
+ require.NoError(t, err)
+ // footer_ver lives at bytes 8..11 of the trailer.
+ versionOffset := stat.Size() - TrailerSize + 8
+ _, err = f.WriteAt([]byte{99, 0, 0, 0}, versionOffset)
+ require.NoError(t, err)
+ require.NoError(t, f.Close())
+
+ _, _, err = Read(path)
+ require.Error(t, err)
+ assert.True(t, errors.Is(err, ErrUnknownVersion))
+}
+
+func TestRead_HashMismatch(t *testing.T) {
+ binary := []byte("\x7FELFstub-binary-bytes")
+ source := []byte("src")
+ metadata := []byte("md")
+ path := writeTempBinary(t, binary)
+ require.NoError(t, Append(path, source, metadata))
+
+ // Corrupt a byte inside the binary region; the trailer's binary_sha256
+ // no longer matches what Read recomputes over [0, source_start).
+ f, err := os.OpenFile(path, os.O_RDWR, 0)
+ require.NoError(t, err)
+ _, err = f.WriteAt([]byte{'X'}, 0)
+ require.NoError(t, err)
+ require.NoError(t, f.Close())
+
+ _, _, err = Read(path)
+ require.ErrorIs(t, err, ErrHashMismatch)
+}
+
+type bundleStat struct {
+ size int
+}
+
+func mustRead(t *testing.T, path string) bundleStat {
+ t.Helper()
+ stat, err := os.Stat(path)
+ require.NoError(t, err)
+ return bundleStat{size: int(stat.Size())}
+}
diff --git a/go-sdk/pkg/execution/messages.go b/go-sdk/pkg/execution/messages.go
index 9f9e14505cd..b41a2e9adf6 100644
--- a/go-sdk/pkg/execution/messages.go
+++ b/go-sdk/pkg/execution/messages.go
@@ -22,6 +22,13 @@ import (
"time"
)
+// SupervisorSchemaVersion is the dated AIP-72 supervisor wire-schema version
+// this SDK's coordinator protocol is compiled against, in YYYY-MM-DD form. It
+// is reported in a bundle's airflow-metadata manifest as
+// sdk.supervisor_schema_version so the supervisor can downgrade outbound
+// messages / upgrade inbound messages to a shape the bundle understands.
+const SupervisorSchemaVersion = "2026-06-16"
+
// Inbound messages (Supervisor -> Runtime).
// TaskInstanceInfo holds task instance details from StartupDetails.
diff --git a/go-sdk/pkg/execution/metadata.go b/go-sdk/pkg/execution/metadata.go
new file mode 100644
index 00000000000..8c95708b8c9
--- /dev/null
+++ b/go-sdk/pkg/execution/metadata.go
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+ "encoding/json"
+ "fmt"
+ "os"
+ "runtime/debug"
+
+ "gopkg.in/yaml.v3"
+
+ "github.com/apache/airflow/go-sdk/bundle/bundlev1"
+ "github.com/apache/airflow/go-sdk/internal/airflowmetadata"
+)
+
+// sdkModulePath is the import path of the SDK module. Used to identify the
+// SDK version from the bundle binary's build info dependencies.
+const sdkModulePath = "github.com/apache/airflow/go-sdk"
+
+// MetadataFormat selects the encoding DumpAirflowMetadata writes to stdout for
+// the bundle binary's --airflow-metadata flag.
+type MetadataFormat string
+
+const (
+ // MetadataFormatYAML is the default; it matches the airflow-metadata.yaml a
+ // bundle embeds, so `mybundle --airflow-metadata > airflow-metadata.yaml`
+ // yields a ready-to-use file.
+ MetadataFormatYAML MetadataFormat = "yaml"
+ // MetadataFormatJSON is opt-in via --format json.
+ MetadataFormatJSON MetadataFormat = "json"
+)
+
+// ParseMetadataFormat validates a --format value and returns the matching
+// MetadataFormat. An empty value defaults to YAML.
+func ParseMetadataFormat(s string) (MetadataFormat, error) {
+ switch MetadataFormat(s) {
+ case "", MetadataFormatYAML:
+ return MetadataFormatYAML, nil
+ case MetadataFormatJSON:
+ return MetadataFormatJSON, nil
+ default:
+ return "", fmt.Errorf(
+ "unsupported --airflow-metadata format %q: want %q or %q",
+ s, MetadataFormatYAML, MetadataFormatJSON,
+ )
+ }
+}
+
+// DumpAirflowMetadata writes the bundle's airflow-metadata manifest to stdout
+// (YAML by default, JSON when format is MetadataFormatJSON). It runs
+// RegisterDags against an in-memory recorder only — no gRPC server, no external
+// services. airflow-go-pack execs the binary with --airflow-metadata and
+// decodes this output to build the embedded manifest.
+func DumpAirflowMetadata(bundle bundlev1.BundleProvider, format MetadataFormat) error {
+ meta, err := collectManifest(bundle)
+ if err != nil {
+ return err
+ }
+ data, err := encodeManifest(meta, format)
+ if err != nil {
+ return err
+ }
+ _, err = os.Stdout.Write(data)
+ return err
+}
+
+// collectManifest builds the manifest by running RegisterDags against an
+// in-memory recorder and enumerating the recorded dags and tasks.
+func collectManifest(bundle bundlev1.BundleProvider) (airflowmetadata.Manifest, error) {
+ reg := bundlev1.New()
+ if err := bundle.RegisterDags(reg); err != nil {
+ return airflowmetadata.Manifest{}, fmt.Errorf("registering dags: %w", err)
+ }
+
+ enum, ok := reg.(bundlev1.EnumerableBundle)
+ if !ok {
+ return airflowmetadata.Manifest{}, fmt.Errorf(
+ "registry does not implement EnumerableBundle",
+ )
+ }
+
+ meta := airflowmetadata.Manifest{
+ AirflowBundleMetadataVersion: airflowmetadata.FormatVersion,
+ SDK: airflowmetadata.SDK{
+ Language: "go",
+ Version: sdkVersion(),
+ SupervisorSchemaVersion: SupervisorSchemaVersion,
+ },
+ Dags: make(map[string]airflowmetadata.Dag),
+ }
+ for _, dag := range enum.OrderedDags() {
+ taskIDs := make([]string, 0, len(dag.Tasks))
+ for _, t := range dag.Tasks {
+ taskIDs = append(taskIDs, t.ID)
+ }
+ meta.Dags[dag.DagID] = airflowmetadata.Dag{Tasks: taskIDs}
+ }
+ return meta, nil
+}
+
+// encodeManifest renders the manifest, ensuring exactly one trailing newline
+// (yaml.Marshal adds one; JSON does not).
+func encodeManifest(meta airflowmetadata.Manifest, format MetadataFormat) ([]byte, error) {
+ switch format {
+ case MetadataFormatYAML, "":
+ return yaml.Marshal(meta)
+ case MetadataFormatJSON:
+ data, err := json.MarshalIndent(meta, "", " ")
+ if err != nil {
+ return nil, err
+ }
+ return append(data, '\n'), nil
+ default:
+ return nil, fmt.Errorf("unsupported airflow-metadata format %q", format)
+ }
+}
+
+// sdkVersion returns the version of the SDK module linked into this binary,
+// derived from runtime/debug.ReadBuildInfo. Falls back to "(devel)" when
+// build info is unavailable (e.g. tests, bundle binaries built from a local
+// replace directive).
+func sdkVersion() string {
+ info, ok := debug.ReadBuildInfo()
+ if !ok {
+ return "(devel)"
+ }
+ if info.Main.Path == sdkModulePath && info.Main.Version != "" {
+ return info.Main.Version
+ }
+ for _, dep := range info.Deps {
+ if dep.Path == sdkModulePath {
+ if dep.Replace != nil && dep.Replace.Version != "" {
+ return dep.Replace.Version
+ }
+ if dep.Version != "" {
+ return dep.Version
+ }
+ }
+ }
+ return "(devel)"
+}
diff --git a/go-sdk/pkg/execution/metadata_test.go b/go-sdk/pkg/execution/metadata_test.go
new file mode 100644
index 00000000000..d105b24d898
--- /dev/null
+++ b/go-sdk/pkg/execution/metadata_test.go
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+ "encoding/json"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gopkg.in/yaml.v3"
+
+ "github.com/apache/airflow/go-sdk/internal/airflowmetadata"
+)
+
+func sampleManifest() airflowmetadata.Manifest {
+ return airflowmetadata.Manifest{
+ // "1.0" and a task named "123" must survive a YAML round-trip as strings.
+ AirflowBundleMetadataVersion: "1.0",
+ SDK: airflowmetadata.SDK{
+ Language: "go",
+ Version: "(devel)",
+ SupervisorSchemaVersion: "2026-06-16",
+ },
+ Dags: map[string]airflowmetadata.Dag{
+ "simple_dag": {Tasks: []string{"extract", "transform", "load"}},
+ "odd_dag": {Tasks: []string{"123"}},
+ },
+ }
+}
+
+func TestParseMetadataFormat(t *testing.T) {
+ tests := []struct {
+ in string
+ want MetadataFormat
+ wantErr bool
+ }{
+ {in: "", want: MetadataFormatYAML},
+ {in: "yaml", want: MetadataFormatYAML},
+ {in: "json", want: MetadataFormatJSON},
+ {in: "YAML", wantErr: true},
+ {in: "yml", wantErr: true},
+ {in: "xml", wantErr: true},
+ }
+ for _, tc := range tests {
+ t.Run(tc.in, func(t *testing.T) {
+ got, err := ParseMetadataFormat(tc.in)
+ if tc.wantErr {
+ require.Error(t, err)
+ return
+ }
+ require.NoError(t, err)
+ assert.Equal(t, tc.want, got)
+ })
+ }
+}
+
+func TestEncodeManifest_JSON(t *testing.T) {
+ meta := sampleManifest()
+ data, err := encodeManifest(meta, MetadataFormatJSON)
+ require.NoError(t, err)
+
+ assert.Equal(t, byte('{'), data[0], "JSON output must start with an object")
+ assert.Equal(t, byte('\n'), data[len(data)-1], "output must end with a newline")
+ assert.NotEqual(
+ t,
+ byte('\n'),
+ data[len(data)-2],
+ "output must have exactly one trailing newline",
+ )
+ assert.Contains(t, string(data), "\n \"sdk\"", "JSON must be indented four spaces")
+
+ var got airflowmetadata.Manifest
+ require.NoError(t, json.Unmarshal(data, &got))
+ assert.Equal(t, meta, got, "JSON must round-trip back to the manifest")
+}
+
+func TestEncodeManifest_YAML(t *testing.T) {
+ meta := sampleManifest()
+ // The empty format is the default and must encode as YAML.
+ for _, format := range []MetadataFormat{MetadataFormatYAML, ""} {
+ data, err := encodeManifest(meta, format)
+ require.NoError(t, err)
+
+ assert.NotEqual(t, byte('{'), data[0], "YAML output must not be JSON")
+ assert.Equal(t, byte('\n'), data[len(data)-1], "output must end with a newline")
+ assert.NotEqual(
+ t,
+ byte('\n'),
+ data[len(data)-2],
+ "output must have exactly one trailing newline",
+ )
+ assert.Contains(t, string(data), "airflow_bundle_metadata_version:")
+
+ var got airflowmetadata.Manifest
+ require.NoError(t, yaml.Unmarshal(data, &got))
+ assert.Equal(t, meta, got, "YAML must round-trip back to the manifest")
+ }
+}
+
+func TestEncodeManifest_UnsupportedFormat(t *testing.T) {
+ _, err := encodeManifest(sampleManifest(), MetadataFormat("xml"))
+ require.Error(t, err)
+}