kaxil commented on code in PR #67217:
URL: https://github.com/apache/airflow/pull/67217#discussion_r3307763801


##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -324,19 +324,24 @@ def _extract_template_params(bundle_instance: BaseDagBundle) -> dict:
 
         return params
 
-    def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle:
+    def get_bundle(
+        self, name: str, version: str | None = None, version_data: dict | None = None

Review Comment:
   nit: type drift across the three layers. `workloads/base.py:69` uses 
`dict[str, Any] | None` and the `BundleVersion` dataclass at 
`bundles/base.py:309` also uses `dict[str, Any] | None`, but here it's a bare 
`dict | None`. Worth tightening to `dict[str, Any] | None` so the signatures 
agree and mypy callers don't have to widen on this hop.
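A minimal sketch of the tightened signature. `SketchBundle` and `SketchBundlesManager` are hypothetical stand-ins for a `BaseDagBundle` subclass and the manager in `manager.py`, not the real classes. The point is only that `version_data` carries the same `dict[str, Any] | None` annotation as `BundleInfo` and `BundleVersion`:

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class SketchBundle:
    """Hypothetical stand-in for a BaseDagBundle subclass."""

    name: str
    version: str | None = None
    version_data: dict[str, Any] | None = None


class SketchBundlesManager:
    """Hypothetical stand-in for the bundle manager in manager.py."""

    def get_bundle(
        self,
        name: str,
        version: str | None = None,
        # Same annotation as BundleInfo.version_data and BundleVersion.data,
        # so mypy sees one type across all three layers.
        version_data: dict[str, Any] | None = None,
    ) -> SketchBundle:
        return SketchBundle(name=name, version=version, version_data=version_data)


bundle = SketchBundlesManager().get_bundle("dags", version="abc", version_data={"k": "v"})
```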



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -650,6 +650,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
                     ranked_query.c.map_index_for_ordering,
                 )
                 .options(selectinload(TI.dag_model))
+                .options(selectinload(TI.dag_version))

Review Comment:
   This selectinload matters because TIs go through `make_transient()` before 
`ExecuteTask.make(ti)` reads `ti.dag_version.version_data`. Lazy loads on 
transient objects return `None` silently rather than raising 
`DetachedInstanceError`, so if a future "trim queries" pass drops this line, 
`version_data` flows to workers as `None` with no test or log failure. Worth 
either an inline comment pinning the dependency, or a round-trip test in 
`tests/unit/jobs/test_scheduler_job.py` that asserts `version_data` survives 
`_enqueue_task_instances_with_queued_state`.
   
   Also flagging: this adds a second selectinload inside the `FOR UPDATE SKIP 
LOCKED` critical section, one extra round-trip while row locks are held. 
Probably fine for now, but worth keeping an eye on if scheduler contention 
regresses.



##########
airflow-core/src/airflow/executors/workloads/base.py:
##########
@@ -66,6 +66,7 @@ class BundleInfo(BaseModel):
 
     name: str
     version: str | None = None
+    version_data: dict[str, Any] | None = None

Review Comment:
   `version_data` is serialized end-to-end on every workload payload (executor 
channel, JWT, possibly SQS/Kafka/K8s pod env). The first bundle implementation 
that populates this with a per-DAG-file manifest could blow through SQS's 256 
KB message limit or the ~1 MB K8s pod-env cap on large multi-tenant deployments.
   
   Worth one of: a docstring with a soft size cap, a Pydantic validator that 
warns above a threshold, or a documented side-channel pattern (S3 URL pointer + 
worker fetch) before any consumer ships a manifest-style payload here.
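A sketch of the "warn above a threshold" option, assuming the payload is JSON-serializable. `check_version_data_size` and the 64 KiB soft limit are hypothetical and arbitrary, chosen only to sit well under the transport limits mentioned above. The helper is plain Python so it can back a Pydantic v2 `field_validator` (shown in a comment) without changing the field's type:

```python
from __future__ import annotations

import json
import warnings
from typing import Any

# Hypothetical soft cap; illustrative only, not an Airflow setting.
VERSION_DATA_SOFT_LIMIT_BYTES = 64 * 1024


def check_version_data_size(
    version_data: dict[str, Any] | None,
    limit: int = VERSION_DATA_SOFT_LIMIT_BYTES,
) -> dict[str, Any] | None:
    """Warn, but do not reject, when the JSON-encoded payload exceeds ``limit`` bytes."""
    if version_data is None:
        return None
    # Compact encoding approximates what travels on the wire.
    size = len(json.dumps(version_data, separators=(",", ":")).encode("utf-8"))
    if size > limit:
        warnings.warn(
            f"version_data is {size} bytes (soft limit {limit}); "
            "consider passing a pointer (e.g. an S3 URL) and fetching on the worker",
            stacklevel=2,
        )
    return version_data


# Possible wiring on a Pydantic v2 model such as BundleInfo:
#     @field_validator("version_data")
#     @classmethod
#     def _warn_if_large(cls, v):
#         return check_version_data_size(v)
```

Warning rather than raising keeps existing payloads working while making an oversized manifest visible in logs before it hits a hard transport limit.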



##########
airflow-core/tests/unit/dag_processing/bundles/test_base.py:
##########
@@ -323,3 +323,16 @@ def test_bundle_version_inequality(self):
         bv1 = BundleVersion(version="abc", data={"key": "val"})
         bv2 = BundleVersion(version="abc", data={"key": "other"})
         assert bv1 != bv2
+
+
+def test_version_data_stored_on_bundle():
+    """Test that version_data passed to a bundle constructor is stored on the instance."""
+    manifest = {"schema_version": 1, "files": {"dags/my_dag.py": "S3VersionId123"}}
+    bundle = BasicBundle(name="test", version="abc", version_data=manifest)
+    assert bundle.version_data == manifest
+
+
+def test_version_data_defaults_to_none():
+    """Test that version_data defaults to None when not provided."""
+    bundle = BasicBundle(name="test")
+    assert bundle.version_data is None

Review Comment:
   These two tests verify the constructor stores `version_data`, but nothing 
exercises the actual threading from TI to `BundleInfo`: `ExecuteTask.make(ti)` 
populating `BundleInfo.version_data` from `ti.dag_version.version_data` under 
both pinned and unpinned runs.
   
   Worth a test that builds a TI with `dag_version.version_data={"x": 1}` + 
`dag_run.bundle_version="v1"`, calls `ExecuteTask.make(ti)`, and asserts 
`BundleInfo.version_data == {"x": 1}`; same setup with `bundle_version=None` 
should produce `version_data is None`. That's what pins the case-2 (pinned-run) 
behavior the guard at `workloads/task.py:122` now relies on.
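The shape of the two suggested assertions, as a sketch only. A real test would build an actual TI and call `ExecuteTask.make(ti)`; here `BundleInfoSketch`, `make_bundle_info`, and `fake_ti` are hypothetical stand-ins, and the guard logic is assumed from the description above rather than copied from `workloads/task.py`:

```python
from __future__ import annotations

from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any


@dataclass
class BundleInfoSketch:
    """Hypothetical stand-in for workloads.base.BundleInfo."""

    version: str | None = None
    version_data: dict[str, Any] | None = None


def make_bundle_info(ti: Any) -> BundleInfoSketch:
    """Hypothetical stand-in for the BundleInfo step of ExecuteTask.make(ti).

    Assumes version_data is forwarded only for pinned runs, as the review describes.
    """
    bundle_version = ti.dag_run.bundle_version
    version_data = ti.dag_version.version_data if bundle_version is not None else None
    return BundleInfoSketch(version=bundle_version, version_data=version_data)


def fake_ti(bundle_version: str | None) -> SimpleNamespace:
    # Only the attributes the sketch reads; not a real TaskInstance.
    return SimpleNamespace(
        dag_run=SimpleNamespace(bundle_version=bundle_version),
        dag_version=SimpleNamespace(version_data={"x": 1}),
    )


def test_version_data_threaded_for_pinned_run():
    assert make_bundle_info(fake_ti("v1")).version_data == {"x": 1}


def test_version_data_none_for_unpinned_run():
    assert make_bundle_info(fake_ti(None)).version_data is None
```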


