kaxil commented on code in PR #66854:
URL: https://github.com/apache/airflow/pull/66854#discussion_r3359448918


##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -490,14 +499,40 @@ def ti_update_state(
                     task_id=task_id,
                     map_index=map_index,
                 )
+                session.commit()
             except Exception:
+                session.rollback()
                 log.warning(
                     "Failed to clear task state on success",
                     dag_id=dag_id,
                     run_id=run_id,
                     task_id=task_id,
                 )
 
+    # Asset registration runs outside the TI row lock. Failures are logged and 
counted;
+    # raising HTTP 500 here would be misleading because the task already 
succeeded and
+    # would make the worker retry a state update that has already completed. 
Durable
+    # retry/reconciliation for dropped asset events is out of scope for this 
hot-path fix.
+    if isinstance(ti_patch_payload, TISuccessStatePayload) and 
ti_patch_payload.task_outlets:

Review Comment:
   This gates on the payload type, but `updated_state` can have been flipped to 
`FAILED` by the time we reach here. If 
`_create_ti_state_update_query_and_update_state` raises, the `except` block 
commits the TI as failed (`query.values(state=(updated_state := 
TaskInstanceState.FAILED))`, line 445). The payload is still a 
`TISuccessStatePayload`, so this block runs anyway and emits asset events + 
queues downstream `AssetDagRunQueue` rows for a task that actually ended up 
failed.
   
   On `main` the registration ran inside 
`_create_ti_state_update_query_and_update_state` under that same `try`, so a 
raised exception skipped it. Moving it out here and switching the guard from 
the committed state to the payload type drops that coupling.
   
   The `state_store` clear block right above gates on `if updated_state == 
TaskInstanceState.SUCCESS:` (line 485). Suggest the same guard here so a forced 
failure can't still register assets.



##########
airflow-core/tests/unit/assets/test_manager.py:
##########
@@ -154,6 +155,88 @@ def test_register_asset_change_with_alias(
         )
         assert 
session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 2
 
+    def test_register_asset_change_with_alias_no_lazy_load(
+        self, session, mock_task_instance, testing_dag_bundle
+    ):
+        """Regression: alias-event association must use a direct INSERT, not 
ORM .append().
+
+        ORM .append() lazy-loads the entire asset_events collection before 
writing.
+        On long-running deployments with thousands of past events, this query 
runs
+        while the task_instance row lock is held in ti_update_state, causing 
idle-in-transaction
+        pile-up that exhausts API server memory and triggers OOMKill.
+        """
+        asm = AssetModel(uri="test://asset-nolazy/", name="test_nolazy_asset", 
group="asset")
+        session.add(asm)
+        asam = AssetAliasModel(name="test_nolazy_alias", group="test")
+        session.add(asam)
+        session.flush()
+
+        # Pre-populate existing alias-event rows to simulate a long-running 
deployment.
+        # If .append() is used, SQLAlchemy will lazy-load ALL of these before 
inserting the new one.
+        existing_events = [AssetEvent(asset_id=asm.id, extra={}) for _ in 
range(5)]
+        session.add_all(existing_events)
+        session.flush()
+        for ev in existing_events:
+            session.execute(
+                
insert(asset_alias_asset_event_association_table).values(alias_id=asam.id, 
event_id=ev.id)
+            )
+        session.flush()
+
+        # Expire the alias so a lazy-load would have to hit the DB (no 
in-memory cache).
+        session.expire(asam)
+
+        asset = Asset(uri="test://asset-nolazy", name="test_nolazy_asset")
+        asset_manager = AssetManager()
+
+        lazy_load_selects: list[str] = []
+        real_execute = session.execute
+
+        def tracking_execute(stmt, *args, **kwargs):
+            try:
+                compiled = str(stmt.compile(compile_kwargs={"literal_binds": 
True}))
+            except Exception:
+                compiled = str(stmt)
+            # Detect a lazy-load SELECT joining asset_alias_asset_event with 
asset_event
+            if (
+                "asset_alias_asset_event" in compiled.lower()
+                and "asset_event" in compiled.lower()
+                and compiled.strip().upper().startswith("SELECT")
+            ):
+                lazy_load_selects.append(compiled[:120])
+            return real_execute(stmt, *args, **kwargs)
+
+        with mock.patch.object(session, "execute", 
side_effect=tracking_execute):

Review Comment:
   Did you confirm this test fails when the production code is reverted to the 
old `.append()` path? My concern is that ORM collection lazy-loads are emitted 
by the relationship loader strategy and may not route through the public 
`Session.execute` you patch here, in which case `lazy_load_selects` stays empty 
even with `.append()` and the assertion at the bottom passes either way, so it 
wouldn't actually catch the regression it's named for.
   
   The revert-and-watch-it-fail check would settle it. If lazy loads don't 
surface through this wrapper, an `event.listen(engine, "before_cursor_execute", 
...)` hook capturing the emitted SQL would be a more reliable probe.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to