This is an automated email from the ASF dual-hosted git repository.

jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 03d26f86c5b Go SDK: airflow-go-pack tool for self-contained bundle 
binaries (#67156)
03d26f86c5b is described below

commit 03d26f86c5b1f5a6435c3b43c59168cc5ade4d4e
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Mon Jun 8 12:06:55 2026 +0800

    Go SDK: airflow-go-pack tool for self-contained bundle binaries (#67156)
    
    * Go-SDK: Add EnumerableBundle/OrderedDags registry enumeration
    
    Record dag and task identity in registration order and expose it via a
    new EnumerableBundle interface (OrderedDags). This lets tooling read a
    bundle's dag/task ids without executing any task. AddDag/AddTask
    signatures are unchanged.
    
    * Add airflow-go-pack for building self-contained Airflow bundles
    
    Introduce the airflow-go-pack tool (delivered via the go.mod tool
    directive) that builds a Go bundle binary, queries it for its dag/task
    identity, and appends the source plus an airflow-metadata.yaml manifest
    plus an AFBNDL01 trailer into a single self-contained executable bundle
    (ADR 0002/0004).
    
    The bundle binary exposes a single --airflow-metadata flag that prints
    the manifest as JSON conforming to airflow-metadata.schema.json
    (airflow_bundle_metadata_version, sdk.{language,version,
    supervisor_schema_version}, dags). The manifest wire types live once in
    internal/airflowmetadata, shared by the producer (pkg/execution) and the
    packer so the two cannot drift. The footer reader/writer lives in
    internal/bundlefooter and backs the `inspect` subcommand.
    
    * Go-SDK: align airflow-go-pack bundle footer with spec and support 
cross-arch packing
    
    Match the 64-byte AFBNDL01 trailer (with binary_sha256 over the binary
    region) that the executable spec and the Python ExecutableCoordinator
    already require; the Go packer previously wrote an incompatible 32-byte
    trailer that the consumer rejected.
    
    Read the manifest from a host-native build so packing works when the
    deployable artefact is cross-compiled, and recover from an unrunnable
    foreign-arch --executable by building the --source package on the host
    solely to read its metadata. Adds a cross-architecture end-to-end test.
    
    * self-review: Fix nits
    
    * self-review: prevent airflow-go-pack from truncating the input executable
    
    When --output (or the default output derived from the package directory
    name) resolved to the same file as the pre-built --executable, the copy
    step truncated that file before reading it, destroying the binary and
    appending a footer to an empty region.
    
    Resolve the output path before introspection and reject when it aliases
    the executable or source (cleaned-abs comparison plus os.SameFile for
    inode-level links). Assemble the bundle through a temp file and atomically
    rename it into place so a failed pack never leaves a truncated or
    half-written artefact at the output path.
    
    * self-review: harden airflow-go-pack flag validation and output checks
    
    - Reject --executable together with --goos/--goarch: --executable packs
      the binary as-is and never builds, so it cannot cross-compile. Silently
      ignoring the flags contradicted the fail-fast guidance that points
      unrunnable-binary users at the cross-build path.
    - Resolve the source path and reject a directory/colliding --output before
      any go build, so a misconfigured output fails fast instead of after a
      (cross) compile.
    - Fix stale docs: the manifest is YAML-by-default (not "the JSON shape"),
      and the bundle output is a self-contained executable with an appended
      AFBNDL01 trailer, not a ZIP.
    
    * Go-SDK: correct airflow-go-pack default-output-name docs
    
    The default bundle output filename is derived from the main package
    directory name (matching `go build`), resolved before the build so a bad
    `--output` fails fast. It never came from `BundleInfo.Name`, and the
    manifest carries no name field. Fix the stale renderManifest comment and
    ADR 0002 claim that said otherwise.
    
    * Go-SDK: guard airflow-go-pack output against the --airflow-metadata file
    
    rejectOutputAlias already refused an --output that aliased the executable
    or source; extend it to the supplied --airflow-metadata file so
    `--output X --airflow-metadata X` fails fast instead of clobbering the
    manifest when the bundle is renamed into place.
    
    * Go-SDK: add airflow-go-pack inspect command test
    
    Cover the inspect command's formatting and the bundlefooter.Read happy
    path with a table test exercising output with and without --source.
---
 ...0002-use-go-tool-directive-for-bundle-packer.md | 110 ++--
 go-sdk/bundle/bundlev1/bundlev1server/server.go    |  63 +-
 go-sdk/bundle/bundlev1/registry.go                 |  51 +-
 go-sdk/bundle/bundlev1/registry_test.go            |  35 ++
 go-sdk/cmd/airflow-go-pack/inspect.go              |  57 ++
 go-sdk/cmd/airflow-go-pack/inspect_test.go         |  74 +++
 go-sdk/cmd/airflow-go-pack/main.go                 | 146 +++++
 go-sdk/cmd/airflow-go-pack/pack.go                 | 672 +++++++++++++++++++++
 .../cmd/airflow-go-pack/pack_integration_test.go   | 357 +++++++++++
 go-sdk/cmd/airflow-go-pack/pack_test.go            | 506 ++++++++++++++++
 go-sdk/example/bundle/Justfile                     |  25 +-
 go-sdk/go.mod                                      |   4 +-
 go-sdk/internal/airflowmetadata/airflowmetadata.go |  51 ++
 go-sdk/internal/bundlefooter/footer.go             | 280 +++++++++
 go-sdk/internal/bundlefooter/footer_test.go        | 158 +++++
 go-sdk/pkg/execution/messages.go                   |   7 +
 go-sdk/pkg/execution/metadata.go                   | 157 +++++
 go-sdk/pkg/execution/metadata_test.go              | 119 ++++
 18 files changed, 2803 insertions(+), 69 deletions(-)

diff --git a/go-sdk/adr/0002-use-go-tool-directive-for-bundle-packer.md 
b/go-sdk/adr/0002-use-go-tool-directive-for-bundle-packer.md
index 08bd89a8b5f..3f5efde805c 100644
--- a/go-sdk/adr/0002-use-go-tool-directive-for-bundle-packer.md
+++ b/go-sdk/adr/0002-use-go-tool-directive-for-bundle-packer.md
@@ -113,35 +113,48 @@ convention so authors can forward arbitrary flags to the 
underlying
       to produce the *target artifact*.
    3. Executes a *host-runnable introspection binary* with
       `--airflow-metadata` to obtain 
`sdk.{language,version,supervisor_schema_version}`
-      and the `dags` mapping. The packer first tries to exec the target
-      artifact directly; if that fails with "exec format error" (the
-      target is built for a different OS/arch than the host), the
-      packer builds a host-arch sidecar from the same package
-      (`go build` with `GOOS`/`GOARCH` unset, written to
-      `<tmpdir>/<binname>.introspect`) and execs that instead. Both
-      binaries are produced from the same source package against the
-      same module graph, so `RegisterDags` records identical dag/task
-      identity regardless of which one is execed.
-   4. Writes a deterministic ZIP next to the working directory at
-      `<bundleName>.zip`, where `<bundleName>` comes from the
-      binary's `BundleInfo.Name` (part of the same `--airflow-metadata`
-      introspection output).
+      and the `dags` mapping. When cross-compiling, the packer builds a
+      host-arch sidecar from the same package and forwarded build flags
+      and execs that instead; both come from the same sources and flags,
+      so `RegisterDags` records identical identity either way. The sidecar
+      is skipped when `--airflow-metadata` supplies the manifest directly
+      (see the overrides below).
+   4. Writes a self-contained bundle: the deployable executable with
+      the source bytes, the manifest bytes, and the fixed 64-byte
+      `AFBNDL01` trailer appended (the format from ADR 0004 /
+      `task-sdk/docs/executable-bundle-spec.rst`). By default it is
+      written next to the working directory under the bundle's main
+      package directory name; `--output` overrides the path.
 
    Optional overrides, for advanced or pre-built workflows:
 
-   - `--source <path>`: override the auto-detected source file.
-   - `--executable <path>`: skip the internal `go build` and pack a
-     pre-built binary. Mutually exclusive with `--` build-flag
-     passthrough. If the supplied binary is not host-runnable (e.g.
-     the user cross-built a `linux/amd64` binary from a `darwin/arm64`
-     host), the packer still needs to introspect it: it builds a
-     host-arch sidecar from the positional package and execs that for
-     `--airflow-metadata`, then appends the resulting footer to the
-     user-supplied binary. If no positional package was passed and
-     the supplied binary is not host-runnable, the packer errors with
-     a message asking for the source package so the sidecar can be
-     built.
-   - `--output <path>`: override the default output ZIP path.
+   - `--source <path>`: override the auto-detected source file, and the
+     escape hatch when discovery can't pick it. It skips discovery
+     entirely (a plain `go list` that ignores build tags and
+     `GOOS`/`GOARCH`, so it can fail or pick the wrong file).
+   - `--executable <path>`: skip `go build` and pack a pre-built binary.
+     Mutually exclusive with `--` build flags and with `--goos`/`--goarch`
+     (it never builds). The binary must run on the host so the packer can
+     exec it for `--airflow-metadata`. A non-host-runnable binary is a hard
+     error: the packer won't rebuild a host sidecar, because the original
+     build inputs (tags, `-ldflags`, `GOOS`/`GOARCH` files) are unknown and
+     a rebuild could advertise a different dag/task set than shipped. To
+     pack for another platform, use the build path (`--goos`/`--goarch`,
+     below) or supply `--airflow-metadata`.
+   - `--airflow-metadata <path>`: supply a captured manifest instead of
+     introspecting the binary. Accepts the binary's YAML default, its
+     `--format json` output, or a bundle's embedded `airflow-metadata.yaml`
+     (one YAML decoder reads all three). Short-circuits introspection in
+     every mode — the deterministic way to pack a pre-built cross binary
+     with `--executable`.
+   - `--goos <os>` / `--goarch <arch>`: cross-compile the deployable bundle.
+     Prefer these over the `GOOS`/`GOARCH` env vars: under `go tool` those
+     env vars cross-build the packer itself, which then can't exec on the
+     host. The flags target only the internal `go build`, leaving the
+     packer build host-native; they fall back to the env vars, then the
+     host. Mutually exclusive with `--executable`.
+   - `--output <path>`: override the default output path. Its parent
+     directory is created if missing.
 
    Examples:
 
@@ -152,14 +165,20 @@ convention so authors can forward arbitrary flags to the 
underlying
    # Pack a different package, with extra go build flags.
    go tool airflow-go-pack ./cmd/my-bundle -- -trimpath -tags=prod
 
-   # Pack an already-built binary (skips go build).
+   # Pack an already-built binary for THIS host (skips go build).
    go tool airflow-go-pack --executable ./build/example --source main.go
+
+   # Pack a bundle for a different platform: cross-build via the build
+   # path (no --executable), forwarding go build flags after "--".
+   # Use --goos/--goarch, NOT the GOOS/GOARCH env vars, under `go tool`.
+   go tool airflow-go-pack --goos linux --goarch amd64 ./cmd/my-bundle -- 
-trimpath
    ```
 
 2. **Extend the existing `--airflow-metadata` flag in
    `bundlev1server.Serve` to print the full spec.** Rather than adding a
    second introspection flag, `--airflow-metadata` is the single flag the
-   packer relies on; it prints a JSON document of the form:
+   packer relies on; it prints a manifest document (YAML by default, JSON
+   under `--format json`) of the form (shown here as JSON for structure):
 
    ```json
    {
@@ -183,9 +202,18 @@ convention so authors can forward arbitrary flags to the 
underlying
    recording registry, then enumerating the recorded dags and their
    tasks. `--airflow-metadata` today prints only `BundleInfo`
    (`server.go`); it is extended to emit this full document, so the
-   shipped `decideMode` switch needs only one metadata mode. The bundle's
-   `BundleInfo.Name` (used by the packer for the default output
-   filename) is carried in the same output.
+   shipped `decideMode` switch needs only one metadata mode. The packer
+   derives the default output filename from the bundle's main package
+   directory name (what `go build` itself names the binary), resolved
+   before the build so a bad `--output` fails fast; it does not come from
+   `BundleInfo.Name`, and no name field is carried in this output.
+
+   A `--format yaml|json` flag selects the encoding and is only valid with
+   `--airflow-metadata` (misuse is a hard error). The default is YAML,
+   matching a bundle's embedded `airflow-metadata.yaml`, so
+   `mybundle --airflow-metadata > airflow-metadata.yaml` is ready to use.
+   The packer never sets `--format`; its YAML decoder reads both the YAML
+   default and `--format json` output (JSON is a subset of YAML).
 
 3. **Bundle authors register the packer in their own `go.mod`:**
 
@@ -251,12 +279,14 @@ convention so authors can forward arbitrary flags to the 
underlying
 - `go build` flag passthrough uses the standard `--` separator
   convention so the packer's own flag set stays small and stable.
 - Host-runnable detection is by attempted exec, not by parsing the
-  binary's exec format. The packer runs the candidate introspection
-  binary with `--airflow-metadata` and treats the OS's "exec format
-  error" (and the Windows equivalent surfaced by `os/exec`) as the
-  signal to fall back to building a host-arch sidecar. Other exec
-  failures (non-zero exit, malformed JSON, missing flag) are real
-  errors and are surfaced to the user as-is. The Go build cache
-  amortises the sidecar to a link step when host arch is already
-  involved, so there is no measurable overhead when no cross-compile
-  is in play.
+  binary's format: the packer runs the candidate with `--airflow-metadata`
+  and treats an "exec format error" (and its Windows equivalent) as
+  not-startable. Other failures (non-zero exit, malformed output, missing
+  flag) are surfaced as-is.
+- A not-startable binary is handled differently per mode. The **build**
+  path owns the build, so it builds a host-arch sidecar from the same
+  package and `--` flags (the build cache amortises this to a link step,
+  so no measurable overhead without a cross-compile). In **`--executable`**
+  mode the build inputs are unknown, so the packer must not synthesise a
+  sidecar that could diverge from the shipped binary: it fails fast and
+  points the user at the cross-build path or `--airflow-metadata`.
diff --git a/go-sdk/bundle/bundlev1/bundlev1server/server.go 
b/go-sdk/bundle/bundlev1/bundlev1server/server.go
index 02036742d09..fde5314d332 100644
--- a/go-sdk/bundle/bundlev1/bundlev1server/server.go
+++ b/go-sdk/bundle/bundlev1/bundlev1server/server.go
@@ -18,9 +18,7 @@
 package bundlev1server
 
 import (
-       "encoding/json"
        "errors"
-       "fmt"
        "log/slog"
        "os"
 
@@ -44,13 +42,28 @@ var ErrCoordinatorFlagsIncomplete = errors.New(
        "--comm and --logs must be supplied together",
 )
 
-// CLI Flags.
-// The --bundle-metadata flag is used for showing the embedded bundle info in 
airflow-metadata.yaml spec format.
-// The --comm and --logs select the coordinator-mode protocol
-// All three are read by Serve to choose a server mode below.
+// ErrFormatRequiresMetadata is returned by [Serve] when --format is supplied
+// without --airflow-metadata, the only mode whose encoding it selects.
+var ErrFormatRequiresMetadata = errors.New(
+       "--format is only valid together with --airflow-metadata",
+)
+
+// CLI Flags, all read by Serve to choose a server mode below.
+// --airflow-metadata prints the bundle's manifest and exits (airflow-go-pack
+// consumes it to build the embedded airflow-metadata.yaml); --format selects
+// its encoding. --comm and --logs select coordinator mode.
 var (
-       versionInfo = flag.Bool("bundle-metadata", false, "show the embedded 
bundle info")
-       commAddr    = flag.String(
+       printMetadata = flag.Bool(
+               "airflow-metadata",
+               false,
+               "print the bundle's airflow-metadata manifest and exit",
+       )
+       metadataFormat = flag.String(
+               "format",
+               string(execution.MetadataFormatYAML),
+               "encoding for --airflow-metadata: yaml (default) or json; only 
valid with --airflow-metadata",
+       )
+       commAddr = flag.String(
                "comm",
                "",
                "host:port of the supervisor's coordinator comm channel 
(selects coordinator mode)",
@@ -83,7 +96,7 @@ type serveMode int
 
 const (
        modePlugin                serveMode = iota // go-plugin gRPC (existing 
Edge Worker path)
-       modeMetadataDump                           // --bundle-metadata: print 
BundleInfo JSON
+       modeAirflowMetadata                        // --airflow-metadata: print 
the manifest JSON (ADR 0002/0004)
        modeCoordinator                            // --comm/--logs: 
msgpack-over-IPC (ADR 0003)
        modeCoordinatorUsageError                  // misuse: print usage and 
exit non-zero
 )
@@ -114,9 +127,21 @@ func Serve(bundle bundlev1.BundleProvider, opts 
...ServeOpt) error {
                c.ApplyServeOpt(serveConfig)
        }
 
-       switch decideMode() {
-       case modeMetadataDump:
-               return dumpBundleMetadata(bundle)
+       mode := decideMode()
+
+       // --format applies only to --airflow-metadata; reject it elsewhere 
instead
+       // of silently ignoring it.
+       if mode != modeAirflowMetadata && flag.CommandLine.Changed("format") {
+               return ErrFormatRequiresMetadata
+       }
+
+       switch mode {
+       case modeAirflowMetadata:
+               format, err := execution.ParseMetadataFormat(*metadataFormat)
+               if err != nil {
+                       return err
+               }
+               return execution.DumpAirflowMetadata(bundle, format)
        case modeCoordinator:
                // In coordinator mode the supervisor reads the logs channel for
                // structured records, so configuring the hclog/stderr default
@@ -133,8 +158,8 @@ func Serve(bundle bundlev1.BundleProvider, opts 
...ServeOpt) error {
 }
 
 func decideMode() serveMode {
-       if *versionInfo {
-               return modeMetadataDump
+       if *printMetadata {
+               return modeAirflowMetadata
        }
        commSet := *commAddr != ""
        logsSet := *logsAddr != ""
@@ -148,16 +173,6 @@ func decideMode() serveMode {
        return modePlugin
 }
 
-func dumpBundleMetadata(bundle bundlev1.BundleProvider) error {
-       meta := bundle.GetBundleVersion()
-       data, err := json.MarshalIndent(meta, "", "    ")
-       if err != nil {
-               return err
-       }
-       fmt.Println(string(data))
-       return nil
-}
-
 func installPluginLogger() {
        hcLogger := hclog.New(&hclog.LoggerOptions{
                Level:                    hclog.Trace,
diff --git a/go-sdk/bundle/bundlev1/registry.go 
b/go-sdk/bundle/bundlev1/registry.go
index 8d902efa081..59cad631125 100644
--- a/go-sdk/bundle/bundlev1/registry.go
+++ b/go-sdk/bundle/bundlev1/registry.go
@@ -43,9 +43,30 @@ type (
                AddDag(dagId string) Dag
        }
 
+       // TaskInfo describes a registered task by its user-visible id.
+       TaskInfo struct {
+               ID string
+       }
+
+       // DagInfo describes a registered dag together with its tasks in
+       // registration order.
+       DagInfo struct {
+               DagID string
+               Tasks []TaskInfo
+       }
+
+       // EnumerableBundle exposes the dag/task identity recorded by 
RegisterDags.
+       // The default registry implements it; airflow-go-pack relies on it to 
read
+       // a bundle's dag/task ids without executing any task.
+       EnumerableBundle interface {
+               OrderedDags() []DagInfo
+       }
+
        registry struct {
                sync.RWMutex
                taskFuncMap map[string]map[string]Task
+               dagOrder    []string
+               taskOrder   map[string][]string
        }
 )
 
@@ -64,7 +85,10 @@ func (d dagShim) AddTaskWithName(taskId string, fn any) {
 
 // Function New creates a new bundle on which Dag and Tasks can be registered
 func New() Registry {
-       return &registry{taskFuncMap: make(map[string]map[string]Task)}
+       return &registry{
+               taskFuncMap: make(map[string]map[string]Task),
+               taskOrder:   make(map[string][]string),
+       }
 }
 
 func getFnName(fn reflect.Value) string {
@@ -76,10 +100,14 @@ func getFnName(fn reflect.Value) string {
 }
 
 func (r *registry) AddDag(dagId string) Dag {
+       r.Lock()
+       defer r.Unlock()
+
        if _, exists := r.taskFuncMap[dagId]; exists {
                panic(fmt.Errorf("Dag %q already exists in bundle", dagId))
        }
        r.taskFuncMap[dagId] = make(map[string]Task)
+       r.dagOrder = append(r.dagOrder, dagId)
        return dagShim{dagId, r}
 }
 
@@ -109,6 +137,7 @@ func (r *registry) registerTaskWithName(dagId, taskId 
string, fn any) {
        if !exists {
                dagTasks = make(map[string]Task)
                r.taskFuncMap[dagId] = dagTasks
+               r.dagOrder = append(r.dagOrder, dagId)
        }
 
        _, exists = dagTasks[taskId]
@@ -116,6 +145,7 @@ func (r *registry) registerTaskWithName(dagId, taskId 
string, fn any) {
                panic(fmt.Errorf("taskId %q is already registered for DAG %q", 
taskId, dagId))
        }
        dagTasks[taskId] = task
+       r.taskOrder[dagId] = append(r.taskOrder[dagId], taskId)
 }
 
 func (r *registry) LookupTask(dagId, taskId string) (task Task, exists bool) {
@@ -129,3 +159,22 @@ func (r *registry) LookupTask(dagId, taskId string) (task 
Task, exists bool) {
        task, exists = dagTasks[taskId]
        return task, exists
 }
+
+// OrderedDags returns the registered dags in AddDag order, each with its tasks
+// in registration order. The returned slice is freshly allocated; callers may
+// mutate it freely.
+func (r *registry) OrderedDags() []DagInfo {
+       r.RLock()
+       defer r.RUnlock()
+
+       out := make([]DagInfo, 0, len(r.dagOrder))
+       for _, dagID := range r.dagOrder {
+               taskIDs := r.taskOrder[dagID]
+               tasks := make([]TaskInfo, 0, len(taskIDs))
+               for _, tid := range taskIDs {
+                       tasks = append(tasks, TaskInfo{ID: tid})
+               }
+               out = append(out, DagInfo{DagID: dagID, Tasks: tasks})
+       }
+       return out
+}
diff --git a/go-sdk/bundle/bundlev1/registry_test.go 
b/go-sdk/bundle/bundlev1/registry_test.go
index 25105cd1455..f405c8f5315 100644
--- a/go-sdk/bundle/bundlev1/registry_test.go
+++ b/go-sdk/bundle/bundlev1/registry_test.go
@@ -118,3 +118,38 @@ func (s *RegistrySuite) TestAddTask_ErrorReturnType() {
        _, exists := s.reg.LookupTask("dag1", "errorTask")
        s.True(exists)
 }
+
+func (s *RegistrySuite) TestOrderedDags_Empty() {
+       enum, ok := New().(EnumerableBundle)
+       s.Require().True(ok)
+       s.Empty(enum.OrderedDags())
+}
+
+func (s *RegistrySuite) TestOrderedDags_PreservesRegistrationOrder() {
+       reg := New()
+       // Register dags out of alphabetical order so the test fails if 
OrderedDags
+       // ever sorts instead of preserving AddDag order.
+       zeta := reg.AddDag("zeta")
+       zeta.AddTaskWithName("z1", myTask)
+       zeta.AddTaskWithName("z2", myTask)
+
+       alpha := reg.AddDag("alpha")
+       alpha.AddTaskWithName("a1", myTask)
+
+       reg.AddDag("mid") // dag with no tasks
+
+       enum, ok := reg.(EnumerableBundle)
+       s.Require().True(ok)
+
+       got := enum.OrderedDags()
+       s.Require().Len(got, 3)
+
+       s.Equal("zeta", got[0].DagID)
+       s.Equal([]TaskInfo{{ID: "z1"}, {ID: "z2"}}, got[0].Tasks)
+
+       s.Equal("alpha", got[1].DagID)
+       s.Equal([]TaskInfo{{ID: "a1"}}, got[1].Tasks)
+
+       s.Equal("mid", got[2].DagID)
+       s.Empty(got[2].Tasks)
+}
diff --git a/go-sdk/cmd/airflow-go-pack/inspect.go 
b/go-sdk/cmd/airflow-go-pack/inspect.go
new file mode 100644
index 00000000000..f9cca6d488b
--- /dev/null
+++ b/go-sdk/cmd/airflow-go-pack/inspect.go
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+       "fmt"
+
+       "github.com/spf13/cobra"
+
+       "github.com/apache/airflow/go-sdk/internal/bundlefooter"
+)
+
+func newInspectCmd() *cobra.Command {
+       var showSource bool
+       cmd := &cobra.Command{
+               Use:   "inspect <bundle>",
+               Short: "Print the manifest (and optionally source) embedded in 
a bundle",
+               Args:  cobra.ExactArgs(1),
+               RunE: func(cmd *cobra.Command, args []string) error {
+                       source, manifest, err := bundlefooter.Read(args[0])
+                       if err != nil {
+                               return err
+                       }
+                       out := cmd.OutOrStdout()
+                       if showSource {
+                               fmt.Fprintln(out, "# --- source ---")
+                               out.Write(source)
+                               if len(source) > 0 && source[len(source)-1] != 
'\n' {
+                                       fmt.Fprintln(out)
+                               }
+                               fmt.Fprintln(out, "# --- manifest ---")
+                       }
+                       out.Write(manifest)
+                       if len(manifest) > 0 && manifest[len(manifest)-1] != 
'\n' {
+                               fmt.Fprintln(out)
+                       }
+                       return nil
+               },
+       }
+       cmd.Flags().BoolVar(&showSource, "source", false, "also print the 
embedded source file")
+       return cmd
+}
diff --git a/go-sdk/cmd/airflow-go-pack/inspect_test.go 
b/go-sdk/cmd/airflow-go-pack/inspect_test.go
new file mode 100644
index 00000000000..8208136e801
--- /dev/null
+++ b/go-sdk/cmd/airflow-go-pack/inspect_test.go
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+       "bytes"
+       "os"
+       "path/filepath"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// inspect reads a bundle through bundlefooter.Read and prints the embedded
+// manifest, prefixing the source too under --source.
+func TestInspectCmd(t *testing.T) {
+       dir := t.TempDir()
+       exe := filepath.Join(dir, "input-bin")
+       require.NoError(t, os.WriteFile(exe, []byte("binary-bytes"), 0o755))
+       source := []byte("package main\n\nfunc main() {}\n")
+       manifest := []byte(
+               "airflow_bundle_metadata_version: \"1.0\"\n" +
+                       "dags:\n" +
+                       "  my_dag:\n" +
+                       "    tasks:\n" +
+                       "      - \"t1\"\n",
+       )
+       bundle := filepath.Join(dir, "bundle")
+       require.NoError(t, writeBundle(exe, bundle, source, manifest))
+
+       for _, tc := range []struct {
+               name   string
+               args   []string
+               expect string
+       }{
+               {
+                       name:   "manifest only",
+                       args:   []string{bundle},
+                       expect: string(manifest),
+               },
+               {
+                       name: "with source",
+                       args: []string{"--source", bundle},
+                       expect: "# --- source ---\n" + string(source) +
+                               "# --- manifest ---\n" + string(manifest),
+               },
+       } {
+               t.Run(tc.name, func(t *testing.T) {
+                       cmd := newInspectCmd()
+                       var out bytes.Buffer
+                       cmd.SetOut(&out)
+                       cmd.SetErr(&out)
+                       cmd.SetArgs(tc.args)
+                       require.NoError(t, cmd.Execute())
+                       assert.Equal(t, tc.expect, out.String())
+               })
+       }
+}
diff --git a/go-sdk/cmd/airflow-go-pack/main.go 
b/go-sdk/cmd/airflow-go-pack/main.go
new file mode 100644
index 00000000000..cc04b315ac4
--- /dev/null
+++ b/go-sdk/cmd/airflow-go-pack/main.go
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Command airflow-go-pack builds a self-contained Airflow bundle from a Go
+// package. It runs `go build`, exec's the freshly built binary with
+// `--airflow-metadata` to obtain the manifest, and appends the source plus
+// manifest plus AFBNDL01 trailer to the executable as specified by ADR 0004.
+//
+// Usage:
+//
+//     go tool airflow-go-pack [./path/to/pkg] [-- <go build flags>...]
+//     go tool airflow-go-pack --executable ./build/example --source main.go
+//     go tool airflow-go-pack inspect ./mybundle
+//
+// See go-sdk/adr/0002-use-go-tool-directive-for-bundle-packer.md and
+// go-sdk/adr/0004-self-contained-executable-bundle.md.
+package main
+
+import (
+       "fmt"
+       "os"
+
+       "github.com/spf13/cobra"
+)
+
+func main() {
+       if err := newRootCmd().Execute(); err != nil {
+               fmt.Fprintln(os.Stderr, "error:", err)
+               os.Exit(1)
+       }
+}
+
+func newRootCmd() *cobra.Command {
+       opts := &packOptions{}
+
+       root := &cobra.Command{
+               Use:   "airflow-go-pack [package]",
+               Short: "Build a self-contained Airflow bundle from a Go 
package",
+               Long: `airflow-go-pack builds a Go bundle binary, queries it 
for its DAG/task
+identity via --airflow-metadata, and appends the source plus an
+airflow-metadata.yaml manifest plus an AFBNDL01 trailer to the
+executable. The result is a single self-contained file that drops into
+[executable] bundles_folder.
+
+By default the packer builds the package in the current directory. Pass
+a different package as the positional argument; pass extra go build
+flags after a "--" separator.
+
+--executable expects a binary that runs on this host (same OS/arch). To
+build a bundle for a different platform you have two options: run the
+packer with --goos/--goarch so it cross-builds the deployable artefact
+while building a host-arch binary (forwarding your -- build flags) solely
+to read the manifest; or pack a pre-built cross binary with --executable
+and supply its manifest via --airflow-metadata, captured by running the
+binary on its native platform (mybundle --airflow-metadata > meta.yaml).
+
+Use --goos/--goarch rather than the GOOS/GOARCH env vars: under
+"go tool airflow-go-pack" those env vars cross-build the packer itself,
+which then cannot exec on the host.
+
+Examples:
+  go tool airflow-go-pack
+  go tool airflow-go-pack ./cmd/my-bundle -- -trimpath -tags=prod
+  go tool airflow-go-pack --executable ./build/example --source main.go
+
+  # Cross-platform via the build path: cross-build + host introspection.
+  go tool airflow-go-pack --goos linux --goarch amd64 ./cmd/my-bundle -- 
-trimpath
+
+  # Cross-platform via a pre-built binary: pack it with its captured manifest.
+  GOOS=linux GOARCH=arm64 go build -o ./build/example-arm64 ./cmd/my-bundle
+  go build -o ./build/example-on-native-host ./cmd/my-bundle
+  ./build/example-on-native-host --airflow-metadata > meta.yaml
+  go tool airflow-go-pack --executable ./build/example-arm64 \
+    --source ./cmd/my-bundle/main.go --airflow-metadata meta.yaml
+`,
+               // Only count args BEFORE "--" toward the positional limit; args
+               // after "--" are forwarded verbatim to `go build` and must not
+               // inflate the count (e.g. `-- -ldflags "-X main.foo=bar"`).
+               Args: func(cmd *cobra.Command, args []string) error {
+                       dashAt := cmd.ArgsLenAtDash()
+                       pkgArgs := args
+                       if dashAt >= 0 {
+                               pkgArgs = args[:dashAt]
+                       }
+                       return cobra.MaximumNArgs(1)(cmd, pkgArgs)
+               },
+               RunE: func(cmd *cobra.Command, args []string) error {
+                       // Anything after "--" is forwarded to the internal `go 
build`
+                       // invocation. ArgsLenAtDash() returns the count of 
args before
+                       // the dash, or -1 if the dash isn't present.
+                       dashAt := cmd.ArgsLenAtDash()
+                       var pkgArgs, buildArgs []string
+                       if dashAt < 0 {
+                               pkgArgs = args
+                       } else {
+                               pkgArgs = args[:dashAt]
+                               buildArgs = args[dashAt:]
+                       }
+                       opts.pkg = "."
+                       if len(pkgArgs) == 1 {
+                               opts.pkg = pkgArgs[0]
+                       }
+                       opts.buildArgs = buildArgs
+                       return runPack(cmd.OutOrStdout(), cmd.ErrOrStderr(), 
opts)
+               },
+       }
+
+       root.Flags().StringVar(&opts.source, "source",
+               "",
+               "path to the DAG source file (defaults to the file in the 
target package containing func main)")
+       root.Flags().StringVar(&opts.executable, "executable",
+               "",
+               "pack a pre-built executable instead of running go build")
+       root.Flags().StringVar(&opts.output, "output",
+               "",
+               "output bundle path (defaults to ./<package-dir-name>)")
+       root.Flags().StringVar(&opts.airflowMetadata, "airflow-metadata",
+               "",
+               "path to a pre-captured --airflow-metadata manifest (JSON or 
YAML); skips "+
+                       "introspecting the binary")
+       root.Flags().StringVar(&opts.goos, "goos",
+               "",
+               "target GOOS for the bundle (cross-compile); prefer this over 
the GOOS env "+
+                       "var, which `go tool` would use to cross-build the 
packer itself")
+       root.Flags().StringVar(&opts.goarch, "goarch",
+               "",
+               "target GOARCH for the bundle (cross-compile); prefer this over 
the GOARCH env "+
+                       "var, which `go tool` would use to cross-build the 
packer itself")
+
+       root.AddCommand(newInspectCmd())
+       return root
+}
diff --git a/go-sdk/cmd/airflow-go-pack/pack.go 
b/go-sdk/cmd/airflow-go-pack/pack.go
new file mode 100644
index 00000000000..8441b23569b
--- /dev/null
+++ b/go-sdk/cmd/airflow-go-pack/pack.go
@@ -0,0 +1,672 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+       "bytes"
+       "errors"
+       "fmt"
+       "go/ast"
+       "go/parser"
+       "go/token"
+       "io"
+       "os"
+       "os/exec"
+       "path/filepath"
+       "runtime"
+       "sort"
+       "strings"
+
+       "gopkg.in/yaml.v3"
+
+       "github.com/apache/airflow/go-sdk/internal/airflowmetadata"
+       "github.com/apache/airflow/go-sdk/internal/bundlefooter"
+)
+
+// packOptions are the flags accepted by the root pack command.
+type packOptions struct {
+       pkg             string   // target package (default ".")
+       source          string   // override the auto-detected DAG source file
+       executable      string   // pack a pre-built binary instead of building
+       output          string   // override the default <bundleName> output 
path
+       airflowMetadata string   // path to a pre-captured --airflow-metadata 
manifest (JSON or YAML)
+       goos            string   // target GOOS for the deployable build (falls 
back to env GOOS, then host)
+       goarch          string   // target GOARCH for the deployable build 
(falls back to env GOARCH, then host)
+       buildArgs       []string // forwarded verbatim to `go build` (already 
includes the leading "--")
+}
+
+func runPack(stdout, stderr io.Writer, opts *packOptions) error {
+       if opts.executable != "" && len(opts.buildArgs) > 0 {
+               return fmt.Errorf("--executable is mutually exclusive with go 
build flags after \"--\"")
+       }
+       if opts.executable != "" && (opts.goos != "" || opts.goarch != "") {
+               return fmt.Errorf(
+                       "--executable is mutually exclusive with 
--goos/--goarch: --executable packs the " +
+                               "binary as-is and never builds, so it cannot 
cross-compile. To cross-build a " +
+                               "bundle, drop --executable and pass 
--goos/--goarch with the package path",
+               )
+       }
+
+       // Resolve the DAG source file for both modes up front. --executable 
requires
+       // it explicitly; the build path falls back to discovery.
+       sourcePath := opts.source
+       if opts.executable != "" {
+               if sourcePath == "" {
+                       return fmt.Errorf(
+                               "--executable requires --source: cannot infer 
the DAG source for a pre-built binary",
+                       )
+               }
+       } else if sourcePath == "" {
+               // --source is the documented escape hatch for packages whose 
main file
+               // cannot be auto-detected: it may be selected by build tags or 
GOOS, or
+               // the package may have several files with func main(). 
Discovery runs a
+               // plain `go list` without the forwarded build flags, so it can 
fail or
+               // pick the wrong file for such packages. Only fall back to 
discovery
+               // when --source was not supplied, so an explicit --source 
always wins.
+               discovered, err := discoverMainSource(opts.pkg)
+               if err != nil {
+                       return fmt.Errorf("locating DAG source file: %w", err)
+               }
+               sourcePath = discovered
+       }
+       if _, err := os.Stat(sourcePath); err != nil {
+               return fmt.Errorf("source file %s: %w", sourcePath, err)
+       }
+
+       output := opts.output
+       if output == "" {
+               defaultPath, err := defaultOutputPath(sourcePath)
+               if err != nil {
+                       return fmt.Errorf("determining default output path: 
%w", err)
+               }
+               output = defaultPath
+       }
+
+       // The bundle is finalised by renaming a temp file onto output, which 
fails
+       // if output is an existing directory. Catch that here: the default 
output is
+       // the package directory's name, so packing ./foo from a dir that 
already has
+       // a ./foo directory collides. Report it with the fix instead of a bare
+       // "rename ...: file exists" from os.Rename. Done before any build so a
+       // misconfigured output fails fast rather than after a (cross) go build.
+       if info, err := os.Stat(output); err == nil && info.IsDir() {
+               return fmt.Errorf(
+                       "output path %q is an existing directory (the bundle 
output defaults to the "+
+                               "package directory's name); pass --output to 
write the bundle to a file path",
+                       output,
+               )
+       }
+
+       // execPath is the binary that receives the footer (the deployable 
artefact,
+       // which MAY be cross-compiled). introspectPath is the binary 
obtainMetadata
+       // reads --airflow-metadata from. By default that means exec'ing it on 
the
+       // host (so it must be host-runnable, hence the cross-compile sidecar 
below),
+       // but --airflow-metadata bypasses it entirely.
+       var execPath, introspectPath string
+       cleanupExec := func() {}
+       defer func() { cleanupExec() }()
+
+       if opts.executable != "" {
+               execPath = opts.executable
+               introspectPath = opts.executable
+       } else {
+               targetGOOS, targetGOARCH := targetPlatform(opts)
+               artifact, cleanup, err := buildPackage(stderr, opts.pkg, 
opts.buildArgs, targetGOOS, targetGOARCH)
+               if err != nil {
+                       return err
+               }
+               execPath = artifact
+               cleanupExec = cleanup
+               introspectPath = artifact
+
+               // Reading the manifest means exec'ing the binary, so it must 
be a
+               // host-native build. When cross-compiling, the artefact cannot 
run
+               // here; build a throwaway host binary from the same sources 
and the
+               // same forwarded `--` build flags (DAG/task identity is 
arch-independent)
+               // solely to introspect. This sidecar is unnecessary when
+               // --airflow-metadata supplies the manifest directly.
+               crossCompiling := targetGOOS != runtime.GOOS || targetGOARCH != 
runtime.GOARCH
+               if crossCompiling && opts.airflowMetadata == "" {
+                       hostBin, cleanupHost, err := buildPackage(stderr, 
opts.pkg, opts.buildArgs, runtime.GOOS, runtime.GOARCH)
+                       if err != nil {
+                               return fmt.Errorf("building host binary for 
metadata introspection: %w", err)
+                       }
+                       prevCleanup := cleanupExec
+                       cleanupExec = func() { cleanupHost(); prevCleanup() }
+                       introspectPath = hostBin
+               }
+       }
+
+       if _, err := os.Stat(execPath); err != nil {
+               return fmt.Errorf("executable %s: %w", execPath, err)
+       }
+
+       if err := rejectOutputAlias(output, execPath, sourcePath, 
opts.airflowMetadata); err != nil {
+               return err
+       }
+
+       meta, err := obtainMetadata(opts, introspectPath)
+       if err != nil {
+               return err
+       }
+       if len(meta.Dags) == 0 {
+               return fmt.Errorf("bundle exposes no dags: nothing to pack")
+       }
+       for dagID, dag := range meta.Dags {
+               if len(dag.Tasks) == 0 {
+                       fmt.Fprintf(stderr, "warning: dag %q has no tasks\n", 
dagID)
+               }
+       }
+
+       manifest, err := renderManifest(meta, filepath.Base(sourcePath))
+       if err != nil {
+               return fmt.Errorf("rendering manifest: %w", err)
+       }
+       sourceBytes, err := os.ReadFile(sourcePath)
+       if err != nil {
+               return fmt.Errorf("reading source file: %w", err)
+       }
+
+       // Assemble the bundle through a temp file and atomically move it into
+       // place: we never mutate the build artefact or the user-supplied
+       // --executable, and a failed pack never leaves a truncated or 
half-written
+       // file at output.
+       if err := writeBundle(execPath, output, sourceBytes, manifest); err != 
nil {
+               return err
+       }
+
+       fmt.Fprintf(stdout, "Wrote bundle %s (sdk=%s/%s, dags=%d)\n",
+               output, meta.SDK.Language, meta.SDK.Version, len(meta.Dags))
+       return nil
+}
+
+// defaultOutputPath derives the default bundle output path from the directory
+// that owns the DAG source file. That directory is the bundle's main package,
+// so its base name is what `go build` itself would name the binary. On Windows
+// the .exe suffix is appended.
+func defaultOutputPath(sourcePath string) (string, error) {
+       abs, err := filepath.Abs(sourcePath)
+       if err != nil {
+               return "", err
+       }
+       name := filepath.Base(filepath.Dir(abs))
+       if runtime.GOOS == "windows" {
+               name += ".exe"
+       }
+       return name, nil
+}
+
+// discoverMainSource locates the file in the given package whose AST contains
+// a top-level `func main()`. Returns an error if the package has zero or
+// more than one such file, mirroring ADR 0002's discovery contract.
+func discoverMainSource(pkg string) (string, error) {
+       cmd := exec.Command("go", "list", "-f", "{{.Dir}}\n{{range 
.GoFiles}}{{.}}\n{{end}}", pkg)
+       var stdout, stderr bytes.Buffer
+       cmd.Stdout = &stdout
+       cmd.Stderr = &stderr
+       if err := cmd.Run(); err != nil {
+               return "", fmt.Errorf(
+                       "go list %s: %w: %s\n"+
+                               "airflow-go-pack packs the Go package in the 
current directory by default; "+
+                               "pass your bundle's package path (e.g. 
`airflow-go-pack ./path/to/bundle`) "+
+                               "or --source to point at the DAG source file 
directly",
+                       pkg, err, strings.TrimSpace(stderr.String()),
+               )
+       }
+
+       lines := splitNonEmpty(stdout.String())
+       if len(lines) < 2 {
+               return "", fmt.Errorf("package %s has no Go source files", pkg)
+       }
+       dir := lines[0]
+       files := lines[1:]
+
+       fset := token.NewFileSet()
+       var matches []string
+       for _, name := range files {
+               full := filepath.Join(dir, name)
+               f, err := parser.ParseFile(fset, full, nil, 
parser.SkipObjectResolution)
+               if err != nil {
+                       return "", fmt.Errorf("parsing %s: %w", full, err)
+               }
+               if hasMainFunc(f) {
+                       matches = append(matches, full)
+               }
+       }
+       switch len(matches) {
+       case 0:
+               return "", fmt.Errorf("no file in package %s defines func 
main()", pkg)
+       case 1:
+               return matches[0], nil
+       default:
+               return "", fmt.Errorf(
+                       "multiple files in package %s define func main(): %v; 
use --source to disambiguate",
+                       pkg,
+                       matches,
+               )
+       }
+}
+
+func hasMainFunc(f *ast.File) bool {
+       for _, decl := range f.Decls {
+               fn, ok := decl.(*ast.FuncDecl)
+               if !ok {
+                       continue
+               }
+               if fn.Recv != nil {
+                       continue
+               }
+               if fn.Name.Name != "main" {
+                       continue
+               }
+               if fn.Type.Params != nil && len(fn.Type.Params.List) != 0 {
+                       continue
+               }
+               return true
+       }
+       return false
+}
+
+func splitNonEmpty(s string) []string {
+       var out []string
+       for line := range strings.SplitSeq(s, "\n") {
+               if t := strings.TrimSpace(line); t != "" {
+                       out = append(out, t)
+               }
+       }
+       return out
+}
+
+// targetPlatform resolves the GOOS/GOARCH the deployable bundle is built for:
+// the --goos/--goarch flags win, then the ambient GOOS/GOARCH env, then the
+// host. The flags exist because `go tool airflow-go-pack` builds the packer
+// using the ambient GOOS/GOARCH — setting those in the env to cross-compile a
+// bundle would instead cross-build the packer itself and fail to exec it on 
the
+// host. Passing the target via flags keeps the env (and the packer build)
+// host-native while still cross-building the bundle.
+func targetPlatform(opts *packOptions) (goos, goarch string) {
+       goos = runtime.GOOS
+       if env := os.Getenv("GOOS"); env != "" {
+               goos = env
+       }
+       if opts.goos != "" {
+               goos = opts.goos
+       }
+       goarch = runtime.GOARCH
+       if env := os.Getenv("GOARCH"); env != "" {
+               goarch = env
+       }
+       if opts.goarch != "" {
+               goarch = opts.goarch
+       }
+       return goos, goarch
+}
+
+// buildPackage runs `go build [extraArgs...] -o <tmp>/bundle <pkg>` for the
+// given GOOS/GOARCH and returns the path to the freshly built executable plus 
a
+// cleanup function. extraArgs is the slice that comes after the "--" separator
+// on the airflow-go-pack command line; we drop the leading "--" before
+// forwarding. GOOS/GOARCH are set explicitly (overriding any ambient env) so
+// the caller controls the target: the deployable build uses the resolved 
target
+// platform, the introspection sidecar uses the host.
+func buildPackage(
+       stderr io.Writer,
+       pkg string,
+       extraArgs []string,
+       goos, goarch string,
+) (string, func(), error) {
+       tmpDir, err := os.MkdirTemp("", "airflow-go-pack-*")
+       if err != nil {
+               return "", nil, fmt.Errorf("creating temp dir: %w", err)
+       }
+       cleanup := func() { _ = os.RemoveAll(tmpDir) }
+
+       binName := "bundle"
+       if goos == "windows" {
+               binName += ".exe"
+       }
+       outPath := filepath.Join(tmpDir, binName)
+
+       args := []string{"build"}
+       for _, a := range extraArgs {
+               if a == "--" {
+                       continue
+               }
+               args = append(args, a)
+       }
+       args = append(args, "-o", outPath, pkg)
+
+       cmd := exec.Command("go", args...)
+       // Later duplicate keys win in os/exec, so these override any ambient
+       // GOOS/GOARCH (which `go tool` already used to build this packer).
+       cmd.Env = append(os.Environ(), "GOOS="+goos, "GOARCH="+goarch)
+       cmd.Stdout = stderr
+       cmd.Stderr = stderr
+       if err := cmd.Run(); err != nil {
+               cleanup()
+               return "", nil, fmt.Errorf("go build failed: %w", err)
+       }
+       return outPath, cleanup, nil
+}
+
+func readAirflowMetadata(execPath string) (airflowmetadata.Manifest, error) {
+       out, err := runIntrospect(execPath, "--airflow-metadata")
+       if err != nil {
+               return airflowmetadata.Manifest{}, err
+       }
+       // Decode with a YAML decoder: it reads the binary's YAML default and 
its
+       // --format json output alike (JSON is a subset of YAML).
+       var meta airflowmetadata.Manifest
+       if err := yaml.Unmarshal(out, &meta); err != nil {
+               return airflowmetadata.Manifest{}, fmt.Errorf(
+                       "decoding --airflow-metadata output (YAML/JSON): %w",
+                       err,
+               )
+       }
+       return meta, nil
+}
+
+// obtainMetadata resolves the bundle manifest either from an explicit
+// --airflow-metadata file or by exec'ing a host-runnable introspection binary.
+// In --executable mode a binary that cannot be exec'd on the host is a hard
+// error with remediation guidance: --executable expects a same-platform 
binary,
+// and the packer never silently rebuilds a host binary, because a rebuild from
+// unknown inputs (the original build tags, ldflags, and GOOS/GOARCH-specific
+// files are not known here, and build flags are rejected in --executable mode)
+// can advertise a different DAG/task set than the artefact actually shipped.
+func obtainMetadata(opts *packOptions, introspectPath string) 
(airflowmetadata.Manifest, error) {
+       if opts.airflowMetadata != "" {
+               meta, err := readMetadataFile(opts.airflowMetadata)
+               if err != nil {
+                       return airflowmetadata.Manifest{}, fmt.Errorf(
+                               "--airflow-metadata %s: %w",
+                               opts.airflowMetadata,
+                               err,
+                       )
+               }
+               return meta, nil
+       }
+
+       meta, err := readAirflowMetadata(introspectPath)
+       if err == nil {
+               return meta, nil
+       }
+       if opts.executable != "" && errors.Is(err, errExecNotStartable) {
+               return airflowmetadata.Manifest{}, fmt.Errorf(
+                       "cannot exec --executable %q on %s/%s to read its 
--airflow-metadata: %w\n"+
+                               "--executable expects a binary that runs on 
this host. To pack a binary for a\n"+
+                               "different platform, drop --executable and let 
the packer cross-build instead:\n"+
+                               "    airflow-go-pack --goos <os> --goarch 
<arch> ./path/to/pkg [-- <go build flags>]\n"+
+                               "(the packer builds a host-arch binary, 
forwarding your -- build flags, solely to\n"+
+                               "read the manifest). Alternatively pass 
--airflow-metadata with the manifest captured\n"+
+                               "from the binary on its native platform: %s 
--airflow-metadata > airflow-metadata.yaml",
+                       opts.executable, runtime.GOOS, runtime.GOARCH, err, 
opts.executable,
+               )
+       }
+       return airflowmetadata.Manifest{}, fmt.Errorf("--airflow-metadata: %w", 
err)
+}
+
+// readMetadataFile parses a manifest from a pre-captured --airflow-metadata
+// file. It accepts both the JSON a bundle binary prints (via
+// `mybundle --airflow-metadata`) and the airflow-metadata.yaml embedded in an
+// existing bundle: YAML is a superset of JSON, so a YAML decoder reads either.
+func readMetadataFile(path string) (airflowmetadata.Manifest, error) {
+       data, err := os.ReadFile(path)
+       if err != nil {
+               return airflowmetadata.Manifest{}, err
+       }
+       var meta airflowmetadata.Manifest
+       if err := yaml.Unmarshal(data, &meta); err != nil {
+               return airflowmetadata.Manifest{}, fmt.Errorf("decoding 
metadata (YAML/JSON): %w", err)
+       }
+       return meta, nil
+}
+
+// errExecNotStartable marks an introspection failure where the process never
+// ran — typically the binary was built for a different CPU arch / OS, so the
+// OS rejected the exec (e.g. "exec format error", "bad CPU type"). It is
+// distinct from the binary running and exiting non-zero (an *exec.ExitError),
+// which signals a genuine --airflow-metadata failure rather than an
+// unrunnable binary.
+var errExecNotStartable = errors.New("introspection binary could not be 
exec'd")
+
+func runIntrospect(execPath string, flag string) ([]byte, error) {
+       cmd := exec.Command(execPath, flag)
+       var stdout, stderr bytes.Buffer
+       cmd.Stdout = &stdout
+       cmd.Stderr = &stderr
+       if err := cmd.Run(); err != nil {
+               var exitErr *exec.ExitError
+               if !errors.As(err, &exitErr) {
+                       // The process did not start (no exit status). Wrap 
with the
+                       // sentinel so callers can decide whether to fall back 
to a build.
+                       return nil, fmt.Errorf("%w: %s %s: %v", 
errExecNotStartable, execPath, flag, err)
+               }
+               return nil, fmt.Errorf("%s %s: %w: %s", execPath, flag, err, 
stderr.String())
+       }
+       return stdout.Bytes(), nil
+}
+
+// renderManifest serialises the airflow-metadata manifest as deterministic,
+// sorted-key YAML matching airflow-metadata.schema.json. It injects the 
schema's
+// source field (the filename the manifest is built from), which the producer's
+// Manifest omits because only the packer knows it; every other field is copied
+// from the introspected manifest verbatim.
+func renderManifest(meta airflowmetadata.Manifest, sourceName string) ([]byte, 
error) {
+       version := meta.AirflowBundleMetadataVersion
+       if version == "" {
+               version = airflowmetadata.FormatVersion
+       }
+
+       dagIDs := make([]string, 0, len(meta.Dags))
+       for id := range meta.Dags {
+               dagIDs = append(dagIDs, id)
+       }
+       sort.Strings(dagIDs)
+
+       dagsNode := &yaml.Node{Kind: yaml.MappingNode}
+       for _, id := range dagIDs {
+               tasks := meta.Dags[id].Tasks
+               taskItems := make([]*yaml.Node, 0, len(tasks))
+               for _, t := range tasks {
+                       taskItems = append(taskItems, quotedScalar(t))
+               }
+               dagsNode.Content = append(dagsNode.Content,
+                       scalar(id),
+                       &yaml.Node{
+                               Kind: yaml.MappingNode,
+                               Content: []*yaml.Node{
+                                       scalar("tasks"),
+                                       {Kind: yaml.SequenceNode, Content: 
taskItems},
+                               },
+                       },
+               )
+       }
+
+       root := &yaml.Node{Kind: yaml.DocumentNode}
+       manifest := &yaml.Node{
+               Kind: yaml.MappingNode,
+               Content: []*yaml.Node{
+                       scalar("airflow_bundle_metadata_version"), 
quotedScalar(version),
+                       scalar("sdk"),
+                       {
+                               Kind: yaml.MappingNode,
+                               Content: []*yaml.Node{
+                                       scalar("language"), 
quotedScalar(meta.SDK.Language),
+                                       scalar("version"), 
quotedScalar(meta.SDK.Version),
+                                       scalar("supervisor_schema_version"),
+                                       
quotedScalar(meta.SDK.SupervisorSchemaVersion),
+                               },
+                       },
+                       scalar("source"), quotedScalar(sourceName),
+                       scalar("dags"), dagsNode,
+               },
+       }
+       root.Content = []*yaml.Node{manifest}
+
+       var buf bytes.Buffer
+       enc := yaml.NewEncoder(&buf)
+       enc.SetIndent(2)
+       if err := enc.Encode(root); err != nil {
+               return nil, err
+       }
+       if err := enc.Close(); err != nil {
+               return nil, err
+       }
+       return buf.Bytes(), nil
+}
+
+// scalar emits a plain (unquoted) node. It is used for structural keys
+// (e.g. "sdk", "tasks") and for the Dag ID mapping keys.
+func scalar(value string) *yaml.Node {
+       return &yaml.Node{Kind: yaml.ScalarNode, Value: value}
+}
+
+// quotedScalar emits a double-quoted node. Data-bearing string *values* — task
+// IDs, the source filename, and the SDK fields — go through this so a value
+// that looks like a number, bool, or date (e.g. a task named "123" or "true")
+// round-trips as a string rather than being retyped by the YAML parser.
+func quotedScalar(value string) *yaml.Node {
+       return &yaml.Node{Kind: yaml.ScalarNode, Value: value, Style: 
yaml.DoubleQuotedStyle}
+}
+
+// rejectOutputAlias fails if output resolves to the same file as any pack
+// input: the executable, the source, or a supplied --airflow-metadata file.
+// Packing copies the executable to output with O_TRUNC and renames it into
+// place, so an aliased output would clobber the input. metadataPath is empty
+// when --airflow-metadata is not used and is skipped in that case.
+func rejectOutputAlias(output, execPath, sourcePath, metadataPath string) 
error {
+       for _, in := range []struct {
+               path string
+               kind string
+       }{
+               {execPath, "executable"},
+               {sourcePath, "source"},
+               {metadataPath, "--airflow-metadata file"},
+       } {
+               if in.path == "" {
+                       continue
+               }
+               alias, err := sameFile(output, in.path)
+               if err != nil {
+                       return fmt.Errorf("resolving output path %s: %w", 
output, err)
+               }
+               if alias {
+                       return fmt.Errorf(
+                               "output path %s is the same file as the %s %s; 
pass --output to write the bundle elsewhere",
+                               output,
+                               in.kind,
+                               in.path,
+                       )
+               }
+       }
+       return nil
+}
+
+// sameFile reports whether a and b refer to the same file. It first compares
+// cleaned absolute paths (catching e.g. "./bundle" vs "bundle" before either
+// exists) and then, when both paths exist, falls back to os.SameFile to catch
+// links that share an inode.
+func sameFile(a, b string) (bool, error) {
+       absA, err := filepath.Abs(a)
+       if err != nil {
+               return false, err
+       }
+       absB, err := filepath.Abs(b)
+       if err != nil {
+               return false, err
+       }
+       if absA == absB {
+               return true, nil
+       }
+       fiA, err := os.Stat(absA)
+       if err != nil {
+               if os.IsNotExist(err) {
+                       return false, nil
+               }
+               return false, err
+       }
+       fiB, err := os.Stat(absB)
+       if err != nil {
+               if os.IsNotExist(err) {
+                       return false, nil
+               }
+               return false, err
+       }
+       return os.SameFile(fiA, fiB), nil
+}
+
+// writeBundle assembles the bundle at output by copying the executable to a
+// temporary file in output's directory, appending the source+manifest footer
+// to that copy, then atomically renaming it into place. Writing through a
+// temp file keeps a failed pack from leaving a truncated or half-written
+// artefact at output, and guarantees the file being copied is never the same
+// open file as the destination.
+func writeBundle(execPath, output string, source, metadata []byte) error {
+       outDir := filepath.Dir(output)
+       // The temp file and the atomic rename both live in output's directory, 
so it
+       // must exist. Create it for the user (e.g. --output ./bin/bundle with 
no
+       // ./bin) instead of failing with an opaque temp-file "no such file" 
error.
+       if err := os.MkdirAll(outDir, 0o755); err != nil {
+               return fmt.Errorf("creating output directory %s: %w", outDir, 
err)
+       }
+       tmp, err := os.CreateTemp(outDir, ".airflow-go-pack-*")
+       if err != nil {
+               return fmt.Errorf("creating temp file for %s: %w", output, err)
+       }
+       tmpPath := tmp.Name()
+       _ = tmp.Close()
+       committed := false
+       defer func() {
+               if !committed {
+                       _ = os.Remove(tmpPath)
+               }
+       }()
+
+       if err := copyFile(execPath, tmpPath, 0o755); err != nil {
+               return fmt.Errorf("writing %s: %w", output, err)
+       }
+       if err := bundlefooter.Append(tmpPath, source, metadata); err != nil {
+               return err
+       }
+       if err := os.Rename(tmpPath, output); err != nil {
+               return fmt.Errorf("finalising %s: %w", output, err)
+       }
+       committed = true
+       return nil
+}
+
+// copyFile copies src to dst, truncating dst if it already exists.
+func copyFile(src, dst string, mode os.FileMode) error {
+       in, err := os.Open(src)
+       if err != nil {
+               return err
+       }
+       defer in.Close()
+       out, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, mode)
+       if err != nil {
+               return err
+       }
+       if _, err := io.Copy(out, in); err != nil {
+               out.Close()
+               return err
+       }
+       if err := out.Close(); err != nil {
+               return err
+       }
+       return os.Chmod(dst, mode)
+}
diff --git a/go-sdk/cmd/airflow-go-pack/pack_integration_test.go 
b/go-sdk/cmd/airflow-go-pack/pack_integration_test.go
new file mode 100644
index 00000000000..edb51824c4c
--- /dev/null
+++ b/go-sdk/cmd/airflow-go-pack/pack_integration_test.go
@@ -0,0 +1,357 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+       "bytes"
+       "encoding/json"
+       "io"
+       "os"
+       "os/exec"
+       "path/filepath"
+       "regexp"
+       "runtime"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "gopkg.in/yaml.v3"
+
+       "github.com/apache/airflow/go-sdk/internal/airflowmetadata"
+       "github.com/apache/airflow/go-sdk/internal/bundlefooter"
+)
+
+// crossArchFor returns an architecture different from the host that the Go
+// toolchain can target, or "" if we have no safe mapping for this host.
+func crossArchFor(hostArch string) string {
+       switch hostArch {
+       case "amd64":
+               return "arm64"
+       case "arm64":
+               return "amd64"
+       default:
+               return ""
+       }
+}
+
+// End-to-end cross-arch --executable test: a binary built for an arch the host
+// cannot run is packed into a spec-conforming bundle with its binary region
+// preserved byte-for-byte. The caller supplies the artefact's own
+// --airflow-metadata output (captured here from a host build of the same
+// sources) rather than the packer rebuilding to guess the metadata.
+func TestPack_CrossArchExecutableWithMetadataFile(t *testing.T) {
+       if testing.Short() {
+               t.Skip("cross-arch pack test shells out to `go build` twice")
+       }
+       if _, err := exec.LookPath("go"); err != nil {
+               t.Skip("go toolchain not on PATH")
+       }
+       crossArch := crossArchFor(runtime.GOARCH)
+       if crossArch == "" {
+               t.Skipf("no cross-arch mapping for host arch %q", 
runtime.GOARCH)
+       }
+
+       // The example bundle is a real BundleProvider that answers
+       // --airflow-metadata, so it exercises the genuine metadata path.
+       exampleDir, err := filepath.Abs(filepath.Join("..", "..", "example", 
"bundle"))
+       require.NoError(t, err)
+       sourceFile := filepath.Join(exampleDir, "main.go")
+       if _, err := os.Stat(sourceFile); err != nil {
+               t.Skipf("example bundle source not found: %v", err)
+       }
+
+       tmp := t.TempDir()
+       crossBin := filepath.Join(tmp, "prebuilt_cross")
+       hostBin := filepath.Join(tmp, "prebuilt_host")
+
+       // Build the example for a foreign arch (the --executable input) and for
+       // the host. CGO is disabled so the cross build needs no C toolchain.
+       goBuild(t, exampleDir, crossBin, runtime.GOOS, crossArch)
+       goBuild(t, exampleDir, hostBin, runtime.GOOS, runtime.GOARCH)
+
+       crossBytes, err := os.ReadFile(crossBin)
+       require.NoError(t, err)
+       hostBytes, err := os.ReadFile(hostBin)
+       require.NoError(t, err)
+       require.False(t, bytes.Equal(crossBytes, hostBytes),
+               "cross and host builds should differ; cross-compile may not 
have taken effect")
+
+       // Capture the artefact's own --airflow-metadata JSON from the host 
build,
+       // standing in for the author running the binary on its native platform.
+       metaJSON := filepath.Join(tmp, "airflow-metadata.json")
+       captureMetadata(t, hostBin, metaJSON)
+
+       // Pack the foreign-arch executable through the real CLI command, 
feeding
+       // the captured metadata so no host rebuild is needed.
+       outPath := filepath.Join(tmp, "bundle")
+       cmd := newRootCmd()
+       cmd.SetArgs([]string{
+               "--executable", crossBin,
+               "--source", sourceFile,
+               "--airflow-metadata", metaJSON,
+               "--output", outPath,
+       })
+       cmd.SetOut(&bytes.Buffer{})
+       cmd.SetErr(&bytes.Buffer{})
+       require.NoError(t, cmd.Execute())
+
+       bundleBytes, err := os.ReadFile(outPath)
+       require.NoError(t, err)
+
+       // Read parses the trailer and verifies binary_sha256 over the binary
+       // region; success means the bundle is spec-valid.
+       source, metadata, err := bundlefooter.Read(outPath)
+       require.NoError(t, err)
+
+       srcBytes, err := os.ReadFile(sourceFile)
+       require.NoError(t, err)
+       assert.Equal(t, srcBytes, source, "embedded source must match --source 
bytes")
+
+       // sdk.version is environment-dependent and not asserted verbatim: a 
plain
+       // `go build` from a local module tree leaves Main.Version unset and 
yields
+       // "(devel)", while Go 1.24's VCS stamping (e.g. in CI, building from 
the
+       // git checkout) yields a pseudo-version like 
v0.0.0-<timestamp>-<commit>,
+       // and a tagged-release build yields a semver tag. Assert the version 
line
+       // matches an accepted form, then fold the observed value into the 
expected
+       // manifest so the remaining fields and ordering are checked exactly.
+       // supervisor_schema_version and the format version come from SDK 
constants
+       // and change only when those constants do.
+       versionLine := regexp.MustCompile(`(?m)^  version: "([^"]*)"$`)
+       m := versionLine.FindStringSubmatch(string(metadata))
+       require.NotNil(t, m, "manifest must contain an sdk.version line:\n%s", 
metadata)
+       sdkVersion := m[1]
+       assert.Regexp(t, `^(\(devel\)|v[0-9].*)$`, sdkVersion,
+               `sdk.version must be "(devel)" or a v-prefixed module version`)
+
+       expectedManifest := `airflow_bundle_metadata_version: "1.0"
+sdk:
+  language: "go"
+  version: "` + sdkVersion + `"
+  supervisor_schema_version: "2026-06-16"
+source: "main.go"
+dags:
+  simple_dag:
+    tasks:
+      - "extract"
+      - "transform"
+      - "load"
+`
+       assert.Equal(t, expectedManifest, string(metadata))
+
+       // The packed binary region must be exactly the foreign-arch executable:
+       // the captured metadata describes it, and the binary is never rebuilt.
+       binaryRegion := 
bundleBytes[:len(bundleBytes)-len(source)-len(metadata)-bundlefooter.TrailerSize]
+       assert.Equal(t, crossBytes, binaryRegion,
+               "packed binary region must be the foreign-arch --executable, 
not a host rebuild")
+}
+
+// End-to-end build-mode cross-compile (no --executable): with GOOS/GOARCH set,
+// the packer builds the target-arch artefact and a host-arch binary (solely to
+// read the manifest), forwarding the `--` go build flags. The packed binary
+// must be the target-arch artefact built with the forwarded flags.
+func TestPack_CrossCompileBuildModeForwardsFlags(t *testing.T) {
+       if testing.Short() {
+               t.Skip("cross-arch pack test shells out to `go build` twice")
+       }
+       if _, err := exec.LookPath("go"); err != nil {
+               t.Skip("go toolchain not on PATH")
+       }
+       crossArch := crossArchFor(runtime.GOARCH)
+       if crossArch == "" {
+               t.Skipf("no cross-arch mapping for host arch %q", 
runtime.GOARCH)
+       }
+
+       exampleDir, err := filepath.Abs(filepath.Join("..", "..", "example", 
"bundle"))
+       require.NoError(t, err)
+       if _, err := os.Stat(filepath.Join(exampleDir, "main.go")); err != nil {
+               t.Skipf("example bundle source not found: %v", err)
+       }
+
+       // Cross-compile via the environment, exactly as a user would. CGO is
+       // disabled so the cross build needs no C toolchain.
+       t.Setenv("GOOS", runtime.GOOS)
+       t.Setenv("GOARCH", crossArch)
+       t.Setenv("CGO_ENABLED", "0")
+
+       tmp := t.TempDir()
+       outPath := filepath.Join(tmp, "bundle")
+
+       // Pack the example package (no --executable, no --source), forwarding
+       // -trimpath after the "--" separator to the internal go build.
+       cmd := newRootCmd()
+       cmd.SetArgs([]string{exampleDir, "--output", outPath, "--", 
"-trimpath"})
+       cmd.SetOut(&bytes.Buffer{})
+       cmd.SetErr(&bytes.Buffer{})
+       require.NoError(t, cmd.Execute())
+
+       source, metadata, err := bundlefooter.Read(outPath)
+       require.NoError(t, err)
+       assert.Contains(t, string(metadata), "simple_dag:",
+               "manifest must be read from the host introspection build")
+
+       // Independently build the target-arch artefact with the same forwarded
+       // flag; the packed binary region must match it byte-for-byte, proving 
the
+       // deployable artefact is the cross build (not the host introspection 
one)
+       // and that -trimpath was forwarded.
+       wantBin := filepath.Join(tmp, "want_cross")
+       build := exec.Command("go", "build", "-trimpath", "-o", wantBin, 
exampleDir)
+       build.Env = append(os.Environ(), "GOOS="+runtime.GOOS, 
"GOARCH="+crossArch, "CGO_ENABLED=0")
+       if combined, berr := build.CombinedOutput(); berr != nil {
+               t.Fatalf("reference cross build failed: %v\n%s", berr, combined)
+       }
+       wantBytes, err := os.ReadFile(wantBin)
+       require.NoError(t, err)
+
+       bundleBytes, err := os.ReadFile(outPath)
+       require.NoError(t, err)
+       binaryRegion := 
bundleBytes[:len(bundleBytes)-len(source)-len(metadata)-bundlefooter.TrailerSize]
+       assert.Equal(t, wantBytes, binaryRegion,
+               "packed binary must be the cross-built artefact with -trimpath 
forwarded")
+}
+
+// When --source is supplied, the packer skips source discovery, so a package
+// whose main file cannot be auto-detected (two files with func main, which
+// discovery rejects as ambiguous) is still packable. The failure is the
+// downstream `go build` error, not the discovery error — proving discovery was
+// skipped.
+func TestRunPack_SourceBypassesDiscovery(t *testing.T) {
+       if testing.Short() {
+               t.Skip("shells out to the go toolchain")
+       }
+       if _, err := exec.LookPath("go"); err != nil {
+               t.Skip("go toolchain not on PATH")
+       }
+
+       // Two files both define func main(), which discoverMainSource rejects 
as
+       // ambiguous. Building them together is also a compile error, so 
reaching
+       // `go build` (rather than failing in discovery) is the observable 
signal
+       // that --source bypassed discovery.
+       pkgDir := t.TempDir()
+       require.NoError(t, os.WriteFile(filepath.Join(pkgDir, "go.mod"),
+               []byte("module ambiguousmain\n\ngo 1.24\n"), 0o644))
+       require.NoError(t, os.WriteFile(filepath.Join(pkgDir, "a.go"),
+               []byte("package main\n\nfunc main() {}\n"), 0o644))
+       require.NoError(t, os.WriteFile(filepath.Join(pkgDir, "b.go"),
+               []byte("package main\n\nfunc main() {}\n"), 0o644))
+
+       source := filepath.Join(pkgDir, "a.go")
+       err := runPack(io.Discard, io.Discard, &packOptions{
+               pkg:    pkgDir,
+               source: source,
+               output: filepath.Join(t.TempDir(), "bundle"),
+       })
+       require.Error(t, err)
+       // With --source honoured, discovery is skipped; the build still runs 
and
+       // fails on the duplicate main.
+       assert.NotContains(t, err.Error(), "locating DAG source file",
+               "--source must bypass source discovery")
+       assert.Contains(t, err.Error(), "go build failed",
+               "packing should proceed to the build step when --source is 
given")
+}
+
+// Checks the bundle binary's --airflow-metadata encodings: default is YAML,
+// --format json emits JSON, both decode to the same manifest, and --format
+// without --airflow-metadata is a hard error.
+func TestBundleBinary_AirflowMetadataFormats(t *testing.T) {
+       if testing.Short() {
+               t.Skip("shells out to `go build`")
+       }
+       if _, err := exec.LookPath("go"); err != nil {
+               t.Skip("go toolchain not on PATH")
+       }
+
+       exampleDir, err := filepath.Abs(filepath.Join("..", "..", "example", 
"bundle"))
+       require.NoError(t, err)
+       if _, err := os.Stat(filepath.Join(exampleDir, "main.go")); err != nil {
+               t.Skipf("example bundle source not found: %v", err)
+       }
+
+       hostBin := filepath.Join(t.TempDir(), "bundle")
+       goBuild(t, exampleDir, hostBin, runtime.GOOS, runtime.GOARCH)
+
+       assertManifest := func(t *testing.T, meta airflowmetadata.Manifest) {
+               t.Helper()
+               assert.Equal(t, "go", meta.SDK.Language)
+               require.Contains(t, meta.Dags, "simple_dag")
+               assert.Equal(t, []string{"extract", "transform", "load"}, 
meta.Dags["simple_dag"].Tasks)
+       }
+
+       // Default: YAML. A JSON document would start with '{'.
+       yamlOut, err := exec.Command(hostBin, "--airflow-metadata").Output()
+       require.NoError(t, err, "running %s --airflow-metadata", hostBin)
+       assert.False(t, bytes.HasPrefix(bytes.TrimSpace(yamlOut), []byte("{")),
+               "default --airflow-metadata must emit YAML, not JSON")
+       assert.Contains(t, string(yamlOut), "airflow_bundle_metadata_version:")
+       var fromYAML airflowmetadata.Manifest
+       require.NoError(t, yaml.Unmarshal(yamlOut, &fromYAML))
+       assertManifest(t, fromYAML)
+
+       // --format json: JSON output.
+       jsonOut, err := exec.Command(hostBin, "--airflow-metadata", "--format", 
"json").Output()
+       require.NoError(t, err, "running %s --airflow-metadata --format json", 
hostBin)
+       assert.True(t, bytes.HasPrefix(bytes.TrimSpace(jsonOut), []byte("{")),
+               "--format json must emit JSON")
+       var fromJSON airflowmetadata.Manifest
+       require.NoError(t, json.Unmarshal(jsonOut, &fromJSON))
+       assertManifest(t, fromJSON)
+
+       assert.Equal(t, fromYAML, fromJSON, "both encodings must decode to the 
same manifest")
+
+       // --format without --airflow-metadata is a usage error.
+       bad, err := exec.Command(hostBin, "--format", "json").CombinedOutput()
+       require.Error(t, err, "--format without --airflow-metadata must exit 
non-zero")
+       assert.Contains(t, string(bad), "--format is only valid together with 
--airflow-metadata")
+}
+
+// Running the packer from a directory that is not a bundle main package must
+// turn the bare `go list` failure into an actionable error pointing at a
+// package path or --source.
+func TestDiscoverMainSource_NoGoFilesGivesGuidance(t *testing.T) {
+       if _, err := exec.LookPath("go"); err != nil {
+               t.Skip("go toolchain not on PATH")
+       }
+       dir := t.TempDir()
+       require.NoError(t, os.WriteFile(filepath.Join(dir, "go.mod"),
+               []byte("module testempty\n\ngo 1.24\n"), 0o644))
+       t.Chdir(dir)
+
+       _, err := discoverMainSource(".")
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "airflow-go-pack ./path/to/bundle",
+               "discovery failure should point the user at a package path")
+       assert.Contains(t, err.Error(), "--source")
+}
+
+func goBuild(t *testing.T, pkgDir, out, goos, goarch string) {
+       t.Helper()
+       cmd := exec.Command("go", "build", "-o", out, pkgDir)
+       cmd.Env = append(os.Environ(), "GOOS="+goos, "GOARCH="+goarch, 
"CGO_ENABLED=0")
+       if combined, err := cmd.CombinedOutput(); err != nil {
+               t.Fatalf("go build %s for %s/%s failed: %v\n%s", pkgDir, goos, 
goarch, err, combined)
+       }
+}
+
+// captureMetadata runs a host-runnable bundle binary with --airflow-metadata
+// and writes its JSON stdout to outPath.
+func captureMetadata(t *testing.T, hostBin, outPath string) {
+       t.Helper()
+       cmd := exec.Command(hostBin, "--airflow-metadata")
+       out, err := cmd.Output()
+       require.NoError(t, err, "running %s --airflow-metadata", hostBin)
+       require.NoError(t, os.WriteFile(outPath, out, 0o644))
+}
diff --git a/go-sdk/cmd/airflow-go-pack/pack_test.go 
b/go-sdk/cmd/airflow-go-pack/pack_test.go
new file mode 100644
index 00000000000..999d5c0f017
--- /dev/null
+++ b/go-sdk/cmd/airflow-go-pack/pack_test.go
@@ -0,0 +1,506 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+       "errors"
+       "io"
+       "os"
+       "path/filepath"
+       "runtime"
+       "testing"
+
+       "github.com/spf13/cobra"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/airflow/go-sdk/internal/airflowmetadata"
+       "github.com/apache/airflow/go-sdk/internal/bundlefooter"
+)
+
+func TestRenderManifest_DeterministicDagOrdering(t *testing.T) {
+       meta := airflowmetadata.Manifest{
+               AirflowBundleMetadataVersion: "1.0",
+               SDK: airflowmetadata.SDK{
+                       Language:                "go",
+                       Version:                 "0.1.0",
+                       SupervisorSchemaVersion: "2026-06-16",
+               },
+               Dags: map[string]airflowmetadata.Dag{
+                       "zeta_dag":  {Tasks: []string{"a", "b"}},
+                       "alpha_dag": {Tasks: []string{"x"}},
+               },
+       }
+
+       got1, err := renderManifest(meta, "main.go")
+       require.NoError(t, err)
+       got2, err := renderManifest(meta, "main.go")
+       require.NoError(t, err)
+
+       assert.Equal(t, got1, got2, "manifest should be byte-identical for 
identical input")
+
+       expected := `airflow_bundle_metadata_version: "1.0"
+sdk:
+  language: "go"
+  version: "0.1.0"
+  supervisor_schema_version: "2026-06-16"
+source: "main.go"
+dags:
+  alpha_dag:
+    tasks:
+      - "x"
+  zeta_dag:
+    tasks:
+      - "a"
+      - "b"
+`
+       assert.Equal(t, expected, string(got1))
+}
+
+// Values (task IDs, source, SDK fields) are quoted so a scalar-looking value
+// stays a string; Dag ID keys stay plain scalars.
+func TestRenderManifest_QuotesValuesNotKeys(t *testing.T) {
+       meta := airflowmetadata.Manifest{
+               AirflowBundleMetadataVersion: "1.0",
+               SDK: airflowmetadata.SDK{
+                       Language:                "go",
+                       Version:                 "0.1.0",
+                       SupervisorSchemaVersion: "2026-06-16",
+               },
+               Dags: map[string]airflowmetadata.Dag{
+                       "my_dag": {Tasks: []string{"123", "true"}},
+               },
+       }
+
+       got, err := renderManifest(meta, "main.go")
+       require.NoError(t, err)
+
+       // Task values that look like scalars are quoted.
+       assert.Contains(t, string(got), `- "123"`)
+       assert.Contains(t, string(got), `- "true"`)
+       // The Dag ID key is a plain scalar, not quoted.
+       assert.Contains(t, string(got), "\n  my_dag:\n")
+       assert.NotContains(t, string(got), `"my_dag"`)
+}
+
+func TestRenderManifest_EmptyDags(t *testing.T) {
+       meta := airflowmetadata.Manifest{
+               AirflowBundleMetadataVersion: "1.0",
+               SDK: airflowmetadata.SDK{
+                       Language:                "go",
+                       Version:                 "0.1.0",
+                       SupervisorSchemaVersion: "2026-06-16",
+               },
+               Dags: map[string]airflowmetadata.Dag{},
+       }
+       got, err := renderManifest(meta, "main.go")
+       require.NoError(t, err)
+       assert.Contains(t, string(got), "dags: {}")
+}
+
+// Forwarded `go build` flags after "--" must not count against 
MaximumNArgs(1).
+func TestRootArgs_AllowsBuildFlagsAfterDoubleDash(t *testing.T) {
+       cases := [][]string{
+               {"--", "-ldflags", "-X main.dagId=foo"},
+               {"./pkg", "--", "-ldflags", "-X main.dagId=foo"},
+               {"--", "-trimpath", "-tags=prod"},
+       }
+       for _, argv := range cases {
+               cmd := newRootCmd()
+               // Stop the command from actually running; we only want arg 
validation.
+               cmd.RunE = func(*cobra.Command, []string) error { return nil }
+               cmd.SetArgs(argv)
+               assert.NoError(t, cmd.Execute(), "args=%v should validate", 
argv)
+       }
+}
+
+// A file the OS refuses to exec is errExecNotStartable; a binary that runs and
+// exits non-zero is not.
+func TestRunIntrospect_ClassifiesExecFailure(t *testing.T) {
+       if runtime.GOOS == "windows" {
+               t.Skip("exec-format semantics differ on Windows")
+       }
+       dir := t.TempDir()
+
+       // A non-binary file with the exec bit set: execve rejects it with an
+       // exec-format error, standing in for a foreign-arch executable.
+       garbage := filepath.Join(dir, "garbage")
+       require.NoError(t, os.WriteFile(garbage, []byte("not a real 
executable\n"), 0o755))
+       _, err := runIntrospect(garbage, "--airflow-metadata")
+       require.Error(t, err)
+       assert.ErrorIs(t, err, errExecNotStartable)
+
+       // A script that starts and exits non-zero is a genuine run failure, not
+       // an unrunnable binary, so it must NOT be classified as not-startable.
+       failing := filepath.Join(dir, "failing")
+       require.NoError(t, os.WriteFile(failing, []byte("#!/bin/sh\nexit 3\n"), 
0o755))
+       _, err = runIntrospect(failing, "--airflow-metadata")
+       require.Error(t, err)
+       assert.False(
+               t,
+               errors.Is(err, errExecNotStartable),
+               "non-zero exit should not be errExecNotStartable",
+       )
+}
+
+func TestRootArgs_RejectsExtraPositionalBeforeDash(t *testing.T) {
+       cmd := newRootCmd()
+       cmd.RunE = func(*cobra.Command, []string) error { return nil }
+       cmd.SetArgs([]string{"./pkg1", "./pkg2", "--", "-ldflags", "-X 
main.dagId=foo"})
+       err := cmd.Execute()
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "accepts at most 1 arg")
+}
+
+func TestSameFile(t *testing.T) {
+       dir := t.TempDir()
+       a := filepath.Join(dir, "a")
+       b := filepath.Join(dir, "b")
+       require.NoError(t, os.WriteFile(a, []byte("a"), 0o644))
+       require.NoError(t, os.WriteFile(b, []byte("b"), 0o644))
+
+       // Distinct existing files do not alias.
+       same, err := sameFile(a, b)
+       require.NoError(t, err)
+       assert.False(t, same)
+
+       // Two spellings of the same path alias even though only one exists on
+       // disk under the literal string ("./a" vs "a" relative to the same 
dir).
+       same, err = sameFile(filepath.Join(dir, "x"), filepath.Join(dir, ".", 
"x"))
+       require.NoError(t, err)
+       assert.True(t, same, "cleaned-abs equality should treat ./x and x as 
the same file")
+
+       // A non-existent output never aliases an existing input by inode.
+       same, err = sameFile(filepath.Join(dir, "does-not-exist"), a)
+       require.NoError(t, err)
+       assert.False(t, same)
+
+       // A symlink that points at the input shares its inode.
+       link := filepath.Join(dir, "link-to-a")
+       if err := os.Symlink(a, link); err != nil {
+               t.Skipf("symlinks unsupported: %v", err)
+       }
+       same, err = sameFile(link, a)
+       require.NoError(t, err)
+       assert.True(t, same, "a symlink to the file should alias it")
+}
+
+// When --output resolves to the same file as --executable, runPack must refuse
+// before copyFile truncates the input; the executable's bytes survive.
+func TestRunPack_RejectsOutputAliasingExecutable(t *testing.T) {
+       dir := t.TempDir()
+       exec := filepath.Join(dir, "bundle")
+       source := filepath.Join(dir, "main.go")
+       original := []byte("prebuilt-binary-bytes")
+       require.NoError(t, os.WriteFile(exec, original, 0o755))
+       require.NoError(t, os.WriteFile(source, []byte("package main\nfunc 
main() {}\n"), 0o644))
+
+       err := runPack(io.Discard, io.Discard, &packOptions{
+               executable: exec,
+               source:     source,
+               output:     exec,
+       })
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "same file as the executable")
+
+       // The guard must fire before any write: the executable is intact.
+       got, readErr := os.ReadFile(exec)
+       require.NoError(t, readErr)
+       assert.Equal(t, original, got, "executable must not be truncated when 
output aliases it")
+}
+
+// When --output resolves to the same file as --airflow-metadata, runPack must
+// refuse before the bundle is renamed onto it; the manifest file survives.
+func TestRunPack_RejectsOutputAliasingMetadataFile(t *testing.T) {
+       dir := t.TempDir()
+       exe := filepath.Join(dir, "foreign")
+       require.NoError(t, os.WriteFile(exe, 
[]byte("foreign-arch-binary-bytes"), 0o755))
+       source := filepath.Join(dir, "main.go")
+       require.NoError(t, os.WriteFile(source, []byte("package main\nfunc 
main() {}\n"), 0o644))
+       meta := filepath.Join(dir, "airflow-metadata.json")
+       original := []byte(
+               `{"airflow_bundle_metadata_version":"1.0",` +
+                       
`"sdk":{"language":"go","version":"0.1.0","supervisor_schema_version":"2026-06-16"},`
 +
+                       `"dags":{"my_dag":{"tasks":["t1"]}}}`,
+       )
+       require.NoError(t, os.WriteFile(meta, original, 0o644))
+
+       err := runPack(io.Discard, io.Discard, &packOptions{
+               executable:      exe,
+               source:          source,
+               airflowMetadata: meta,
+               output:          meta,
+       })
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "same file as the --airflow-metadata 
file")
+
+       // The guard must fire before any write: the manifest file is intact.
+       got, readErr := os.ReadFile(meta)
+       require.NoError(t, readErr)
+       assert.Equal(t, original, got, "metadata file must not be clobbered 
when output aliases it")
+}
+
+// When the default output path names an existing directory, the packer must
+// reject it with --output guidance, not a bare os.Rename "file exists".
+func TestRunPack_RejectsDirectoryOutput(t *testing.T) {
+       dir := t.TempDir()
+       exe := filepath.Join(dir, "prebuilt")
+       require.NoError(t, os.WriteFile(exe, []byte("prebuilt-binary-bytes"), 
0o755))
+       source := filepath.Join(dir, "main.go")
+       require.NoError(t, os.WriteFile(source, []byte("package main\nfunc 
main() {}\n"), 0o644))
+
+       outDir := filepath.Join(dir, "bundle")
+       require.NoError(t, os.Mkdir(outDir, 0o755))
+
+       err := runPack(io.Discard, io.Discard, &packOptions{
+               executable: exe,
+               source:     source,
+               output:     outDir,
+       })
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "is an existing directory")
+       assert.Contains(t, err.Error(), "--output", "error must point the user 
at --output")
+}
+
+// --executable and --goos/--goarch are mutually exclusive: --executable packs
+// the binary as-is and never builds, so it cannot cross-compile.
+func TestRunPack_RejectsExecutableWithCrossFlags(t *testing.T) {
+       for _, tc := range []struct {
+               name string
+               opts packOptions
+       }{
+               {name: "goos", opts: packOptions{executable: "bin", goos: 
"linux"}},
+               {name: "goarch", opts: packOptions{executable: "bin", goarch: 
"amd64"}},
+       } {
+               t.Run(tc.name, func(t *testing.T) {
+                       err := runPack(io.Discard, io.Discard, &tc.opts)
+                       require.Error(t, err)
+                       assert.Contains(
+                               t,
+                               err.Error(),
+                               "--executable is mutually exclusive with 
--goos/--goarch",
+                       )
+               })
+       }
+}
+
+// A foreign-arch --executable that cannot be exec'd on the host is a hard 
error
+// with remediation guidance, never a silent host rebuild that could describe a
+// different DAG/task set than the shipped binary.
+func TestRunPack_FailsFastWhenExecutableUnrunnableAndNoMetadata(t *testing.T) {
+       if runtime.GOOS == "windows" {
+               t.Skip("exec-format semantics differ on Windows")
+       }
+       dir := t.TempDir()
+       // A non-binary file with the exec bit set stands in for a foreign-arch
+       // executable: execve rejects it with an exec-format error.
+       exe := filepath.Join(dir, "foreign")
+       require.NoError(t, os.WriteFile(exe, []byte("not a runnable binary\n"), 
0o755))
+       source := filepath.Join(dir, "main.go")
+       require.NoError(t, os.WriteFile(source, []byte("package main\nfunc 
main() {}\n"), 0o644))
+       out := filepath.Join(dir, "bundle")
+
+       err := runPack(io.Discard, io.Discard, &packOptions{
+               executable: exe,
+               source:     source,
+               output:     out,
+       })
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "cannot exec --executable")
+       assert.Contains(t, err.Error(), "--goarch", "error must point at the 
cross-build workflow")
+       assert.Contains(
+               t,
+               err.Error(),
+               "--airflow-metadata",
+               "error must mention the --airflow-metadata escape hatch",
+       )
+
+       _, statErr := os.Stat(out)
+       assert.True(t, os.IsNotExist(statErr), "no bundle should be written on 
a fail-fast")
+}
+
+// --airflow-metadata short-circuits introspection: a binary that cannot run on
+// the host is packed using the supplied JSON manifest, and its bytes survive.
+func TestRunPack_UsesMetadataFile(t *testing.T) {
+       dir := t.TempDir()
+       exe := filepath.Join(dir, "foreign")
+       exeBytes := []byte("foreign-arch-binary-bytes")
+       require.NoError(t, os.WriteFile(exe, exeBytes, 0o755))
+       source := filepath.Join(dir, "main.go")
+       require.NoError(t, os.WriteFile(source, []byte("package main\nfunc 
main() {}\n"), 0o644))
+       meta := filepath.Join(dir, "airflow-metadata.json")
+       require.NoError(t, os.WriteFile(meta, []byte(
+               `{"airflow_bundle_metadata_version":"1.0",`+
+                       
`"sdk":{"language":"go","version":"0.1.0","supervisor_schema_version":"2026-06-16"},`+
+                       `"dags":{"my_dag":{"tasks":["t1"]}}}`,
+       ), 0o644))
+       out := filepath.Join(dir, "bundle")
+
+       err := runPack(io.Discard, io.Discard, &packOptions{
+               executable:      exe,
+               source:          source,
+               airflowMetadata: meta,
+               output:          out,
+       })
+       require.NoError(t, err)
+
+       gotSource, gotMeta, err := bundlefooter.Read(out)
+       require.NoError(t, err)
+       srcBytes, err := os.ReadFile(source)
+       require.NoError(t, err)
+       assert.Equal(t, srcBytes, gotSource)
+       assert.Contains(
+               t,
+               string(gotMeta),
+               "my_dag:",
+               "Dag from --airflow-metadata must appear in the manifest",
+       )
+
+       bundleBytes, err := os.ReadFile(out)
+       require.NoError(t, err)
+       binaryRegion := 
bundleBytes[:len(bundleBytes)-len(gotSource)-len(gotMeta)-bundlefooter.TrailerSize]
+       assert.Equal(t, exeBytes, binaryRegion, "the supplied --executable must 
be packed verbatim")
+}
+
+// --airflow-metadata also accepts a YAML manifest, not only the JSON the
+// binary prints.
+func TestRunPack_AcceptsYAMLMetadataFile(t *testing.T) {
+       dir := t.TempDir()
+       exe := filepath.Join(dir, "foreign")
+       require.NoError(t, os.WriteFile(exe, 
[]byte("foreign-arch-binary-bytes"), 0o755))
+       source := filepath.Join(dir, "main.go")
+       require.NoError(t, os.WriteFile(source, []byte("package main\nfunc 
main() {}\n"), 0o644))
+       meta := filepath.Join(dir, "airflow-metadata.yaml")
+       require.NoError(t, os.WriteFile(meta, []byte(
+               "airflow_bundle_metadata_version: \"1.0\"\n"+
+                       "sdk:\n"+
+                       "  language: \"go\"\n"+
+                       "  version: \"0.1.0\"\n"+
+                       "  supervisor_schema_version: \"2026-06-16\"\n"+
+                       "dags:\n"+
+                       "  yaml_dag:\n"+
+                       "    tasks:\n"+
+                       "      - \"t1\"\n",
+       ), 0o644))
+       out := filepath.Join(dir, "bundle")
+
+       err := runPack(io.Discard, io.Discard, &packOptions{
+               executable:      exe,
+               source:          source,
+               airflowMetadata: meta,
+               output:          out,
+       })
+       require.NoError(t, err)
+
+       _, gotMeta, err := bundlefooter.Read(out)
+       require.NoError(t, err)
+       assert.Contains(
+               t,
+               string(gotMeta),
+               "yaml_dag:",
+               "Dag from a YAML --airflow-metadata file must appear in the 
manifest",
+       )
+}
+
+// With no explicit --output, packing "./bundle" from a package dir named
+// "bundle" derives a default output that collides with the pre-built binary.
+func TestRunPack_RejectsDefaultOutputAliasingExecutable(t *testing.T) {
+       parent := t.TempDir()
+       pkgDir := filepath.Join(parent, "bundle")
+       require.NoError(t, os.Mkdir(pkgDir, 0o755))
+       exec := filepath.Join(pkgDir, "bundle")
+       source := filepath.Join(pkgDir, "main.go")
+       original := []byte("prebuilt-binary-bytes")
+       require.NoError(t, os.WriteFile(exec, original, 0o755))
+       require.NoError(t, os.WriteFile(source, []byte("package main\nfunc 
main() {}\n"), 0o644))
+
+       // defaultOutputPath derives the bundle name from the package dir base
+       // ("bundle") relative to cwd, so run from pkgDir to reproduce the 
collision.
+       t.Chdir(pkgDir)
+
+       err := runPack(io.Discard, io.Discard, &packOptions{
+               executable: "./bundle",
+               source:     "main.go",
+       })
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "same file as the executable")
+
+       got, readErr := os.ReadFile(exec)
+       require.NoError(t, readErr)
+       assert.Equal(
+               t,
+               original,
+               got,
+               "executable must not be truncated by the default output 
collision",
+       )
+}
+
+// --goos/--goarch > env > host precedence. The flags let `go tool
+// airflow-go-pack` cross-compile without GOOS/GOARCH in the env (which would
+// cross-build the packer itself).
+func TestTargetPlatform(t *testing.T) {
+       // An empty value reads like an unset var to os.Getenv, so this clears 
any
+       // ambient cross-compile setting for the default case.
+       t.Setenv("GOOS", "")
+       t.Setenv("GOARCH", "")
+
+       t.Run("defaults to host", func(t *testing.T) {
+               goos, goarch := targetPlatform(&packOptions{})
+               assert.Equal(t, runtime.GOOS, goos)
+               assert.Equal(t, runtime.GOARCH, goarch)
+       })
+
+       t.Run("env overrides host", func(t *testing.T) {
+               t.Setenv("GOOS", "linux")
+               t.Setenv("GOARCH", "arm64")
+               goos, goarch := targetPlatform(&packOptions{})
+               assert.Equal(t, "linux", goos)
+               assert.Equal(t, "arm64", goarch)
+       })
+
+       t.Run("flags override env", func(t *testing.T) {
+               t.Setenv("GOOS", "linux")
+               t.Setenv("GOARCH", "arm64")
+               goos, goarch := targetPlatform(&packOptions{goos: "windows", 
goarch: "amd64"})
+               assert.Equal(t, "windows", goos)
+               assert.Equal(t, "amd64", goarch)
+       })
+
+       t.Run("resolves each axis independently", func(t *testing.T) {
+               t.Setenv("GOOS", "")
+               t.Setenv("GOARCH", "arm64")
+               goos, goarch := targetPlatform(&packOptions{goos: "windows"})
+               assert.Equal(t, "windows", goos, "goos from flag")
+               assert.Equal(t, "arm64", goarch, "goarch from env")
+       })
+}
+
+// When the --output parent directory does not exist, the packer must create it
+// instead of failing with an opaque temp-file error.
+func TestWriteBundle_CreatesMissingOutputDir(t *testing.T) {
+       dir := t.TempDir()
+       exe := filepath.Join(dir, "input-bin")
+       require.NoError(t, os.WriteFile(exe, []byte("binary-bytes"), 0o755))
+
+       output := filepath.Join(dir, "bin", "nested", "bundle")
+       require.NoError(t, writeBundle(exe, output, []byte("source"), 
[]byte("manifest")))
+
+       info, err := os.Stat(output)
+       require.NoError(t, err, "bundle should be written into the created 
directory")
+       assert.False(t, info.IsDir())
+}
diff --git a/go-sdk/example/bundle/Justfile b/go-sdk/example/bundle/Justfile
index ca211b304b1..5432aa34a9d 100644
--- a/go-sdk/example/bundle/Justfile
+++ b/go-sdk/example/bundle/Justfile
@@ -21,13 +21,32 @@
 default:
     @just --list
 
-# Build the example bundle
+# Build the example bundle (raw go build, no footer; for Edge Worker testing)
 build:
     @echo "Building example DAG bundle..."
     go build -o ../../bin/example-dag-bundle .
 
-
-# Build with specific name and version
+# Build with specific name and version (raw go build, no footer)
 build-with name="data_processing_example" version="1.0.0":
     @echo "Building example DAG bundle with name={{name}} 
version={{version}}..."
     go build -ldflags="-X 'main.bundleName={{name}}' -X 
'main.bundleVersion={{version}}'" -o ../../bin/{{name}}-{{version}} .
+
+# One-step build + pack. The single `go tool airflow-go-pack`
+# invocation runs `go build` internally, queries the binary for its
+# DAG/task identity via --airflow-metadata, and appends the source plus
+# airflow-metadata.yaml plus AFBNDL01 trailer. The output is a single
+# self-contained executable bundle, named after the bundle's package
+# directory and written to the current directory. Drop it into
+# [executable] bundles_folder to deploy.
+pack:
+    @echo "Packing example DAG bundle..."
+    go tool airflow-go-pack --output ../../bin/example_dags
+
+# Pack with extra go build flags forwarded after "--".
+pack-release:
+    @echo "Packing example DAG bundle (release flags)..."
+    go tool airflow-go-pack --output ../../bin/example_dags -- -trimpath 
-ldflags="-s -w"
+
+# Inspect a packed bundle's embedded manifest.
+inspect bundle="../../bin/example_dags":
+    go tool airflow-go-pack inspect {{bundle}}
diff --git a/go-sdk/go.mod b/go-sdk/go.mod
index f3bfcd4b0f6..cc738e4c3f8 100644
--- a/go-sdk/go.mod
+++ b/go-sdk/go.mod
@@ -4,6 +4,8 @@ go 1.24.0
 
 toolchain go1.24.6
 
+tool github.com/apache/airflow/go-sdk/cmd/airflow-go-pack
+
 require (
        github.com/cappuccinotm/slogx v1.4.2
        github.com/golang-jwt/jwt/v5 v5.3.0
@@ -60,5 +62,5 @@ require (
        github.com/samber/slog-http v1.8.2
        golang.org/x/sys v0.39.0 // indirect
        golang.org/x/text v0.32.0 // indirect
-       gopkg.in/yaml.v3 v3.0.1 // indirect
+       gopkg.in/yaml.v3 v3.0.1
 )
diff --git a/go-sdk/internal/airflowmetadata/airflowmetadata.go 
b/go-sdk/internal/airflowmetadata/airflowmetadata.go
new file mode 100644
index 00000000000..d11412d722c
--- /dev/null
+++ b/go-sdk/internal/airflowmetadata/airflowmetadata.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package airflowmetadata defines the airflow-metadata manifest wire shape
+// shared between the producer (a bundle binary's --airflow-metadata flag,
+// emitted from pkg/execution) and the consumer (airflow-go-pack, which decodes
+// it and renders the embedded airflow-metadata.yaml). Keeping the definition 
in
+// one place stops the two sides from drifting. The canonical schema is
+// airflow-metadata.schema.json in the Task SDK docs.
+package airflowmetadata
+
+// FormatVersion is the bundle-spec version emitted manifests conform to.
+const FormatVersion = "1.0"
+
+// Manifest is the shape printed by a bundle binary's --airflow-metadata flag
+// (YAML by default, JSON under --format json). It mirrors
+// airflow-metadata.schema.json minus the source field, which only the packer
+// can resolve from the build inputs. The yaml tags let the packer's
+// --airflow-metadata flag decode a captured manifest in either JSON or YAML
+// (the airflow-metadata.yaml in a bundle).
+type Manifest struct {
+       AirflowBundleMetadataVersion string         
`json:"airflow_bundle_metadata_version" yaml:"airflow_bundle_metadata_version"`
+       SDK                          SDK            `json:"sdk"                 
            yaml:"sdk"`
+       Dags                         map[string]Dag `json:"dags"                
            yaml:"dags"`
+}
+
+// SDK identifies the SDK that produced the bundle.
+type SDK struct {
+       Language                string `json:"language"                  
yaml:"language"`
+       Version                 string `json:"version"                   
yaml:"version"`
+       SupervisorSchemaVersion string `json:"supervisor_schema_version" 
yaml:"supervisor_schema_version"`
+}
+
+// Dag is the static description of a single DAG declared in the bundle.
+type Dag struct {
+       Tasks []string `json:"tasks" yaml:"tasks"`
+}
diff --git a/go-sdk/internal/bundlefooter/footer.go 
b/go-sdk/internal/bundlefooter/footer.go
new file mode 100644
index 00000000000..6641ab3af08
--- /dev/null
+++ b/go-sdk/internal/bundlefooter/footer.go
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package bundlefooter implements the AFBNDL01 trailer described in
+// ADR 0004 (and task-sdk/docs/executable-bundle-spec.rst). A bundle file is
+// the compiled executable with three appended regions: the source bytes, the
+// manifest bytes, and a fixed 64-byte trailer that locates them and carries a
+// SHA-256 over the binary region for integrity.
+//
+// The trailer layout (all little-endian) is:
+//
+//     bytes  0..3   source_len     uint32
+//     bytes  4..7   metadata_len   uint32
+//     bytes  8..11  footer_ver     uint32  (= 1)
+//     bytes 12..43  binary_sha256  32 bytes (SHA-256 of the binary region)
+//     bytes 44..55  reserved       12 bytes, zero
+//     bytes 56..63  magic          8 bytes ASCII "AFBNDL01"
+package bundlefooter
+
+import (
+       "crypto/sha256"
+       "encoding/binary"
+       "errors"
+       "fmt"
+       "io"
+       "math"
+       "os"
+)
+
+const (
+       // TrailerSize is the fixed length of the trailer, in bytes.
+       TrailerSize = 64
+
+       // FooterVersion is the currently defined trailer-format version.
+       FooterVersion = 1
+
+       // MaxRegionSize is the largest source or metadata region this footer
+       // format can address (uint32 length field).
+       MaxRegionSize = math.MaxUint32
+)
+
+// Magic is the 8-byte ASCII tag that identifies a file as a bundle.
+var Magic = [8]byte{'A', 'F', 'B', 'N', 'D', 'L', '0', '1'}
+
+// ErrNotBundle is returned by Read when the file does not end with the
+// AFBNDL01 magic.
+var ErrNotBundle = errors.New("bundlefooter: not a bundle (magic mismatch)")
+
+// ErrUnknownVersion is returned by Read when the trailer's footer_ver field
+// is something other than FooterVersion.
+var ErrUnknownVersion = errors.New("bundlefooter: unknown footer version")
+
+// ErrHashMismatch is returned by Read when the SHA-256 recomputed over the
+// binary region does not match the binary_sha256 recorded in the trailer.
+var ErrHashMismatch = errors.New("bundlefooter: binary_sha256 mismatch")
+
+// Trailer carries the parsed contents of a bundle's 64-byte trailer.
+type Trailer struct {
+       SourceLen     uint32
+       MetadataLen   uint32
+       FooterVersion uint32
+       BinarySHA256  [sha256.Size]byte
+}
+
+// Append writes the source bytes, metadata bytes, and trailer to the end of
+// the file at execPath. The file's existing contents (the executable) are
+// left intact and its mode bits are preserved. source MAY be nil/empty.
+func Append(execPath string, source, metadata []byte) error {
+       if int64(len(source)) > MaxRegionSize {
+               return fmt.Errorf(
+                       "bundlefooter: source region too large (%d bytes, max 
%d)",
+                       len(source),
+                       MaxRegionSize,
+               )
+       }
+       if int64(len(metadata)) > MaxRegionSize {
+               return fmt.Errorf(
+                       "bundlefooter: metadata region too large (%d bytes, max 
%d)",
+                       len(metadata),
+                       MaxRegionSize,
+               )
+       }
+
+       // Compute binary_sha256 over the file as it stands now: at this point 
the
+       // whole file is the binary region, so the digest matches what a reader
+       // recomputes over [0, source_start) after the append.
+       binaryHash, err := hashFile(execPath)
+       if err != nil {
+               return fmt.Errorf("bundlefooter: hashing binary region of %s: 
%w", execPath, err)
+       }
+
+       f, err := os.OpenFile(execPath, os.O_RDWR|os.O_APPEND, 0)
+       if err != nil {
+               return fmt.Errorf("bundlefooter: opening %s: %w", execPath, err)
+       }
+       defer f.Close()
+
+       if len(source) > 0 {
+               if _, err := f.Write(source); err != nil {
+                       return fmt.Errorf("bundlefooter: writing source region: 
%w", err)
+               }
+       }
+       if len(metadata) > 0 {
+               if _, err := f.Write(metadata); err != nil {
+                       return fmt.Errorf("bundlefooter: writing metadata 
region: %w", err)
+               }
+       }
+
+       trailer := encodeTrailer(uint32(len(source)), uint32(len(metadata)), 
binaryHash)
+       if _, err := f.Write(trailer[:]); err != nil {
+               return fmt.Errorf("bundlefooter: writing trailer: %w", err)
+       }
+       return nil
+}
+
+// Read parses the trailer of the file at path and returns the embedded
+// source and metadata regions. Returns ErrNotBundle if the magic does not
+// match (so callers may silently ignore non-bundle files).
+func Read(path string) (source, metadata []byte, err error) {
+       f, err := os.Open(path)
+       if err != nil {
+               return nil, nil, fmt.Errorf("bundlefooter: opening %s: %w", 
path, err)
+       }
+       defer f.Close()
+
+       stat, err := f.Stat()
+       if err != nil {
+               return nil, nil, fmt.Errorf("bundlefooter: stat %s: %w", path, 
err)
+       }
+       size := stat.Size()
+       if size < TrailerSize {
+               return nil, nil, ErrNotBundle
+       }
+
+       var trailer [TrailerSize]byte
+       if _, err := f.ReadAt(trailer[:], size-TrailerSize); err != nil {
+               return nil, nil, fmt.Errorf("bundlefooter: reading trailer: 
%w", err)
+       }
+
+       t, err := decodeTrailer(trailer)
+       if err != nil {
+               return nil, nil, err
+       }
+
+       metadataStart := size - TrailerSize - int64(t.MetadataLen)
+       sourceStart := metadataStart - int64(t.SourceLen)
+       if sourceStart < 0 {
+               return nil, nil, fmt.Errorf(
+                       "bundlefooter: trailer reports regions larger than file 
(source_len=%d metadata_len=%d size=%d)",
+                       t.SourceLen,
+                       t.MetadataLen,
+                       size,
+               )
+       }
+       if sourceStart == 0 {
+               return nil, nil, fmt.Errorf("bundlefooter: empty binary region")
+       }
+
+       binaryHash, err := hashRegion(f, sourceStart)
+       if err != nil {
+               return nil, nil, fmt.Errorf("bundlefooter: hashing binary 
region of %s: %w", path, err)
+       }
+       if binaryHash != t.BinarySHA256 {
+               return nil, nil, fmt.Errorf("%w: %s", ErrHashMismatch, path)
+       }
+
+       if t.SourceLen > 0 {
+               source = make([]byte, t.SourceLen)
+               if _, err := f.ReadAt(source, sourceStart); err != nil && 
!errors.Is(err, io.EOF) {
+                       return nil, nil, fmt.Errorf("bundlefooter: reading 
source region: %w", err)
+               }
+       }
+       if t.MetadataLen > 0 {
+               metadata = make([]byte, t.MetadataLen)
+               if _, err := f.ReadAt(metadata, metadataStart); err != nil && 
!errors.Is(err, io.EOF) {
+                       return nil, nil, fmt.Errorf("bundlefooter: reading 
metadata region: %w", err)
+               }
+       }
+       return source, metadata, nil
+}
+
+// IsBundle reports whether the file at path ends with the AFBNDL01 magic.
+// It does not validate the trailer beyond the magic check, so a file with a
+// matching magic but a corrupt trailer body still returns true.
+func IsBundle(path string) (bool, error) {
+       f, err := os.Open(path)
+       if err != nil {
+               return false, err
+       }
+       defer f.Close()
+
+       stat, err := f.Stat()
+       if err != nil {
+               return false, err
+       }
+       if stat.Size() < TrailerSize {
+               return false, nil
+       }
+
+       var tail [8]byte
+       if _, err := f.ReadAt(tail[:], stat.Size()-int64(len(tail))); err != 
nil {
+               return false, err
+       }
+       return tail == Magic, nil
+}
+
+func encodeTrailer(sourceLen, metadataLen uint32, binaryHash 
[sha256.Size]byte) [TrailerSize]byte {
+       var t [TrailerSize]byte
+       binary.LittleEndian.PutUint32(t[0:4], sourceLen)
+       binary.LittleEndian.PutUint32(t[4:8], metadataLen)
+       binary.LittleEndian.PutUint32(t[8:12], FooterVersion)
+       copy(t[12:44], binaryHash[:])
+       // bytes 44..55 are reserved, zero
+       copy(t[56:64], Magic[:])
+       return t
+}
+
+func decodeTrailer(b [TrailerSize]byte) (Trailer, error) {
+       var magic [8]byte
+       copy(magic[:], b[56:64])
+       if magic != Magic {
+               return Trailer{}, ErrNotBundle
+       }
+       t := Trailer{
+               SourceLen:     binary.LittleEndian.Uint32(b[0:4]),
+               MetadataLen:   binary.LittleEndian.Uint32(b[4:8]),
+               FooterVersion: binary.LittleEndian.Uint32(b[8:12]),
+       }
+       copy(t.BinarySHA256[:], b[12:44])
+       if t.FooterVersion != FooterVersion {
+               return Trailer{}, fmt.Errorf("%w: %d", ErrUnknownVersion, 
t.FooterVersion)
+       }
+       return t, nil
+}
+
+// hashFile computes SHA-256 over the entire contents of the file at path.
+func hashFile(path string) ([sha256.Size]byte, error) {
+       var sum [sha256.Size]byte
+       f, err := os.Open(path)
+       if err != nil {
+               return sum, err
+       }
+       defer f.Close()
+       h := sha256.New()
+       if _, err := io.Copy(h, f); err != nil {
+               return sum, err
+       }
+       copy(sum[:], h.Sum(nil))
+       return sum, nil
+}
+
+// hashRegion computes SHA-256 over the first length bytes of f, seeking to the
+// start first. It is used to recompute binary_sha256 over the binary region
+// [0, source_start). It does not disturb subsequent ReadAt calls.
+func hashRegion(f *os.File, length int64) ([sha256.Size]byte, error) {
+       var sum [sha256.Size]byte
+       if _, err := f.Seek(0, io.SeekStart); err != nil {
+               return sum, err
+       }
+       h := sha256.New()
+       if _, err := io.CopyN(h, f, length); err != nil {
+               return sum, err
+       }
+       copy(sum[:], h.Sum(nil))
+       return sum, nil
+}
diff --git a/go-sdk/internal/bundlefooter/footer_test.go 
b/go-sdk/internal/bundlefooter/footer_test.go
new file mode 100644
index 00000000000..33136017455
--- /dev/null
+++ b/go-sdk/internal/bundlefooter/footer_test.go
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bundlefooter
+
+import (
+       "errors"
+       "os"
+       "path/filepath"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func writeTempBinary(t *testing.T, contents []byte) string {
+       t.Helper()
+       dir := t.TempDir()
+       path := filepath.Join(dir, "fake-binary")
+       require.NoError(t, os.WriteFile(path, contents, 0o755))
+       return path
+}
+
+func TestAppendAndRead_RoundTrip(t *testing.T) {
+       binary := []byte("\x7FELFnot-really-an-elf-but-good-enough")
+       source := []byte("package main\n\nfunc main() {}\n")
+       metadata := []byte("airflow_bundle_metadata_version: \"1.0\"\nsdk:\n  
language: go\n")
+
+       path := writeTempBinary(t, binary)
+       require.NoError(t, Append(path, source, metadata))
+
+       got := mustRead(t, path)
+       assert.Equal(t, len(binary)+len(source)+len(metadata)+TrailerSize, 
got.size)
+
+       gotSource, gotMetadata, err := Read(path)
+       require.NoError(t, err)
+       assert.Equal(t, source, gotSource)
+       assert.Equal(t, metadata, gotMetadata)
+
+       ok, err := IsBundle(path)
+       require.NoError(t, err)
+       assert.True(t, ok)
+}
+
+func TestAppend_ZeroLengthSource(t *testing.T) {
+       binary := []byte("\x7FELFstub")
+       metadata := []byte("manifest")
+
+       path := writeTempBinary(t, binary)
+       require.NoError(t, Append(path, nil, metadata))
+
+       source, gotMetadata, err := Read(path)
+       require.NoError(t, err)
+       assert.Empty(t, source)
+       assert.Equal(t, metadata, gotMetadata)
+}
+
+func TestAppend_DeterministicOutput(t *testing.T) {
+       binary := []byte("\x7FELFstub-binary-bytes")
+       source := []byte("source")
+       metadata := []byte("manifest")
+
+       pathA := writeTempBinary(t, binary)
+       pathB := writeTempBinary(t, binary)
+       require.NoError(t, Append(pathA, source, metadata))
+       require.NoError(t, Append(pathB, source, metadata))
+
+       a, err := os.ReadFile(pathA)
+       require.NoError(t, err)
+       b, err := os.ReadFile(pathB)
+       require.NoError(t, err)
+       assert.Equal(t, a, b)
+}
+
+func TestRead_NotBundle(t *testing.T) {
+       path := writeTempBinary(t, []byte("just a regular file with no footer"))
+
+       _, _, err := Read(path)
+       require.ErrorIs(t, err, ErrNotBundle)
+
+       ok, err := IsBundle(path)
+       require.NoError(t, err)
+       assert.False(t, ok)
+}
+
+func TestRead_TooShort(t *testing.T) {
+       path := writeTempBinary(t, []byte("hi"))
+
+       _, _, err := Read(path)
+       require.ErrorIs(t, err, ErrNotBundle)
+}
+
+func TestRead_UnknownVersion(t *testing.T) {
+       binary := []byte("\x7FELFstub")
+       source := []byte("src")
+       metadata := []byte("md")
+       path := writeTempBinary(t, binary)
+       require.NoError(t, Append(path, source, metadata))
+
+       // Mutate the version byte in the trailer.
+       f, err := os.OpenFile(path, os.O_RDWR, 0)
+       require.NoError(t, err)
+       stat, err := f.Stat()
+       require.NoError(t, err)
+       // footer_ver lives at bytes 8..11 of the trailer.
+       versionOffset := stat.Size() - TrailerSize + 8
+       _, err = f.WriteAt([]byte{99, 0, 0, 0}, versionOffset)
+       require.NoError(t, err)
+       require.NoError(t, f.Close())
+
+       _, _, err = Read(path)
+       require.Error(t, err)
+       assert.True(t, errors.Is(err, ErrUnknownVersion))
+}
+
+func TestRead_HashMismatch(t *testing.T) {
+       binary := []byte("\x7FELFstub-binary-bytes")
+       source := []byte("src")
+       metadata := []byte("md")
+       path := writeTempBinary(t, binary)
+       require.NoError(t, Append(path, source, metadata))
+
+       // Corrupt a byte inside the binary region; the trailer's binary_sha256
+       // no longer matches what Read recomputes over [0, source_start).
+       f, err := os.OpenFile(path, os.O_RDWR, 0)
+       require.NoError(t, err)
+       _, err = f.WriteAt([]byte{'X'}, 0)
+       require.NoError(t, err)
+       require.NoError(t, f.Close())
+
+       _, _, err = Read(path)
+       require.ErrorIs(t, err, ErrHashMismatch)
+}
+
+type bundleStat struct {
+       size int
+}
+
+func mustRead(t *testing.T, path string) bundleStat {
+       t.Helper()
+       stat, err := os.Stat(path)
+       require.NoError(t, err)
+       return bundleStat{size: int(stat.Size())}
+}
diff --git a/go-sdk/pkg/execution/messages.go b/go-sdk/pkg/execution/messages.go
index 9f9e14505cd..b41a2e9adf6 100644
--- a/go-sdk/pkg/execution/messages.go
+++ b/go-sdk/pkg/execution/messages.go
@@ -22,6 +22,13 @@ import (
        "time"
 )
 
+// SupervisorSchemaVersion is the dated AIP-72 supervisor wire-schema version
+// this SDK's coordinator protocol is compiled against, in YYYY-MM-DD form. It
+// is reported in a bundle's airflow-metadata manifest as
+// sdk.supervisor_schema_version so the supervisor can downgrade outbound
+// messages / upgrade inbound messages to a shape the bundle understands.
+const SupervisorSchemaVersion = "2026-06-16"
+
 // Inbound messages (Supervisor -> Runtime).
 
 // TaskInstanceInfo holds task instance details from StartupDetails.
diff --git a/go-sdk/pkg/execution/metadata.go b/go-sdk/pkg/execution/metadata.go
new file mode 100644
index 00000000000..8c95708b8c9
--- /dev/null
+++ b/go-sdk/pkg/execution/metadata.go
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+       "encoding/json"
+       "fmt"
+       "os"
+       "runtime/debug"
+
+       "gopkg.in/yaml.v3"
+
+       "github.com/apache/airflow/go-sdk/bundle/bundlev1"
+       "github.com/apache/airflow/go-sdk/internal/airflowmetadata"
+)
+
+// sdkModulePath is the import path of the SDK module. Used to identify the
+// SDK version from the bundle binary's build info dependencies.
+const sdkModulePath = "github.com/apache/airflow/go-sdk"
+
+// MetadataFormat selects the encoding DumpAirflowMetadata writes to stdout for
+// the bundle binary's --airflow-metadata flag.
+type MetadataFormat string
+
+const (
+       // MetadataFormatYAML is the default; it matches the 
airflow-metadata.yaml a
+       // bundle embeds, so `mybundle --airflow-metadata > 
airflow-metadata.yaml`
+       // yields a ready-to-use file.
+       MetadataFormatYAML MetadataFormat = "yaml"
+       // MetadataFormatJSON is opt-in via --format json.
+       MetadataFormatJSON MetadataFormat = "json"
+)
+
+// ParseMetadataFormat validates a --format value and returns the matching
+// MetadataFormat. An empty value defaults to YAML.
+func ParseMetadataFormat(s string) (MetadataFormat, error) {
+       switch MetadataFormat(s) {
+       case "", MetadataFormatYAML:
+               return MetadataFormatYAML, nil
+       case MetadataFormatJSON:
+               return MetadataFormatJSON, nil
+       default:
+               return "", fmt.Errorf(
+                       "unsupported --airflow-metadata format %q: want %q or 
%q",
+                       s, MetadataFormatYAML, MetadataFormatJSON,
+               )
+       }
+}
+
+// DumpAirflowMetadata writes the bundle's airflow-metadata manifest to stdout
+// (YAML by default, JSON when format is MetadataFormatJSON). It runs
+// RegisterDags against an in-memory recorder only — no gRPC server, no 
external
+// services. airflow-go-pack execs the binary with --airflow-metadata and
+// decodes this output to build the embedded manifest.
+func DumpAirflowMetadata(bundle bundlev1.BundleProvider, format 
MetadataFormat) error {
+       meta, err := collectManifest(bundle)
+       if err != nil {
+               return err
+       }
+       data, err := encodeManifest(meta, format)
+       if err != nil {
+               return err
+       }
+       _, err = os.Stdout.Write(data)
+       return err
+}
+
+// collectManifest builds the manifest by running RegisterDags against an
+// in-memory recorder and enumerating the recorded dags and tasks.
+func collectManifest(bundle bundlev1.BundleProvider) 
(airflowmetadata.Manifest, error) {
+       reg := bundlev1.New()
+       if err := bundle.RegisterDags(reg); err != nil {
+               return airflowmetadata.Manifest{}, fmt.Errorf("registering 
dags: %w", err)
+       }
+
+       enum, ok := reg.(bundlev1.EnumerableBundle)
+       if !ok {
+               return airflowmetadata.Manifest{}, fmt.Errorf(
+                       "registry does not implement EnumerableBundle",
+               )
+       }
+
+       meta := airflowmetadata.Manifest{
+               AirflowBundleMetadataVersion: airflowmetadata.FormatVersion,
+               SDK: airflowmetadata.SDK{
+                       Language:                "go",
+                       Version:                 sdkVersion(),
+                       SupervisorSchemaVersion: SupervisorSchemaVersion,
+               },
+               Dags: make(map[string]airflowmetadata.Dag),
+       }
+       for _, dag := range enum.OrderedDags() {
+               taskIDs := make([]string, 0, len(dag.Tasks))
+               for _, t := range dag.Tasks {
+                       taskIDs = append(taskIDs, t.ID)
+               }
+               meta.Dags[dag.DagID] = airflowmetadata.Dag{Tasks: taskIDs}
+       }
+       return meta, nil
+}
+
+// encodeManifest renders the manifest, ensuring exactly one trailing newline
+// (yaml.Marshal adds one; JSON does not).
+func encodeManifest(meta airflowmetadata.Manifest, format MetadataFormat) 
([]byte, error) {
+       switch format {
+       case MetadataFormatYAML, "":
+               return yaml.Marshal(meta)
+       case MetadataFormatJSON:
+               data, err := json.MarshalIndent(meta, "", "    ")
+               if err != nil {
+                       return nil, err
+               }
+               return append(data, '\n'), nil
+       default:
+               return nil, fmt.Errorf("unsupported airflow-metadata format 
%q", format)
+       }
+}
+
+// sdkVersion returns the version of the SDK module linked into this binary,
+// derived from runtime/debug.ReadBuildInfo. Falls back to "(devel)" when
+// build info is unavailable (e.g. tests, bundle binaries built from a local
+// replace directive).
+func sdkVersion() string {
+       info, ok := debug.ReadBuildInfo()
+       if !ok {
+               return "(devel)"
+       }
+       if info.Main.Path == sdkModulePath && info.Main.Version != "" {
+               return info.Main.Version
+       }
+       for _, dep := range info.Deps {
+               if dep.Path == sdkModulePath {
+                       if dep.Replace != nil && dep.Replace.Version != "" {
+                               return dep.Replace.Version
+                       }
+                       if dep.Version != "" {
+                               return dep.Version
+                       }
+               }
+       }
+       return "(devel)"
+}
diff --git a/go-sdk/pkg/execution/metadata_test.go 
b/go-sdk/pkg/execution/metadata_test.go
new file mode 100644
index 00000000000..d105b24d898
--- /dev/null
+++ b/go-sdk/pkg/execution/metadata_test.go
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package execution
+
+import (
+       "encoding/json"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "gopkg.in/yaml.v3"
+
+       "github.com/apache/airflow/go-sdk/internal/airflowmetadata"
+)
+
+func sampleManifest() airflowmetadata.Manifest {
+       return airflowmetadata.Manifest{
+               // "1.0" and a task named "123" must survive a YAML round-trip 
as strings.
+               AirflowBundleMetadataVersion: "1.0",
+               SDK: airflowmetadata.SDK{
+                       Language:                "go",
+                       Version:                 "(devel)",
+                       SupervisorSchemaVersion: "2026-06-16",
+               },
+               Dags: map[string]airflowmetadata.Dag{
+                       "simple_dag": {Tasks: []string{"extract", "transform", 
"load"}},
+                       "odd_dag":    {Tasks: []string{"123"}},
+               },
+       }
+}
+
+func TestParseMetadataFormat(t *testing.T) {
+       tests := []struct {
+               in      string
+               want    MetadataFormat
+               wantErr bool
+       }{
+               {in: "", want: MetadataFormatYAML},
+               {in: "yaml", want: MetadataFormatYAML},
+               {in: "json", want: MetadataFormatJSON},
+               {in: "YAML", wantErr: true},
+               {in: "yml", wantErr: true},
+               {in: "xml", wantErr: true},
+       }
+       for _, tc := range tests {
+               t.Run(tc.in, func(t *testing.T) {
+                       got, err := ParseMetadataFormat(tc.in)
+                       if tc.wantErr {
+                               require.Error(t, err)
+                               return
+                       }
+                       require.NoError(t, err)
+                       assert.Equal(t, tc.want, got)
+               })
+       }
+}
+
+func TestEncodeManifest_JSON(t *testing.T) {
+       meta := sampleManifest()
+       data, err := encodeManifest(meta, MetadataFormatJSON)
+       require.NoError(t, err)
+
+       assert.Equal(t, byte('{'), data[0], "JSON output must start with an 
object")
+       assert.Equal(t, byte('\n'), data[len(data)-1], "output must end with a 
newline")
+       assert.NotEqual(
+               t,
+               byte('\n'),
+               data[len(data)-2],
+               "output must have exactly one trailing newline",
+       )
+       assert.Contains(t, string(data), "\n    \"sdk\"", "JSON must be 
indented four spaces")
+
+       var got airflowmetadata.Manifest
+       require.NoError(t, json.Unmarshal(data, &got))
+       assert.Equal(t, meta, got, "JSON must round-trip back to the manifest")
+}
+
+func TestEncodeManifest_YAML(t *testing.T) {
+       meta := sampleManifest()
+       // The empty format is the default and must encode as YAML.
+       for _, format := range []MetadataFormat{MetadataFormatYAML, ""} {
+               data, err := encodeManifest(meta, format)
+               require.NoError(t, err)
+
+               assert.NotEqual(t, byte('{'), data[0], "YAML output must not be 
JSON")
+               assert.Equal(t, byte('\n'), data[len(data)-1], "output must end 
with a newline")
+               assert.NotEqual(
+                       t,
+                       byte('\n'),
+                       data[len(data)-2],
+                       "output must have exactly one trailing newline",
+               )
+               assert.Contains(t, string(data), 
"airflow_bundle_metadata_version:")
+
+               var got airflowmetadata.Manifest
+               require.NoError(t, yaml.Unmarshal(data, &got))
+               assert.Equal(t, meta, got, "YAML must round-trip back to the 
manifest")
+       }
+}
+
+func TestEncodeManifest_UnsupportedFormat(t *testing.T) {
+       _, err := encodeManifest(sampleManifest(), MetadataFormat("xml"))
+       require.Error(t, err)
+}

Reply via email to