kaxil commented on code in PR #67161:
URL: https://github.com/apache/airflow/pull/67161#discussion_r3321422778


##########
task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py:
##########
@@ -0,0 +1,314 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Native executable coordinator that launches a binary subprocess for task 
execution."""
+
+from __future__ import annotations
+
+import functools
+import hashlib
+import os
+import pathlib
+import struct
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import structlog
+import yaml
+
+from airflow.sdk.coordinators.socket.coordinator import SocketCoordinator
+from airflow.sdk.execution_time.schema import get_schema_version_migrator
+
+if TYPE_CHECKING:
+    from collections.abc import Iterable, Iterator, Sequence
+
+    from structlog.typing import FilteringBoundLogger
+    from typing_extensions import Self
+
+    from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
+
+log: FilteringBoundLogger = 
structlog.get_logger(logger_name="coordinators.executable")
+
+
+FOOTER_MAGIC = b"AFBNDL01"
+FOOTER_SIZE = 64
+FOOTER_VERSION = 1
+_HASH_READ_CHUNK = 1 << 20
+# Upper bound on the verification cache.
+_VERIFY_CACHE_MAXSIZE = 256
+
+
[email protected]
+class _Footer:
+    """
+    Parsed bundle trailer plus the byte offsets it implies.
+
+    All region offsets (``source_start``, ``metadata_start``) and the file
+    size at parse time are computed once in :meth:`read` so downstream
+    consumers do not re-derive them.
+    """
+
+    path: pathlib.Path
+    file_size: int
+    source_len: int
+    metadata_len: int
+    footer_ver: int
+    binary_sha256: bytes
+    source_start: int
+    metadata_start: int
+
+    @classmethod
+    def read(cls, path: pathlib.Path) -> Self | None:
+        """
+        Parse the trailer of *path* and return the resulting footer.
+
+        Returns ``None`` only when *path* is provably not a bundle (file is
+        smaller than the trailer, or the magic does not match).
+        """
+        size = path.stat().st_size
+        if size < FOOTER_SIZE:
+            return None
+
+        with open(path, "rb") as f:
+            f.seek(size - FOOTER_SIZE)
+            trailer = f.read(FOOTER_SIZE)
+
+        if len(trailer) != FOOTER_SIZE or trailer[56:64] != FOOTER_MAGIC:
+            return None
+
+        source_len, metadata_len, footer_ver = struct.unpack_from("<III", 
trailer, 0)
+        if footer_ver != FOOTER_VERSION:
+            raise ValueError(
+                f"Unsupported bundle footer_ver={footer_ver} in {path}; "
+                f"this runtime supports footer_ver={FOOTER_VERSION}."
+            )
+
+        binary_sha256 = bytes(trailer[12:44])
+        reserved = trailer[44:56]
+        if reserved != b"\x00" * 12:
+            raise ValueError(f"Bundle trailer in {path} has non-zero reserved 
bytes.")
+
+        metadata_start = size - FOOTER_SIZE - metadata_len
+        source_start = metadata_start - source_len
+        if source_start < 0:
+            raise ValueError(f"Bundle trailer in {path} declares regions that 
extend past the start of file.")
+        # Per the spec, the binary region [0, source_start) MUST be non-empty.
+        if source_start == 0:
+            raise ValueError(f"Bundle trailer in {path} leaves no room for the 
executable region.")
+
+        return cls(
+            path=path,
+            file_size=size,
+            source_len=source_len,
+            metadata_len=metadata_len,
+            footer_ver=footer_ver,
+            binary_sha256=binary_sha256,
+            source_start=source_start,
+            metadata_start=metadata_start,
+        )
+
+
+def _hash_binary_region(path: pathlib.Path, source_start: int) -> bytes:
+    """Compute SHA-256 over bytes ``[0, source_start)`` of *path*."""
+    digest = hashlib.sha256()
+    remaining = source_start
+    with open(path, "rb") as f:
+        while remaining > 0:
+            chunk = f.read(min(_HASH_READ_CHUNK, remaining))
+            if not chunk:
+                raise ValueError(
+                    f"Bundle {path} truncated while hashing binary region "
+                    f"(expected {source_start} bytes, got {source_start - 
remaining})."
+                )
+            digest.update(chunk)
+            remaining -= len(chunk)
+    return digest.digest()
+
+
+# LRU-bounded cache of computed binary-region digests keyed by
+# (path, inode, mtime_ns, size). A cache hit means the file at *path* still
+# has the same identity as when we last hashed it, so re-hashing on every
+# exec is unnecessary. A miss (file replaced, mtime bumped, inode swapped
+# under us) yields a different key and forces re-verification;
[email protected]_cache(maxsize=_VERIFY_CACHE_MAXSIZE)
+def _cached_binary_region_digest(
+    path_str: str,
+    source_start: int,
+    st_ino: int,
+    st_mtime_ns: int,
+    st_size: int,
+) -> bytes:
+    # st_ino / st_mtime_ns / st_size participate in the cache key only; if any
+    # of them change, the LRU treats it as a different entry and re-hashes.
+    del st_ino, st_mtime_ns, st_size
+    return _hash_binary_region(pathlib.Path(path_str), source_start)
+
+
+def _verify_binary_sha256(footer: _Footer) -> bool:
+    """Verify *footer.binary_sha256* against the binary region of 
``footer.path``."""
+    try:
+        st = footer.path.stat()
+    except OSError:
+        return False
+
+    try:
+        actual = _cached_binary_region_digest(
+            str(footer.path), footer.source_start, st.st_ino, st.st_mtime_ns, 
st.st_size
+        )
+    except (OSError, ValueError) as exc:
+        log.debug("Failed to hash bundle binary region", 
path=str(footer.path), error=str(exc))
+        return False
+
+    if actual != footer.binary_sha256:
+        log.debug(
+            "Bundle binary_sha256 mismatch; skipping",
+            path=str(footer.path),
+            expected=footer.binary_sha256.hex(),
+            actual=actual.hex(),
+        )
+        return False
+    return True
+
+
+def _read_bundle_metadata(path: pathlib.Path) -> dict[str, Any] | None:
+    try:
+        if (footer := _Footer.read(path)) is None:
+            return None
+    except (OSError, ValueError) as exc:
+        log.debug("Invalid bundle trailer; skipping", path=str(path), 
error=str(exc))
+        return None
+
+    if not _verify_binary_sha256(footer):
+        return None
+
+    with open(path, "rb") as f:
+        f.seek(footer.metadata_start)
+        metadata_bytes = f.read(footer.metadata_len)
+
+    try:
+        data = yaml.safe_load(metadata_bytes.decode("utf-8"))

Review Comment:
   A bundle with a valid trailer and matching hash but malformed YAML (or 
non-dict) metadata returns `None` here with no log, unlike the trailer-error 
path on line 192 which logs at debug. The bundle silently vanishes from the 
scan with no diagnostic. A debug log on the decode/parse failure would match 
the rest of the function.



##########
task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py:
##########
@@ -0,0 +1,314 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Native executable coordinator that launches a binary subprocess for task 
execution."""
+
+from __future__ import annotations
+
+import functools
+import hashlib
+import os
+import pathlib
+import struct
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import structlog
+import yaml
+
+from airflow.sdk.coordinators.socket.coordinator import SocketCoordinator
+from airflow.sdk.execution_time.schema import get_schema_version_migrator
+
+if TYPE_CHECKING:
+    from collections.abc import Iterable, Iterator, Sequence
+
+    from structlog.typing import FilteringBoundLogger
+    from typing_extensions import Self
+
+    from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
+
+log: FilteringBoundLogger = 
structlog.get_logger(logger_name="coordinators.executable")
+
+
+FOOTER_MAGIC = b"AFBNDL01"
+FOOTER_SIZE = 64
+FOOTER_VERSION = 1
+_HASH_READ_CHUNK = 1 << 20
+# Upper bound on the verification cache.
+_VERIFY_CACHE_MAXSIZE = 256
+
+
[email protected]
+class _Footer:
+    """
+    Parsed bundle trailer plus the byte offsets it implies.
+
+    All region offsets (``source_start``, ``metadata_start``) and the file
+    size at parse time are computed once in :meth:`read` so downstream
+    consumers do not re-derive them.
+    """
+
+    path: pathlib.Path
+    file_size: int
+    source_len: int
+    metadata_len: int
+    footer_ver: int
+    binary_sha256: bytes
+    source_start: int
+    metadata_start: int
+
+    @classmethod
+    def read(cls, path: pathlib.Path) -> Self | None:
+        """
+        Parse the trailer of *path* and return the resulting footer.
+
+        Returns ``None`` only when *path* is provably not a bundle (file is
+        smaller than the trailer, or the magic does not match).
+        """
+        size = path.stat().st_size
+        if size < FOOTER_SIZE:
+            return None
+
+        with open(path, "rb") as f:
+            f.seek(size - FOOTER_SIZE)
+            trailer = f.read(FOOTER_SIZE)
+
+        if len(trailer) != FOOTER_SIZE or trailer[56:64] != FOOTER_MAGIC:
+            return None
+
+        source_len, metadata_len, footer_ver = struct.unpack_from("<III", 
trailer, 0)
+        if footer_ver != FOOTER_VERSION:
+            raise ValueError(
+                f"Unsupported bundle footer_ver={footer_ver} in {path}; "
+                f"this runtime supports footer_ver={FOOTER_VERSION}."
+            )
+
+        binary_sha256 = bytes(trailer[12:44])
+        reserved = trailer[44:56]
+        if reserved != b"\x00" * 12:
+            raise ValueError(f"Bundle trailer in {path} has non-zero reserved 
bytes.")
+
+        metadata_start = size - FOOTER_SIZE - metadata_len
+        source_start = metadata_start - source_len
+        if source_start < 0:
+            raise ValueError(f"Bundle trailer in {path} declares regions that 
extend past the start of file.")
+        # Per the spec, the binary region [0, source_start) MUST be non-empty.
+        if source_start == 0:
+            raise ValueError(f"Bundle trailer in {path} leaves no room for the 
executable region.")
+
+        return cls(
+            path=path,
+            file_size=size,
+            source_len=source_len,
+            metadata_len=metadata_len,
+            footer_ver=footer_ver,
+            binary_sha256=binary_sha256,
+            source_start=source_start,
+            metadata_start=metadata_start,
+        )
+
+
+def _hash_binary_region(path: pathlib.Path, source_start: int) -> bytes:
+    """Compute SHA-256 over bytes ``[0, source_start)`` of *path*."""
+    digest = hashlib.sha256()
+    remaining = source_start
+    with open(path, "rb") as f:
+        while remaining > 0:
+            chunk = f.read(min(_HASH_READ_CHUNK, remaining))
+            if not chunk:
+                raise ValueError(
+                    f"Bundle {path} truncated while hashing binary region "
+                    f"(expected {source_start} bytes, got {source_start - 
remaining})."
+                )
+            digest.update(chunk)
+            remaining -= len(chunk)
+    return digest.digest()
+
+
+# LRU-bounded cache of computed binary-region digests keyed by
+# (path, inode, mtime_ns, size). A cache hit means the file at *path* still
+# has the same identity as when we last hashed it, so re-hashing on every
+# exec is unnecessary. A miss (file replaced, mtime bumped, inode swapped
+# under us) yields a different key and forces re-verification;
[email protected]_cache(maxsize=_VERIFY_CACHE_MAXSIZE)
+def _cached_binary_region_digest(
+    path_str: str,
+    source_start: int,
+    st_ino: int,
+    st_mtime_ns: int,
+    st_size: int,
+) -> bytes:
+    # st_ino / st_mtime_ns / st_size participate in the cache key only; if any
+    # of them change, the LRU treats it as a different entry and re-hashes.
+    del st_ino, st_mtime_ns, st_size
+    return _hash_binary_region(pathlib.Path(path_str), source_start)
+
+
+def _verify_binary_sha256(footer: _Footer) -> bool:
+    """Verify *footer.binary_sha256* against the binary region of 
``footer.path``."""
+    try:
+        st = footer.path.stat()
+    except OSError:
+        return False
+
+    try:
+        actual = _cached_binary_region_digest(
+            str(footer.path), footer.source_start, st.st_ino, st.st_mtime_ns, 
st.st_size
+        )
+    except (OSError, ValueError) as exc:
+        log.debug("Failed to hash bundle binary region", 
path=str(footer.path), error=str(exc))
+        return False
+
+    if actual != footer.binary_sha256:
+        log.debug(
+            "Bundle binary_sha256 mismatch; skipping",
+            path=str(footer.path),
+            expected=footer.binary_sha256.hex(),
+            actual=actual.hex(),
+        )
+        return False
+    return True
+
+
+def _read_bundle_metadata(path: pathlib.Path) -> dict[str, Any] | None:
+    try:
+        if (footer := _Footer.read(path)) is None:
+            return None
+    except (OSError, ValueError) as exc:
+        log.debug("Invalid bundle trailer; skipping", path=str(path), 
error=str(exc))
+        return None
+
+    if not _verify_binary_sha256(footer):
+        return None
+
+    with open(path, "rb") as f:
+        f.seek(footer.metadata_start)
+        metadata_bytes = f.read(footer.metadata_len)
+
+    try:
+        data = yaml.safe_load(metadata_bytes.decode("utf-8"))
+    except (UnicodeDecodeError, yaml.YAMLError):
+        return None
+
+    if not isinstance(data, dict):
+        return None
+
+    return data
+
+
+def _dag_ids(metadata: dict[str, Any]) -> set[str]:
+    dags = metadata.get("dags")
+    if not isinstance(dags, dict):
+        return set()
+
+    return set(dags.keys())
+
+
+def _supervisor_schema_version(metadata: dict[str, Any]) -> str | None:
+    sdk = metadata.get("sdk")
+    if not isinstance(sdk, dict):
+        return None
+
+    value = sdk.get("supervisor_schema_version")
+    if not isinstance(value, str) or not value:
+        return None
+
+    return value
+
+
+def _find_executables(items: Iterable[pathlib.Path]) -> Iterator[pathlib.Path]:
+    """Yield executable regular files under *items*, descending into 
directories."""
+    for item in items:
+        if item.is_dir():
+            try:
+                children = item.iterdir()
+            except (FileNotFoundError, NotADirectoryError, PermissionError):
+                continue
+            yield from _find_executables(children)
+        elif item.is_file() and os.access(item, os.X_OK):
+            yield item
+
+
+def _validate_schema_version(instance, _, value) -> str:
+    return get_schema_version_migrator().resolve_version(str(value))
+
+
[email protected]
+class _Bundle:
+    path: pathlib.Path
+    schema_version: str | None = 
attrs.field(validator=_validate_schema_version)
+
+    @classmethod
+    def find(cls, executables_root: Sequence[pathlib.Path], dag_id: str) -> 
Self:
+        log.debug("Finding executable bundles recursively", 
roots=executables_root)
+        for p in _find_executables(executables_root):
+            if (metadata := _read_bundle_metadata(p)) is None:
+                continue
+            if dag_id not in _dag_ids(metadata):
+                continue
+            try:
+                return cls(path=p.resolve(), 
schema_version=_supervisor_schema_version(metadata))
+            except (TypeError, ValueError) as exc:
+                log.debug("Bundle metadata rejected by validator; skipping", 
path=str(p), error=str(exc))
+                continue
+        resolved_paths = os.pathsep.join(str(r.resolve()) for r in 
executables_root)
+        raise FileNotFoundError(

Review Comment:
   When the only bundle matching a `dag_id` is rejected by the schema-version 
validator, `find` logs the rejection at debug (line 266) and keeps scanning, 
then raises `FileNotFoundError("cannot find executable bundle containing 
dag_id=...")`. The bundle was found; its schema version was just unusable. 
Since the real cause is only at debug level, the operator gets a "not found" 
error and looks in the wrong place. Could the rejection reason be surfaced in 
the raised error so a known-but-rejected bundle doesn't masquerade as missing?



##########
task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py:
##########
@@ -0,0 +1,314 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Native executable coordinator that launches a binary subprocess for task 
execution."""
+
+from __future__ import annotations
+
+import functools
+import hashlib
+import os
+import pathlib
+import struct
+from typing import TYPE_CHECKING, Any
+
+import attrs
+import structlog
+import yaml
+
+from airflow.sdk.coordinators.socket.coordinator import SocketCoordinator
+from airflow.sdk.execution_time.schema import get_schema_version_migrator
+
+if TYPE_CHECKING:
+    from collections.abc import Iterable, Iterator, Sequence
+
+    from structlog.typing import FilteringBoundLogger
+    from typing_extensions import Self
+
+    from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
+
+log: FilteringBoundLogger = 
structlog.get_logger(logger_name="coordinators.executable")
+
+
+FOOTER_MAGIC = b"AFBNDL01"
+FOOTER_SIZE = 64
+FOOTER_VERSION = 1
+_HASH_READ_CHUNK = 1 << 20
+# Upper bound on the verification cache.
+_VERIFY_CACHE_MAXSIZE = 256
+
+
[email protected]
+class _Footer:
+    """
+    Parsed bundle trailer plus the byte offsets it implies.
+
+    All region offsets (``source_start``, ``metadata_start``) and the file
+    size at parse time are computed once in :meth:`read` so downstream
+    consumers do not re-derive them.
+    """
+
+    path: pathlib.Path
+    file_size: int
+    source_len: int
+    metadata_len: int
+    footer_ver: int
+    binary_sha256: bytes
+    source_start: int
+    metadata_start: int
+
+    @classmethod
+    def read(cls, path: pathlib.Path) -> Self | None:
+        """
+        Parse the trailer of *path* and return the resulting footer.
+
+        Returns ``None`` only when *path* is provably not a bundle (file is
+        smaller than the trailer, or the magic does not match).
+        """
+        size = path.stat().st_size
+        if size < FOOTER_SIZE:
+            return None
+
+        with open(path, "rb") as f:
+            f.seek(size - FOOTER_SIZE)
+            trailer = f.read(FOOTER_SIZE)
+
+        if len(trailer) != FOOTER_SIZE or trailer[56:64] != FOOTER_MAGIC:
+            return None
+
+        source_len, metadata_len, footer_ver = struct.unpack_from("<III", 
trailer, 0)
+        if footer_ver != FOOTER_VERSION:
+            raise ValueError(
+                f"Unsupported bundle footer_ver={footer_ver} in {path}; "
+                f"this runtime supports footer_ver={FOOTER_VERSION}."
+            )
+
+        binary_sha256 = bytes(trailer[12:44])
+        reserved = trailer[44:56]
+        if reserved != b"\x00" * 12:
+            raise ValueError(f"Bundle trailer in {path} has non-zero reserved 
bytes.")
+
+        metadata_start = size - FOOTER_SIZE - metadata_len
+        source_start = metadata_start - source_len
+        if source_start < 0:
+            raise ValueError(f"Bundle trailer in {path} declares regions that 
extend past the start of file.")
+        # Per the spec, the binary region [0, source_start) MUST be non-empty.
+        if source_start == 0:
+            raise ValueError(f"Bundle trailer in {path} leaves no room for the 
executable region.")
+
+        return cls(
+            path=path,
+            file_size=size,
+            source_len=source_len,
+            metadata_len=metadata_len,
+            footer_ver=footer_ver,
+            binary_sha256=binary_sha256,
+            source_start=source_start,
+            metadata_start=metadata_start,
+        )
+
+
+def _hash_binary_region(path: pathlib.Path, source_start: int) -> bytes:
+    """Compute SHA-256 over bytes ``[0, source_start)`` of *path*."""
+    digest = hashlib.sha256()
+    remaining = source_start
+    with open(path, "rb") as f:
+        while remaining > 0:
+            chunk = f.read(min(_HASH_READ_CHUNK, remaining))
+            if not chunk:
+                raise ValueError(
+                    f"Bundle {path} truncated while hashing binary region "
+                    f"(expected {source_start} bytes, got {source_start - 
remaining})."
+                )
+            digest.update(chunk)
+            remaining -= len(chunk)
+    return digest.digest()
+
+
+# LRU-bounded cache of computed binary-region digests keyed by
+# (path, inode, mtime_ns, size). A cache hit means the file at *path* still
+# has the same identity as when we last hashed it, so re-hashing on every
+# exec is unnecessary. A miss (file replaced, mtime bumped, inode swapped
+# under us) yields a different key and forces re-verification;
[email protected]_cache(maxsize=_VERIFY_CACHE_MAXSIZE)
+def _cached_binary_region_digest(
+    path_str: str,
+    source_start: int,
+    st_ino: int,
+    st_mtime_ns: int,
+    st_size: int,
+) -> bytes:
+    # st_ino / st_mtime_ns / st_size participate in the cache key only; if any
+    # of them change, the LRU treats it as a different entry and re-hashes.
+    del st_ino, st_mtime_ns, st_size
+    return _hash_binary_region(pathlib.Path(path_str), source_start)
+
+
+def _verify_binary_sha256(footer: _Footer) -> bool:
+    """Verify *footer.binary_sha256* against the binary region of 
``footer.path``."""
+    try:
+        st = footer.path.stat()
+    except OSError:
+        return False
+
+    try:
+        actual = _cached_binary_region_digest(
+            str(footer.path), footer.source_start, st.st_ino, st.st_mtime_ns, 
st.st_size
+        )
+    except (OSError, ValueError) as exc:
+        log.debug("Failed to hash bundle binary region", 
path=str(footer.path), error=str(exc))
+        return False
+
+    if actual != footer.binary_sha256:
+        log.debug(
+            "Bundle binary_sha256 mismatch; skipping",
+            path=str(footer.path),
+            expected=footer.binary_sha256.hex(),
+            actual=actual.hex(),
+        )
+        return False
+    return True
+
+
+def _read_bundle_metadata(path: pathlib.Path) -> dict[str, Any] | None:
+    try:
+        if (footer := _Footer.read(path)) is None:
+            return None
+    except (OSError, ValueError) as exc:
+        log.debug("Invalid bundle trailer; skipping", path=str(path), 
error=str(exc))
+        return None
+
+    if not _verify_binary_sha256(footer):
+        return None
+
+    with open(path, "rb") as f:
+        f.seek(footer.metadata_start)
+        metadata_bytes = f.read(footer.metadata_len)
+
+    try:
+        data = yaml.safe_load(metadata_bytes.decode("utf-8"))
+    except (UnicodeDecodeError, yaml.YAMLError):
+        return None
+
+    if not isinstance(data, dict):
+        return None
+
+    return data
+
+
+def _dag_ids(metadata: dict[str, Any]) -> set[str]:
+    dags = metadata.get("dags")
+    if not isinstance(dags, dict):
+        return set()
+
+    return set(dags.keys())
+
+
+def _supervisor_schema_version(metadata: dict[str, Any]) -> str | None:
+    sdk = metadata.get("sdk")
+    if not isinstance(sdk, dict):
+        return None
+
+    value = sdk.get("supervisor_schema_version")
+    if not isinstance(value, str) or not value:
+        return None
+
+    return value
+
+
+def _find_executables(items: Iterable[pathlib.Path]) -> Iterator[pathlib.Path]:
+    """Yield executable regular files under *items*, descending into 
directories."""
+    for item in items:
+        if item.is_dir():
+            try:
+                children = item.iterdir()
+            except (FileNotFoundError, NotADirectoryError, PermissionError):
+                continue
+            yield from _find_executables(children)
+        elif item.is_file() and os.access(item, os.X_OK):
+            yield item
+
+
+def _validate_schema_version(instance, _, value) -> str:
+    return get_schema_version_migrator().resolve_version(str(value))

Review Comment:
   `_validate_schema_version` is wired as an attrs `validator` but returns 
`resolve_version(...)`. attrs validators discard their return value (only 
converters transform the stored attribute), so the resolved value is dropped 
and the raw `value` is stored. It works today only because `resolve_version` is 
effectively identity, but if it ever normalizes the version string that 
normalization is silently lost. Worth making this a converter, or splitting the 
validate and normalize steps.



##########
task-sdk/docs/executable-bundle-spec.rst:
##########
@@ -0,0 +1,299 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Executable Bundle Spec
+======================
+
+This document specifies the on-disk format of a build artifact produced by an
+Airflow native-executable SDK (Go, Rust, C++, Zig, ...) and consumed by
+:class:`~airflow.sdk.coordinators.executable.ExecutableCoordinator`
+at deployment time.
+
+The goal is a single, language-agnostic *bundle* shape so that scheduler,
+worker, and UI behave identically regardless of which compiled SDK produced
+the DAG.
+
+Bundle-spec version: ``1.0``.
+
+Container
+---------
+
+A bundle is **the compiled executable itself, with a fixed-format footer
+appended after the binary's normal end-of-file**. The executable remains
+directly runnable; the footer is data that follows the last byte the OS
+loader cares about and is invisible to ``exec()``. There is no enclosing
+archive.
+
+A bundle file therefore has three regions, in order from offset 0:
+
+1. The native executable (ELF / Mach-O / PE), including any code-signing
+   structures the platform appends.
+2. The primary DAG source file, embedded verbatim (UTF-8). MAY have length 0.
+3. The build-time manifest (``airflow-metadata.yaml`` content, UTF-8).
+
+The file ends with a fixed 64-byte trailer that locates regions (2) and (3),
+carries an integrity hash of the binary region, and identifies the file as a
+bundle. See :ref:`bundle-trailer-layout`.
+
+Filenames follow OS conventions for executables: no extension on Linux/macOS,
+``.exe`` on Windows. The scanner identifies bundles by the trailer's magic,
+not by the filename.
+
+.. _bundle-trailer-layout:
+
+Trailer Layout
+--------------
+
+The last 64 bytes of a conforming bundle are the trailer. All multi-byte
+integers are little-endian.
+
+::
+
+    bytes  0..3    source_len     uint32     length of the source region in bytes
+    bytes  4..7    metadata_len   uint32     length of the metadata region in bytes
+    bytes  8..11   footer_ver     uint32     currently 1
+    bytes 12..43   binary_sha256  32 bytes   SHA-256 of the binary region
+    bytes 44..55   reserved       12 bytes   MUST be zero
+    bytes 56..63   magic          8 bytes    ASCII "AFBNDL01"
+
+The magic is the byte sequence ``0x41 0x46 0x42 0x4E 0x44 0x4C 0x30 0x31``
+(``"AFBNDL01"``). The trailing ``01`` is the footer-format version repeated
+in ASCII so a human can identify a bundle at a glance
+(``tail -c 8 ./mybundle | xxd``); the binary ``footer_ver`` field is the
+authoritative source of truth for parsing.
+
+``binary_sha256`` is the SHA-256 digest computed over the **binary region
+only** — bytes ``[0, source_start)``. The hash field sits inside the trailer
+and therefore cannot cover the bytes it occupies; it provides *integrity*
+(the binary region has not been truncated, corrupted, or naively edited
+between packing and exec) rather than *authenticity*
+(see :ref:`bundle-code-signing` for how authenticity layers on top).
+
+Reader algorithm:
+
+1. Open the file. Seek to ``EOF - 64``. Read 64 bytes.
+2. Compare bytes ``56..63`` against ``"AFBNDL01"``. If different, the file
+   is not a bundle; the scanner MUST ignore it.
+3. Parse ``footer_ver``. If unknown, fail with a versioning error.
+4. Compute ``metadata_start = filesize - 64 - metadata_len`` and
+   ``source_start = metadata_start - source_len``.
+5. Validate ``source_start >= 0`` and that the implied binary region
+   (``[0, source_start)``) is non-empty.
+6. Compute SHA-256 over the binary region ``[0, source_start)`` and compare
+   to ``binary_sha256``. Mismatch is a hard failure handled identically to
+   a magic-check failure: the scanner logs and skips the file. The result
+   MAY be cached by ``(path, inode, mtime, size)`` so the runtime does not
+   re-hash on every exec; a cache miss (file replaced, mtime bumped)
+   triggers re-verification.
+7. Read ``metadata_len`` bytes from ``metadata_start`` for the manifest.
+8. Read ``source_len`` bytes from ``source_start`` for the source view.
+   If ``source_len == 0``, no source is embedded; the UI displays
+   "(source not available)".
+
+Source comes *before* metadata so a future ``footer_ver`` MAY introduce
+additional trailing blobs (e.g. signed checksums, compressed deps) by
+extending the trailer rather than inserting between existing blobs.
+
+.. _bundle-metadata-schema:
+
+``airflow-metadata.yaml`` schema
+--------------------------------
+
+The metadata region carries the same YAML manifest documented previously,
+produced at build time from a static scan of the DAG source. A
+machine-readable JSON Schema is published at
+:download:`airflow-metadata.schema.json` for use by build tooling, validators,
+and editors.
+
+.. code-block:: yaml
+
+    airflow_bundle_metadata_version: "1.0"
+    sdk:
+      language: go
+      version: "0.1.0"
+      supervisor_schema_version: "2026-06-16"
+    source: example.go
+    dags:
+      example_dag:
+        tasks:
+          - extract
+          - transform
+          - load
+      another_dag:
+        tasks:
+          - run
+
+Top-level keys:
+
+``airflow_bundle_metadata_version`` (string, required)
+    The bundle-spec version this manifest conforms to. Currently ``"1.0"``.
+
+``sdk`` (mapping, required)
+    Identifies the SDK that produced the bundle.
+
+    - ``language`` (string, required): lower-case source-language identifier
+      (e.g. ``go``, ``rust``, ``cpp``, ``zig``).
+    - ``version`` (string, required): SDK version used at build time.
+    - ``supervisor_schema_version`` (string, required): dated AIP-72
+      supervisor wire-schema version the bundle was compiled against, in
+      ``YYYY-MM-DD`` format (e.g. ``"2026-06-16"``). The coordinator passes
+      this value to the supervisor so it can downgrade outbound messages /
+      upgrade inbound messages to a shape the bundle understands. The value
+      MUST resolve against the supervisor's schema bundle; an unknown
+      version is rejected at coordinator start.

Review Comment:
   This says an unknown `supervisor_schema_version` is "rejected at coordinator 
start", but validation actually happens lazily in `_Bundle.find` at 
task-execution time (per task), not at coordinator construction. The 
constructor only stores `executables_root` and does no scanning. Either the 
spec should say "at task-execution time" or the coordinator should validate 
eagerly on start.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to