hkc-8010 commented on code in PR #66854:
URL: https://github.com/apache/airflow/pull/66854#discussion_r3381209152


##########
airflow-core/tests/unit/assets/test_manager.py:
##########
@@ -154,6 +155,88 @@ def test_register_asset_change_with_alias(
         )
         assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 2
 
+    def test_register_asset_change_with_alias_no_lazy_load(
+        self, session, mock_task_instance, testing_dag_bundle
+    ):
+        """Regression: alias-event association must use a direct INSERT, not ORM .append().
+
+        ORM .append() lazy-loads the entire asset_events collection before writing.
+        On long-running deployments with thousands of past events, this query runs
+        while the task_instance row lock is held in ti_update_state, causing idle-in-transaction
+        pile-up that exhausts API server memory and triggers OOMKill.
+        """
+        asm = AssetModel(uri="test://asset-nolazy/", name="test_nolazy_asset", group="asset")
+        session.add(asm)
+        asam = AssetAliasModel(name="test_nolazy_alias", group="test")
+        session.add(asam)
+        session.flush()
+
+        # Pre-populate existing alias-event rows to simulate a long-running deployment.
+        # If .append() is used, SQLAlchemy will lazy-load ALL of these before inserting the new one.
+        existing_events = [AssetEvent(asset_id=asm.id, extra={}) for _ in range(5)]
+        session.add_all(existing_events)
+        session.flush()
+        for ev in existing_events:
+            session.execute(
+                insert(asset_alias_asset_event_association_table).values(alias_id=asam.id, event_id=ev.id)
+            )
+        session.flush()
+
+        # Expire the alias so a lazy-load would have to hit the DB (no in-memory cache).
+        session.expire(asam)
+
+        asset = Asset(uri="test://asset-nolazy", name="test_nolazy_asset")
+        asset_manager = AssetManager()
+
+        lazy_load_selects: list[str] = []
+        real_execute = session.execute
+
+        def tracking_execute(stmt, *args, **kwargs):
+            try:
+                compiled = str(stmt.compile(compile_kwargs={"literal_binds": True}))
+            except Exception:
+                compiled = str(stmt)
+            # Detect a lazy-load SELECT joining asset_alias_asset_event with asset_event
+            if (
+                "asset_alias_asset_event" in compiled.lower()
+                and "asset_event" in compiled.lower()
+                and compiled.strip().upper().startswith("SELECT")
+            ):
+                lazy_load_selects.append(compiled[:120])
+            return real_execute(stmt, *args, **kwargs)
+
+        with mock.patch.object(session, "execute", side_effect=tracking_execute):

Review Comment:
   Addressed in c3e3e7c. I replaced the `Session.execute` mock with a `before_cursor_execute` listener on the session bind, so the test now captures the actual SQL emitted by SQLAlchemy, including relationship-loader SELECTs. I also confirmed locally that the test fails without the fix: I temporarily swapped the production path back to `asset_alias_model.asset_events.append(asset_event)` and reran the targeted test. It failed with the lazy-load SELECT captured, then passed again once the direct association-table insert was restored.
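
The listener-based check can be sketched in isolation. This is a minimal, self-contained illustration that assumes SQLAlchemy 1.4 or 2.0 and an in-memory SQLite database. `AliasModel`, `EventModel`, the `alias_event` table, and both helper functions are hypothetical simplified stand-ins, not Airflow's models or test code. In a test that receives a `session` fixture, the listener target would be `session.get_bind()` rather than a freshly created engine.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, Table, create_engine, event, insert
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()

# Hypothetical stand-ins: an alias, an event, and their many-to-many association table.
alias_event = Table(
    "alias_event",
    Base.metadata,
    Column("alias_id", ForeignKey("alias.id"), primary_key=True),
    Column("event_id", ForeignKey("event.id"), primary_key=True),
)


class AliasModel(Base):
    __tablename__ = "alias"
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    events = relationship("EventModel", secondary=alias_event)  # default lazy="select"


class EventModel(Base):
    __tablename__ = "event"
    id = Column(Integer, primary_key=True)
    extra = Column(String, default="")


def is_collection_load(sql: str) -> bool:
    """True for a SELECT that reads the association table (a collection lazy-load)."""
    return sql.lstrip().upper().startswith("SELECT") and "alias_event" in sql.lower()


def sql_emitted_while_linking(use_append: bool) -> list[str]:
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    # Registered before any Session work, so every statement sent to the DBAPI
    # cursor (including ones issued by relationship loaders) is recorded.
    captured: list[str] = []

    @event.listens_for(engine, "before_cursor_execute")
    def record(conn, cursor, statement, parameters, context, executemany):
        captured.append(statement)

    with Session(engine) as session:
        alias = AliasModel(name="a")
        old_events = [EventModel() for _ in range(5)]
        session.add_all([alias, *old_events])
        session.flush()
        session.execute(
            insert(alias_event),
            [{"alias_id": alias.id, "event_id": ev.id} for ev in old_events],
        )
        new_event = EventModel()
        session.add(new_event)
        session.flush()
        alias_id, new_event_id = alias.id, new_event.id  # read before expiring
        session.expire(alias)  # any collection access must now go to the DB
        captured.clear()  # only keep SQL from the linking step below

        if use_append:
            alias.events.append(new_event)  # loads the existing collection first
            session.flush()
        else:
            session.execute(
                insert(alias_event).values(alias_id=alias_id, event_id=new_event_id)
            )
    return captured
```

Clearing the list right before the linking step keeps setup SQL out of the comparison, so the `.append()` path shows a SELECT against `alias_event` while the direct-insert path shows only the INSERT.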



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -490,14 +499,40 @@ def ti_update_state(
                     task_id=task_id,
                     map_index=map_index,
                 )
+                session.commit()
             except Exception:
+                session.rollback()
                 log.warning(
                     "Failed to clear task state on success",
                     dag_id=dag_id,
                     run_id=run_id,
                     task_id=task_id,
                 )
 
+    # Asset registration runs outside the TI row lock. Failures are logged and counted;
+    # raising HTTP 500 here would be misleading because the task already succeeded and
+    # would make the worker retry a state update that has already completed. Durable
+    # retry/reconciliation for dropped asset events is out of scope for this hot-path fix.
+    if isinstance(ti_patch_payload, TISuccessStatePayload) and ti_patch_payload.task_outlets:

Review Comment:
   Addressed in c3e3e7c. The post-commit asset registration guard now also requires `updated_state == TaskInstanceState.SUCCESS`, so registration only runs when the committed state is successful. I added a regression test that forces `_create_ti_state_update_query_and_update_state` to raise, verifies that the handler still returns 204 after committing the TI as `FAILED`, and asserts that asset registration is skipped: no `register_asset_changes_in_db` call, no `AssetEvent`, and no `asset.registration_failures` metric, because registration never ran.
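
A stdlib-only sketch can make the guard's shape concrete. Everything here is illustrative under stated assumptions: `TaskInstanceState`, `SuccessPayload`, `Metrics`, and `register_assets_after_commit` are hypothetical stand-ins, not Airflow's actual types or handler code. Only the three-part condition and the `asset.registration_failures` metric name come from the comment above.

```python
import enum
import logging
from dataclasses import dataclass, field
from typing import Callable, Optional

log = logging.getLogger(__name__)


class TaskInstanceState(str, enum.Enum):
    """Hypothetical, trimmed-down stand-in for the TI state enum."""
    SUCCESS = "success"
    FAILED = "failed"


@dataclass
class SuccessPayload:
    """Hypothetical stand-in for TISuccessStatePayload."""
    task_outlets: list = field(default_factory=list)


@dataclass
class Metrics:
    """Hypothetical in-memory counter standing in for a stats client."""
    counts: dict = field(default_factory=dict)

    def incr(self, name: str) -> None:
        self.counts[name] = self.counts.get(name, 0) + 1


def register_assets_after_commit(
    payload: object,
    updated_state: Optional[TaskInstanceState],
    register: Callable[[list], None],
    metrics: Metrics,
) -> bool:
    """Run registration only for a committed SUCCESS with outlets; never raise.

    Returns True only if registration ran and completed.
    """
    if not (
        isinstance(payload, SuccessPayload)
        and payload.task_outlets
        and updated_state == TaskInstanceState.SUCCESS
    ):
        # e.g. the state update itself failed and the TI was committed as FAILED
        return False
    try:
        register(payload.task_outlets)
    except Exception:
        # The TI state is already committed: log and count rather than
        # surface an error that would make the worker retry the update.
        log.warning("Asset registration failed", exc_info=True)
        metrics.incr("asset.registration_failures")
        return False
    return True
```

Because the failure metric is only incremented inside the `try`, a skipped registration (wrong payload type, no outlets, or a non-SUCCESS committed state) leaves the counter untouched, which mirrors the "no metric because registration never ran" assertion.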



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
