kaxil commented on code in PR #66854:
URL: https://github.com/apache/airflow/pull/66854#discussion_r3321537294


##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -457,6 +457,12 @@ def ti_update_state(
                 extra=json.dumps({"host_name": hostname}) if hostname else None,
             )
         )
+        # Commit the TI state update now to release the task_instance row lock before
+        # running asset-event queries. Asset registration can hold the lock for seconds
+        # under high concurrency (many aliases with large event histories), causing
+        # idle-in-transaction pile-up that exhausts API server memory and triggers OOMKill.
+        # The task outcome is durable from this point on.
+        session.commit()

Review Comment:
   Following up here rather than opening a new thread. Swallowing the error makes
sense for task-state durability: a 500 would make the worker retry a task that
already succeeded. But I don't think that covers the event itself. Once the 204
goes back, the supervisor treats the task as done and never retries, and nothing
reconciles a dropped event. A failed `register_asset_changes_in_db` therefore
means an asset-scheduled downstream DAG silently never fires. Could that failure
be surfaced (a metric, or a queued retry) instead of only `log.exception`? And
since the early `session.commit()` is what opens this window, has the
contention it fixes been measured? In other words, is it confirmed that the
`AssetDagRunQueue` inserts really need the lock released first?
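
Below is a minimal sketch of what surfacing the failure could look like, with sqlite3 standing in for the Airflow metadata DB. All of these are hypothetical and are not Airflow APIs: `finish_task`, the `register_assets` callable, the `metrics` counter, the `retry_queue` list, and the `ti`/`asset_event` tables. The sketch first commits the task state, then runs asset registration in its own transaction. On failure it rolls back any partially written events and records the failure somewhere observable, instead of only logging it.

```python
import logging
import sqlite3
from collections import Counter

log = logging.getLogger(__name__)

# Hypothetical stand-ins for a metrics client and a retry queue.
metrics = Counter()
retry_queue = []


def finish_task(conn: sqlite3.Connection, ti_id: int, register_assets) -> int:
    """Return the status code a state-update endpoint would send."""
    # 1. Persist the task outcome and release its row lock first.
    conn.execute("UPDATE ti SET state = 'success' WHERE id = ?", (ti_id,))
    conn.commit()
    # 2. Best-effort asset registration in a separate transaction.
    try:
        register_assets(conn, ti_id)
        conn.commit()
    except Exception:
        conn.rollback()  # drop any partially written asset events
        log.exception("Asset registration failed for TI %s", ti_id)
        # Make the dropped event observable instead of only logging it.
        metrics["asset_registration_failed"] += 1
        retry_queue.append(ti_id)
    # The task outcome is already durable, so the worker still gets 204.
    return 204
```

A real retry queue would presumably need to be durable. The in-memory list is used here only to keep the sketch self-contained.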



##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -1368,6 +1368,166 @@ def test_ti_update_state_running_errors(self, client, session, create_task_insta
 
         assert response.status_code == 422
 
+    def test_ti_update_state_to_success_asset_registration_failure_returns_204(
+        self, client, session, create_task_instance
+    ):
+        """Regression: asset registration failure after TI state commit must return 204, not 500.
+
+        The TI state is committed (and the row lock released) before asset registration runs.
+        If registration fails at that point, the task outcome is already durable as SUCCESS,
+        so surfacing HTTP 500 would be misleading and cause unnecessary worker retries.
+        """
+        asset = AssetModel(
+            id=42,
+            name="fail-asset",
+            uri="s3://bucket/fail-asset",
+            group="asset",
+            extra={},
+        )
+        asset_active = AssetActive.for_asset(asset)
+        session.add_all([asset, asset_active])
+
+        ti = create_task_instance(
+            task_id="test_asset_reg_failure",
+            start_date=DEFAULT_START_DATE,
+            state=State.RUNNING,
+        )
+        session.commit()
+
+        with mock.patch(
+            "airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db",
+            side_effect=Exception("simulated DB explosion during asset registration"),
+        ):
+            response = client.patch(
+                f"/execution/task-instances/{ti.id}/state",
+                json={
+                    "state": "success",
+                    "end_date": DEFAULT_END_DATE.isoformat(),
+                    "task_outlets": [
+                        {"name": "fail-asset", "uri": "s3://bucket/fail-asset", "type": "Asset"}
+                    ],
+                    "outlet_events": [],
+                },
+            )
+
+        assert response.status_code == 204, f"Expected 204, got {response.status_code}: {response.text}"
+        session.expire_all()
+        ti_db = session.get(TaskInstance, ti.id)
+        assert ti_db is not None
+        assert ti_db.state == TaskInstanceState.SUCCESS
+
+    def test_ti_update_state_rolls_back_partial_asset_registration_on_failure(
+        self, client, session, create_task_instance
+    ):
+        asset = AssetModel(
+            id=43,
+            name="partial-asset",
+            uri="s3://bucket/partial-asset",
+            group="asset",
+            extra={},
+        )
+        session.add_all([asset, AssetActive.for_asset(asset)])
+
+        ti = create_task_instance(
+            task_id="test_partial_asset_registration_failure",
+            start_date=DEFAULT_START_DATE,
+            state=State.RUNNING,
+        )
+        session.commit()
+
+        def add_event_then_fail(ti, task_outlets, outlet_events, session):
+            session.add(
+                AssetEvent(
+                    asset_id=asset.id,
+                    extra={"partial": True},
+                    source_task_id=ti.task_id,
+                    source_dag_id=ti.dag_id,
+                    source_run_id=ti.run_id,
+                    source_map_index=ti.map_index,
+                )
+            )
+            session.flush()
+            raise RuntimeError("simulated failure after partial asset registration")
+
+        with mock.patch(
+            "airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db",
+            side_effect=add_event_then_fail,
+        ):
+            response = client.patch(
+                f"/execution/task-instances/{ti.id}/state",
+                json={
+                    "state": "success",
+                    "end_date": DEFAULT_END_DATE.isoformat(),
+                    "task_outlets": [
+                        {"name": "partial-asset", "uri": "s3://bucket/partial-asset", "type": "Asset"}
+                    ],
+                    "outlet_events": [],
+                },
+            )
+
+        assert response.status_code == 204, f"Expected 204, got {response.status_code}: {response.text}"
+        session.expire_all()
+        ti_db = session.get(TaskInstance, ti.id)
+        assert ti_db is not None
+        assert ti_db.state == TaskInstanceState.SUCCESS
+        assert session.scalars(select(AssetEvent).where(AssetEvent.asset_id == asset.id)).all() == []
+
+    def test_ti_update_state_swallow_asset_registration_commit_failure(
+        self, client, session, create_task_instance
+    ):
+        asset = AssetModel(
+            id=44,
+            name="commit-fail-asset",
+            uri="s3://bucket/commit-fail-asset",
+            group="asset",
+            extra={},
+        )
+        session.add_all([asset, AssetActive.for_asset(asset)])
+
+        ti = create_task_instance(
+            task_id="test_asset_registration_commit_failure",
+            start_date=DEFAULT_START_DATE,
+            state=State.RUNNING,
+        )
+        session.commit()
+
+        real_commit = Session.commit
+        commit_count = 0
+        failed_asset_commit = False
+
+        def fail_second_commit(session):
+            nonlocal commit_count, failed_asset_commit
+            commit_count += 1
+            if commit_count == 2:

Review Comment:
   `commit_count == 2` assumes asset registration is always the second commit,
but the success path has a conditional commit between them. With
`clear_on_success` enabled, the clear block (`task_instances.py:501`) commits
too, which makes asset registration the third commit. With that config on, this
test would inject the failure into the clear-on-success commit and still assert
204, passing while exercising the wrong path. Matching on something other than
the commit ordinal would hold up better, e.g. patching
`register_asset_changes_in_db` to commit then raise, like the test just above.
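
The fragility can be shown with a self-contained toy that does not depend on Airflow. `FakeSession`, `TaskInstanceStub`, `update_state_to_success`, and `commit_then_fail` are all hypothetical. They only mimic the commit order described above: the TI state commit, an optional clear-on-success commit, and asset registration last. Patching the registration function itself places the failure on the intended path whether or not the extra commit happens.

```python
from unittest import mock


class FakeSession:
    """Hypothetical stand-in for a DB session that records each commit."""

    def __init__(self):
        self.commits = []

    def commit(self, label):
        self.commits.append(label)


class TaskInstanceStub:
    """Hypothetical stand-in for the model that owns asset registration."""

    @staticmethod
    def register_asset_changes_in_db(session):
        session.commit("asset-registration")


def update_state_to_success(session, clear_on_success):
    """Mimic the success path: TI commit, optional clear commit, assets last."""
    session.commit("ti-state")
    if clear_on_success:
        session.commit("clear-on-success")
    try:
        TaskInstanceStub.register_asset_changes_in_db(session)
    except Exception:
        return "asset-failure-swallowed"
    return "ok"


def commit_then_fail(session):
    # Fail inside asset registration itself, after its own commit, so the
    # injected fault lands on the right path however many commits precede it.
    session.commit("asset-registration")
    raise RuntimeError("simulated commit-time failure")


results = {}
for clear in (False, True):
    session = FakeSession()
    with mock.patch.object(
        TaskInstanceStub, "register_asset_changes_in_db", side_effect=commit_then_fail
    ):
        outcome = update_state_to_success(session, clear_on_success=clear)
    results[clear] = (outcome, session.commits)
```

In the `clear_on_success=True` run, the second recorded commit is the clear-on-success one. A "fail the second commit" rule would hit that commit instead of asset registration.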



##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -327,8 +328,16 @@ def register_asset_change(
             ).unique()
 
             for asset_alias_model in asset_alias_models:
-                asset_alias_model.asset_events.append(asset_event)
-                session.add(asset_alias_model)
+                # Use a direct INSERT rather than ORM .append() to avoid lazy-loading the
+                # entire asset_events collection. On long-running deployments that collection
+                # can contain thousands of rows; loading it on the task-success hot path can
+                # leave DB connections idle-in-transaction for minutes, blocking other workers.
+                session.execute(

Review Comment:
   The direct `insert(...)` skips the ORM, so the in-memory
`asset_alias_model.asset_events` collection stays stale: the association row is
in the DB but not in the session's identity map. Nothing reads that collection
again before commit today, so it's safe, but it's a quiet trap if future code in
the same request reads `asset_events` after this point. Worth a one-line note
that the relationship is intentionally left unsynced here.
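
The trap can be illustrated with a toy model built on sqlite3 rather than SQLAlchemy. `AliasCache` and the `alias_event` table are hypothetical, and the cached list plays the role of an already-loaded relationship collection. The sketch assumes the collection was loaded before the direct insert, which is the case where the in-memory list goes stale. In SQLAlchemy, expiring the attribute (for example `session.expire(asset_alias_model, ["asset_events"])`) would force a reload on next access.

```python
import sqlite3


class AliasCache:
    """Toy stand-in for an ORM object whose collection was already loaded."""

    def __init__(self, conn, alias_id):
        self.conn = conn
        self.alias_id = alias_id
        self.asset_events = self._load()

    def _load(self):
        rows = self.conn.execute(
            "SELECT event_id FROM alias_event WHERE alias_id = ? ORDER BY event_id",
            (self.alias_id,),
        )
        return [row[0] for row in rows]

    def refresh(self):
        # Analogous to expiring the attribute and letting it reload.
        self.asset_events = self._load()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE alias_event (alias_id INTEGER, event_id INTEGER)")
conn.execute("INSERT INTO alias_event VALUES (1, 10)")

alias = AliasCache(conn, alias_id=1)
# Direct INSERT, analogous to session.execute(insert(...)): the row lands in
# the DB, but the already-loaded collection is not updated.
conn.execute("INSERT INTO alias_event VALUES (1, 11)")
stale = list(alias.asset_events)
alias.refresh()
fresh = list(alias.asset_events)
```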


