kaxil commented on code in PR #67235:
URL: https://github.com/apache/airflow/pull/67235#discussion_r3277765789
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -730,35 +733,51 @@ def _create_log_forwarder(self, loggers, name, log_level=logging.INFO) -> Callab
     def _on_socket_closed(self, sock: socket):
         # We want to keep servicing this process until we've read up to EOF from all the sockets.
-
         with suppress(KeyError):
             self.selector.unregister(sock)
             del self._open_sockets[sock]
+    def _serialize_response(self, msg: BaseModel | ErrorResponse, **dump_opts) -> dict[str, Any]:
+        if self._subprocess_schema_version is not None:
+            migrator = get_schema_version_migrator()
+            msg = migrator.downgrade(msg, self._subprocess_schema_version, dump_kwargs=dump_opts)
Review Comment:
This call crashes the first time the migration path actually runs.
`dump_kwargs=dump_opts` is absorbed by `downgrade`'s `**dump_opts`, then
forwarded to `msg.model_dump(**{**dump_opts, "mode": "json"})` at
`migrator.py:156` -- which becomes `model_dump(dump_kwargs={...},
mode="json")`. Pydantic rejects `dump_kwargs` as an unexpected keyword and
raises `TypeError`.
CI is green only because the integration-test fixture monkeypatches
`_serialize_response` with a corrected implementation (see
`test_integration.py` `mock_version_migrator` fixture), so this production code
path is never exercised under test.
Fix:
```python
msg = migrator.downgrade(msg, self._subprocess_schema_version, **dump_opts)
```
Then add a real test (no monkeypatch on `_serialize_response`) that drives
`send_msg` end-to-end against a non-`None` `_subprocess_schema_version`.
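To make the failure mode concrete without pulling in Pydantic, here is a
minimal stand-alone sketch. `model_dump` and `downgrade` below are simplified
stand-ins written for illustration (not the real Pydantic or migrator code),
and `"2025-01-01"` is a placeholder version string:

```python
def model_dump(*, mode: str = "python", exclude_none: bool = False, by_alias: bool = False) -> dict:
    """Stand-in with a fixed keyword signature, similar in spirit to ``model_dump``."""
    return {"mode": mode, "exclude_none": exclude_none, "by_alias": by_alias}


def downgrade(msg, target_schema_version, **dump_opts):
    # Mirrors the forwarding described above: every collected kwarg is passed on.
    return model_dump(**{**dump_opts, "mode": "json"})


def call_broken(opts: dict):
    # ``dump_kwargs`` becomes a single entry inside ``dump_opts`` and is
    # forwarded as an unknown keyword, which raises TypeError.
    return downgrade(None, "2025-01-01", dump_kwargs=opts)


def call_fixed(opts: dict):
    # Unpacking spreads the options into individual keywords instead.
    return downgrade(None, "2025-01-01", **opts)
```

`call_broken({"exclude_none": True})` raises `TypeError`, while `call_fixed`
with the same options goes through, which is the behaviour the one-line fix
above is meant to restore.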
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -548,6 +549,8 @@ class WatchedSubprocess:
     _process: psutil.Process = attrs.field(repr=False)
     """File descriptor for request handling."""
+    _subprocess_schema_version: str | None = None
Review Comment:
This attribute has no production writer, so the entire migration feature is
dead code on `main`. There is no constructor arg (`attrs.define` ignores plain
class attributes without `attrs.field`, so it's not even an attrs field), no
setter or property assignment anywhere in `supervisor.py`, and no handshake
message that negotiates the runtime's schema version.
Result: in production the `if self._subprocess_schema_version is not None:`
gates at lines 740 and 759 are always False, and the schema-migration seam
never runs. The only callers that flip it are the integration tests, which
assign `ws._subprocess_schema_version = pinned_version` directly on the
instance.
Either wire the version in through the child handshake before merging, or be
explicit in the PR description that this is the data-structures-only half and
the negotiation lands in a follow-up. Right now the PR description claims "The
supervisor use the migrations to upgrade ... and downgrade messages" which
overstates what's actually hooked up.
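For the "wire it in" option, a rough sketch of the shape I have in mind. All
names here (`WatchedSubprocessSketch`, `from_handshake`, the `schema_version`
key) are hypothetical and only model the version gate; the real handshake
format is for this PR or its follow-up to define:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class WatchedSubprocessSketch:
    """Hypothetical stand-in for WatchedSubprocess; only the version gate is modelled."""

    # None means "no negotiated version", the only state production reaches today.
    subprocess_schema_version: Optional[str] = None

    def migration_enabled(self) -> bool:
        return self.subprocess_schema_version is not None


def from_handshake(handshake: dict) -> WatchedSubprocessSketch:
    # Assumed payload: the child reports the schema version it was built against.
    return WatchedSubprocessSketch(subprocess_schema_version=handshake.get("schema_version"))
```

The point is only that some production code path has to assign the version;
without one, `migration_enabled()` stays `False` outside of tests.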
##########
task-sdk/.pre-commit-config.yaml:
##########
@@ -43,6 +43,8 @@ repos:
^src/airflow/sdk/definitions/deadline\.py$|
^src/airflow/sdk/definitions/dag\.py$|
^src/airflow/sdk/definitions/_internal/types\.py$|
+ ^src/airflow/sdk/execution_time/coordinator\.py$|
Review Comment:
`coordinator.py` does not exist in this PR -- searching the diff and the
task-sdk tree shows no `src/airflow/sdk/execution_time/coordinator.py`.
Pre-emptive exclusions for unmerged files attract drift (the file lands later
with a different path, the exclusion silently turns into a no-op). Drop this
line until the file actually lands; add it back in the PR that introduces
`coordinator.py`.
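If it helps, a small check along these lines could flag exclusions that match
nothing. This is a sketch, not an existing hook: `dead_alternatives` is a
hypothetical helper, and it assumes the exclude pattern is a flat `|`-joined
list of alternatives like the one in this hunk:

```python
import re


def dead_alternatives(exclude_pattern: str, tracked_files: list[str]) -> list[str]:
    """Return the alternatives of a ``|``-joined exclude regex that match no tracked file."""
    # Naive split: assumes no nested groups containing ``|``.
    alternatives = [alt.strip() for alt in exclude_pattern.split("|") if alt.strip()]
    return [
        alt
        for alt in alternatives
        if not any(re.search(alt, path) for path in tracked_files)
    ]


pattern = (
    r"^src/airflow/sdk/definitions/dag\.py$|"
    r"^src/airflow/sdk/execution_time/coordinator\.py$"
)
files = ["src/airflow/sdk/definitions/dag.py"]
unused = dead_alternatives(pattern, files)
```

With only `dag.py` in the tracked list, the `coordinator.py` alternative is
reported as unused, which is the drift described above.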
##########
task-sdk/src/airflow/sdk/execution_time/schema/migrator.py:
##########
@@ -0,0 +1,214 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+In-process bidirectional migration for supervisor schema bodies.
+
+:class:`SchemaVersionMigrator` walks a :class:`~cadwyn.VersionBundle`
+itself rather than going through Cadwyn's HTTP runner so the supervisor
+can downgrade outgoing bodies and upgrade incoming bodies without a
+network round-trip. The downgrade path additionally re-validates against
+the cadwyn-generated versioned class so declarative
+``schema(X).field(Y).didnt_exist`` instructions actually drop fields on
+the wire.
+"""
+
+from __future__ import annotations
+
+import functools
+from typing import TYPE_CHECKING, Any, cast
+
+import attrs
+from cadwyn import generate_versioned_models
+
+if TYPE_CHECKING:
+    from cadwyn import VersionBundle
+    from cadwyn.schema_generation import SchemaGenerator
+    from pydantic import BaseModel
+
+
+class _BodyInfo:
+    """
+    Duck-type stand-in for Cadwyn's ``RequestInfo`` / ``ResponseInfo``.
+
+    ``cadwyn.structure.data._AlterDataInstruction.__call__`` only reads
+    and writes ``info.body``; the by-schema transformers we drive never
+    touch FastAPI's Request/Response. Passing this minimal object lets
+    us run cadwyn's migrations from a pure in-process code path with no
+    HTTP stack.
+    """
+
+    __slots__ = ("body",)
+
+    def __init__(self, body: dict[str, Any]) -> None:
+        # Copy so the caller's mapping survives intact when the
+        # instruction chain mutates ``info.body`` in place.
+        self.body = dict(body)
+
+
+def _validate_supervisor_version(instance: SchemaVersionMigrator, _, value: str) -> str:
+    return instance._resolve_version(value)
+
+
+def _calculate_version_values(migrator: SchemaVersionMigrator) -> frozenset[str]:
+    return frozenset(v.value for v in migrator._bundle.versions)
+
+
[email protected](kw_only=True)
+class SchemaVersionMigrator:
+    """
+    Bidirectional in-process migrator for supervisor schema bodies.
+
+    Each foreign runtime is pinned to a specific dated lang-SDK supervisor
+    schema version; this class walks Cadwyn's ``VersionChange`` chain in-process
+    to bridge the two::
+
+        head shape --- downgrade(msg, lang_sdk) ---> lang-SDK wire
+        head shape <-- upgrade(msg, lang_sdk) --- lang-SDK wire
+
+    *supervisor_version* is fixed at construction time.
+
+    note::
+        Use ``bundle.versions[0].value`` to get the latest dated entry. Cadwyn
+        keeps versions in newest-to-oldest order.
+
+    A message whose Pydantic type is not mentioned by any ``schema(...)``
+    instruction in the bundle is passed through as-is: Cadwyn keys its
+    instruction dicts by message type, so the lookup misses and no
+    transformer runs.
+    """
+
+    _bundle: VersionBundle
+    _supervisor_version: str = attrs.field(validator=_validate_supervisor_version)
+
+    # Caches over the bundle (which is immutable for the migrator's lifetime).
+    # ``generate_versioned_models`` walks the full version graph;
+    # ``_version_values`` mirrors cadwyn's internal lookup set without reaching
+    # into its private attribute.
+    _versioned_models: dict[str, SchemaGenerator] = attrs.field(init=False, default=None)
+    _version_values: frozenset[str] = attrs.field(
+        init=False,
+        default=attrs.Factory(_calculate_version_values, takes_self=True),
+    )
+
+    def _versioned_class(self, version: str, model: type[BaseModel]) -> type[BaseModel]:
+        """Get the Cadwyn-generated class for *model* at *version*."""
+        if self._versioned_models is None:
+            self._versioned_models = generate_versioned_models(self._bundle)
+        return self._versioned_models[version][model]
+
+    def _resolve_version(self, v: str) -> str:
+        """Validate *v* is present in the bundle."""
+        if v not in self._version_values:
+            raise ValueError(f"Version {v!r} not found in supervisor schema bundle")
+        return v
+
+    def downgrade(
+        self,
+        msg: BaseModel,
+        trarget_schema_version: str,
Review Comment:
Typo in a public parameter name: `trarget_schema_version` should be
`target_schema_version`. The method is reachable from outside the module:
`supervisor._serialize_response` calls `migrator.downgrade(msg, version)`
positionally today, but any caller that later passes the argument by keyword
will inherit the misspelling, so it is worth fixing before it hardens into the
API surface. The `upgrade` counterpart at line 179 already spells its parameter
correctly (`source_schema_version`), so the two methods are also inconsistent
with each other.
##########
task-sdk/src/airflow/sdk/execution_time/schema/migrator.py:
##########
@@ -0,0 +1,214 @@
+class _BodyInfo:
+    """
+    Duck-type stand-in for Cadwyn's ``RequestInfo`` / ``ResponseInfo``.
+
+    ``cadwyn.structure.data._AlterDataInstruction.__call__`` only reads
+    and writes ``info.body``; the by-schema transformers we drive never
+    touch FastAPI's Request/Response. Passing this minimal object lets
+    us run cadwyn's migrations from a pure in-process code path with no
+    HTTP stack.
+    """
+
+    __slots__ = ("body",)
+
+    def __init__(self, body: dict[str, Any]) -> None:
+        # Copy so the caller's mapping survives intact when the
+        # instruction chain mutates ``info.body`` in place.
+        self.body = dict(body)
Review Comment:
Nit: the comment overpromises. `dict(body)` is a shallow copy -- if a caller
passes `{"items": [1, 2, 3]}` and an instruction does
`info.body["items"].append(4)`, the caller's list is mutated. The current
production callers serialize through `model_dump(mode="json")` first, so the
nested values are freshly built and nothing else holds a reference to them,
but the comment as written reads like a general isolation guarantee.
Either tighten the comment to mention the shallow-copy caveat, or use
`copy.deepcopy(body)` if you actually want isolation.
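A quick stand-alone illustration of the difference, using only the standard
library (the dict contents are made up for the example):

```python
import copy

# Shallow copy: new top-level dict, but the nested list is shared.
caller_body = {"items": [1, 2, 3]}
shallow = dict(caller_body)
shallow["items"].append(4)  # also visible through caller_body

# Deep copy: nested containers are duplicated, so the caller is isolated.
isolated_body = {"items": [1, 2, 3]}
deep = copy.deepcopy(isolated_body)
deep["items"].append(4)  # caller's list is untouched
```

After this runs, `caller_body["items"]` has four elements while
`isolated_body["items"]` still has three.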
##########
task-sdk/src/airflow/sdk/execution_time/schema/migrator.py:
##########
@@ -0,0 +1,214 @@
+    def downgrade(
+        self,
+        msg: BaseModel,
+        trarget_schema_version: str,
+        **dump_opts: dict[str, Any],
Review Comment:
`**dump_opts: dict[str, Any]` annotates each individual kwarg value as
`dict[str, Any]`, which is wrong -- callers pass arbitrary `model_dump` options
like `exclude_none=True` or `by_alias=True`, none of which are dicts. The
`cast("dict[str, Any]", {**dump_opts, "mode": "json"})` at line 156 quietly
papers over the mismatch.
Use `**dump_opts: Any` (each value is `Any`, the collected `dump_opts` is
then `dict[str, Any]` automatically), and you can drop the `cast` at line 156
as well.
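A small sketch of what the suggested annotation looks like in isolation.
`merge_dump_opts` is a hypothetical helper that only reproduces the merge step,
not the migrator itself:

```python
from typing import Any


def merge_dump_opts(**dump_opts: Any) -> dict[str, Any]:
    # ``Any`` describes each keyword value; inside the function ``dump_opts``
    # is already a ``dict[str, Any]``, so the merged mapping needs no ``cast``.
    merged: dict[str, Any] = {**dump_opts, "mode": "json"}
    return merged


opts = merge_dump_opts(exclude_none=True, by_alias=True)
```

Because `"mode": "json"` comes last in the merge, it also wins over any `mode`
a caller passes in, matching the docstring's statement that the mode is fixed.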
##########
uv.lock:
##########
@@ -4214,7 +4214,6 @@ google = [
]
langchain = [
Review Comment:
Unrelated change: this PR removes `langchain-openai` from the langchain
extra (also in the deleted package entry at line 14587). That's almost
certainly a rebase artefact from a merge against `main` -- nothing in this PR
touches langchain or its extras.
Regenerate `uv.lock` against fresh `main` before merging so the lockfile
diff matches the code diff. Otherwise the next person bisecting a langchain
regression lands here and has to figure out why a supervisor-schema PR removed
an LLM dependency.
##########
task-sdk/src/airflow/sdk/execution_time/schema/migrator.py:
##########
@@ -0,0 +1,214 @@
+    def downgrade(
+        self,
+        msg: BaseModel,
+        trarget_schema_version: str,
+        **dump_opts: dict[str, Any],
+    ) -> BaseModel:
+        """
+        Downgrade *msg* from server to *trarget_schema_version*.
+
+        Used on the supervisor -> foreign-runtime path: *msg* is a head-shape
+        Pydantic instance, and the returned dict matches the target.
+
+        :param msg: A Pydantic instance shaped according to the head
+            (latest) version of the bundle.
+        :param trarget_schema_version: Dated supervisor schema version string in
+            ``YYYY-MM-DD`` format. Must be an exact value in the bundle.
+        :param dump_opts: Forwarded to ``model_dump`` when dumping *msg* for
+            migration. The mode is already set to ``json`` so datetime/UUID/Path
+            etc. serialize to primitives the versioned-model validators inside
+            the chain accept.
+        :returns: A plain dict shaped for *trarget_schema_version*.
+        """
+        model = type(msg)
+        trarget_schema_version = self._resolve_version(trarget_schema_version)
+        info = _BodyInfo(msg.model_dump(**cast("dict[str, Any]", {**dump_opts, "mode": "json"})))
+        for version in self._bundle.versions:
+            if version.value > self._supervisor_version:
+                continue
+            if version.value <= trarget_schema_version:
+                break
+            for change in version.changes:
+                for instr in change.alter_response_by_schema_instructions.get(model, ()):
+                    # TODO: Cadwyn is tightly coupled to Startlette request and
+                    # response objects. Our supervisor does not use an HTTP
+                    # framework, so we need to mock out the object. Fix this
+                    # when Cadwyn provides a framework-agnostic interface.
+                    instr(info)  # type: ignore[arg-type]
+        # Re-validate against the versioned class so schema(X).field(Y).didnt_exist
+        # instructions take effect: those alter the class shape, not the dict, so
+        # without this round-trip the dropped field would still appear on the wire.
+        versioned_class = self._versioned_class(trarget_schema_version, model)
+        return versioned_class.model_validate(info.body)
+
+    def upgrade(
+        self,
+        body: dict[str, Any],
+        model: type[BaseModel],
+        source_schema_version: str,
+    ) -> dict[str, Any]:
+        """
+        Upgrade *body* from *trarget_schema_version* to the supervisor's shape.
Review Comment:
Stale `trarget_schema_version` reference -- the parameter on this method is
`source_schema_version`. Either the typo migrated from the `downgrade`
docstring during a copy-paste, or the rename was incomplete. Fix to
`source_schema_version` to match the actual signature.
##########
task-sdk/src/airflow/sdk/execution_time/schema/migrator.py:
##########
@@ -0,0 +1,214 @@
+    def downgrade(
+        self,
+        msg: BaseModel,
+        trarget_schema_version: str,
+        **dump_opts: dict[str, Any],
+    ) -> BaseModel:
+        """
+        Downgrade *msg* from server to *trarget_schema_version*.
+
+        Used on the supervisor -> foreign-runtime path: *msg* is a head-shape
+        Pydantic instance, and the returned dict matches the target.
+
+        :param msg: A Pydantic instance shaped according to the head
+            (latest) version of the bundle.
+        :param trarget_schema_version: Dated supervisor schema version string in
+            ``YYYY-MM-DD`` format. Must be an exact value in the bundle.
+        :param dump_opts: Forwarded to ``model_dump`` when dumping *msg* for
+            migration. The mode is already set to ``json`` so datetime/UUID/Path
+            etc. serialize to primitives the versioned-model validators inside
+            the chain accept.
+        :returns: A plain dict shaped for *trarget_schema_version*.
Review Comment:
Docstring is wrong and is the root cause of the `dump_kwargs` bug in
`supervisor.py:743`.
The signature returns `BaseModel` (line 137) and the implementation returns
`versioned_class.model_validate(info.body)` -- a Pydantic instance, not a dict.
The `:returns:` line says "A plain dict shaped for *trarget_schema_version*."
which is what the caller in `_serialize_response` was written against, hence
`migrator.downgrade(msg, ..., dump_kwargs=dump_opts)` trying to pass model_dump
options into a function that no longer dumps.
Fix the docstring to match the return type, and also align with `upgrade` at
line 175 which deliberately *does* return a dict. The asymmetry (downgrade
returns model, upgrade returns dict) is itself a smell -- worth a comment
explaining why, or making them consistent.
##########
task-sdk/tests/task_sdk/execution_time/schema/test_integration.py:
##########
@@ -0,0 +1,366 @@
+"""
+In-process integration tests for the supervisor schema migration seam.
+
+Drive ``WatchedSubprocess.send_msg`` and ``WatchedSubprocess.handle_requests``
+directly against a ``MagicMock`` socket, then decode the bytes the production
+code wrote and assert on the wire shape. The migrator runs for real against the
+mock Cadwyn bundle in :mod:`_mock_version_bundle`, swapped in via
+``monkeypatch`` for the duration of one test.
+"""
+
+from __future__ import annotations
+
+from typing import Any, ClassVar
+from unittest.mock import MagicMock, call
+
+import attrs
+import msgspec
+import psutil
+import pytest
+import structlog
+from pydantic import TypeAdapter
+from task_sdk.execution_time.schema._mock_version_bundle import (
+    ALL_VERSIONS,
+    MOCK_REGISTRY,
+    MOCK_VERSION_BUNDLE,
+    _LangSdkRequest,
+    _SupervisorResponse,
+)
+from uuid6 import uuid7
+
+from airflow.sdk.execution_time.comms import _RequestFrame, _ResponseFrame
+from airflow.sdk.execution_time.schema import SchemaVersionMigrator
+from airflow.sdk.execution_time.supervisor import WatchedSubprocess
+
+
[email protected]
+def mock_version_migrator(monkeypatch) -> SchemaVersionMigrator:
+    """
+    Bind the production migrator factory and registry to :data:`MOCK_VERSION_BUNDLE`.
+
+    Three patches are applied:
+
+    1. ``supervisor.get_schema_version_migrator`` -- the already-imported binding
+       inside :mod:`airflow.sdk.execution_time.supervisor` that
+       ``_serialize_response`` and ``_deserialize_request`` call.
+    2. ``schema.get_schema_version_migrator`` -- the canonical
+       location, kept in sync so any code that re-imports the symbol sees the
+       mock too.
+    3. ``schema.registered_models_by_name`` -- used by
+       ``resolve_body_class`` (called from ``_deserialize_request``) to map wire
+       discriminators to head classes. Swapped for :data:`MOCK_REGISTRY` so the
+       upgrade path resolves ``_LangSdkRequest`` without touching the real comms
+       registry.
+
+    A corrected ``_serialize_response`` is also installed to work around a bug
+    in the current implementation where ``dump_kwargs=dump_opts`` is passed
+    positionally to ``downgrade`` instead of being unpacked with ``**``,
+    causing ``model_dump`` to reject an unexpected keyword argument. The patched
+    version calls ``downgrade`` without extra kwargs and lets the versioned
+    model's ``model_dump()`` produce the wire dict.
+    """
+    migrator = SchemaVersionMigrator(
+        bundle=MOCK_VERSION_BUNDLE,
+        supervisor_version=MOCK_VERSION_BUNDLE.versions[0].value,
+    )
+    mock_migrator_factory = lambda: migrator
+
+    monkeypatch.setattr(
+        "airflow.sdk.execution_time.supervisor.get_schema_version_migrator",
+        mock_migrator_factory,
+    )
+    monkeypatch.setattr(
+        "airflow.sdk.execution_time.schema.get_schema_version_migrator",
+        mock_migrator_factory,
+    )
+    monkeypatch.setattr(
+        "airflow.sdk.execution_time.schema.registered_models_by_name",
+        lambda: MOCK_REGISTRY,
+    )
+
+    # Patch _serialize_response with a corrected implementation.
+    # The current supervisor code passes ``dump_kwargs=dump_opts`` to
+    # ``downgrade`` which then tries to call ``model_dump(dump_kwargs=...,
+    # mode="json")`` -- an unexpected kwarg that Pydantic rejects. The
+    # corrected version below calls ``downgrade`` with no extra args and
+    # lets the resulting versioned model handle serialisation.
+    def _corrected_serialize_response(self, msg, **dump_opts):
Review Comment:
This monkeypatch *documents* the supervisor bug rather than testing the real
code path.
The fixture installs a corrected `_serialize_response` that calls
`m.downgrade(msg, version)` without the broken `dump_kwargs=dump_opts` kwarg
(see comment on `supervisor.py:743`). With this patch in place the integration
test suite never exercises the actual `_serialize_response` -- so the
production code can ship broken and the suite stays green.
Once the supervisor call site is fixed (`**dump_opts` instead of
`dump_kwargs=dump_opts`), delete the `monkeypatch.setattr(WatchedSubprocess,
"_serialize_response", ...)` and the `_corrected_serialize_response` definition
above. The fixture still needs the migrator + registry patches, just not the
`_serialize_response` override.
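To show why the override is a problem, here is a stripped-down illustration
using only `unittest.mock`. `SupervisorSketch` is a hypothetical stand-in with
a deliberately broken `_serialize_response`; it is not the real
`WatchedSubprocess`:

```python
from unittest import mock


class SupervisorSketch:
    """Hypothetical stand-in; the method below simulates the production bug."""

    def _serialize_response(self, msg):
        raise TypeError("unexpected keyword argument 'dump_kwargs'")

    def send_msg(self, msg):
        return self._serialize_response(msg)


def run_with_override():
    # Replacing the method under test makes the suite pass regardless of the bug.
    with mock.patch.object(
        SupervisorSketch, "_serialize_response", lambda self, msg: {"body": msg}
    ):
        return SupervisorSketch().send_msg("hello")


def run_unpatched():
    # Without the override, the same call surfaces the failure.
    return SupervisorSketch().send_msg("hello")
```

`run_with_override()` succeeds while `run_unpatched()` raises `TypeError`,
which is the gap between what the fixture tests and what ships.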