hkc-8010 commented on code in PR #66854:
URL: https://github.com/apache/airflow/pull/66854#discussion_r3381209152
##########
airflow-core/tests/unit/assets/test_manager.py:
##########
@@ -154,6 +155,88 @@ def test_register_asset_change_with_alias(
)
assert
session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 2
+ def test_register_asset_change_with_alias_no_lazy_load(
+ self, session, mock_task_instance, testing_dag_bundle
+ ):
+ """Regression: alias-event association must use a direct INSERT, not
ORM .append().
+
+ ORM .append() lazy-loads the entire asset_events collection before
writing.
+ On long-running deployments with thousands of past events, this query
runs
+ while the task_instance row lock is held in ti_update_state, causing
idle-in-transaction
+ pile-up that exhausts API server memory and triggers OOMKill.
+ """
+ asm = AssetModel(uri="test://asset-nolazy/", name="test_nolazy_asset",
group="asset")
+ session.add(asm)
+ asam = AssetAliasModel(name="test_nolazy_alias", group="test")
+ session.add(asam)
+ session.flush()
+
+ # Pre-populate existing alias-event rows to simulate a long-running
deployment.
+ # If .append() is used, SQLAlchemy will lazy-load ALL of these before
inserting the new one.
+ existing_events = [AssetEvent(asset_id=asm.id, extra={}) for _ in
range(5)]
+ session.add_all(existing_events)
+ session.flush()
+ for ev in existing_events:
+ session.execute(
+
insert(asset_alias_asset_event_association_table).values(alias_id=asam.id,
event_id=ev.id)
+ )
+ session.flush()
+
+ # Expire the alias so a lazy-load would have to hit the DB (no
in-memory cache).
+ session.expire(asam)
+
+ asset = Asset(uri="test://asset-nolazy", name="test_nolazy_asset")
+ asset_manager = AssetManager()
+
+ lazy_load_selects: list[str] = []
+ real_execute = session.execute
+
+ def tracking_execute(stmt, *args, **kwargs):
+ try:
+ compiled = str(stmt.compile(compile_kwargs={"literal_binds":
True}))
+ except Exception:
+ compiled = str(stmt)
+ # Detect a lazy-load SELECT joining asset_alias_asset_event with
asset_event
+ if (
+ "asset_alias_asset_event" in compiled.lower()
+ and "asset_event" in compiled.lower()
+ and compiled.strip().upper().startswith("SELECT")
+ ):
+ lazy_load_selects.append(compiled[:120])
+ return real_execute(stmt, *args, **kwargs)
+
+ with mock.patch.object(session, "execute",
side_effect=tracking_execute):
Review Comment:
Addressed in c3e3e7c. I replaced the `Session.execute` mock with a
`before_cursor_execute` listener on the session bind, so the test now captures
the actual SQL emitted by SQLAlchemy, including relationship-loader SELECTs. I
also confirmed locally that the test fails without the fix: I temporarily
swapped the production path back to
`asset_alias_model.asset_events.append(asset_event)` and reran the targeted
test. It failed with the lazy-load SELECT captured, then passed again after I
restored the direct association-table insert.
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -490,14 +499,40 @@ def ti_update_state(
task_id=task_id,
map_index=map_index,
)
+ session.commit()
except Exception:
+ session.rollback()
log.warning(
"Failed to clear task state on success",
dag_id=dag_id,
run_id=run_id,
task_id=task_id,
)
+ # Asset registration runs outside the TI row lock. Failures are logged and
counted;
+ # raising HTTP 500 here would be misleading because the task already
succeeded and
+ # would make the worker retry a state update that has already completed.
Durable
+ # retry/reconciliation for dropped asset events is out of scope for this
hot-path fix.
+ if isinstance(ti_patch_payload, TISuccessStatePayload) and
ti_patch_payload.task_outlets:
Review Comment:
Addressed in c3e3e7c. The post-commit asset registration guard now also
requires `updated_state == TaskInstanceState.SUCCESS`, so the registration path
only runs when the committed state is successful. I added a regression test
that forces `_create_ti_state_update_query_and_update_state` to raise, verifies
the handler still returns 204 after committing the TI as `FAILED`, and asserts
that asset registration is skipped: no `register_asset_changes_in_db` call, no
`AssetEvent`, and no `asset.registration_failures` metric, because registration
never ran.
##########
airflow-core/tests/unit/assets/test_manager.py:
##########
@@ -154,6 +155,88 @@ def test_register_asset_change_with_alias(
)
assert
session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 2
+ def test_register_asset_change_with_alias_no_lazy_load(
+ self, session, mock_task_instance, testing_dag_bundle
+ ):
+ """Regression: alias-event association must use a direct INSERT, not
ORM .append().
+
+ ORM .append() lazy-loads the entire asset_events collection before
writing.
+ On long-running deployments with thousands of past events, this query
runs
+ while the task_instance row lock is held in ti_update_state, causing
idle-in-transaction
+ pile-up that exhausts API server memory and triggers OOMKill.
+ """
+ asm = AssetModel(uri="test://asset-nolazy/", name="test_nolazy_asset",
group="asset")
+ session.add(asm)
+ asam = AssetAliasModel(name="test_nolazy_alias", group="test")
+ session.add(asam)
+ session.flush()
+
+ # Pre-populate existing alias-event rows to simulate a long-running
deployment.
+ # If .append() is used, SQLAlchemy will lazy-load ALL of these before
inserting the new one.
+ existing_events = [AssetEvent(asset_id=asm.id, extra={}) for _ in
range(5)]
+ session.add_all(existing_events)
+ session.flush()
+ for ev in existing_events:
+ session.execute(
+
insert(asset_alias_asset_event_association_table).values(alias_id=asam.id,
event_id=ev.id)
+ )
+ session.flush()
+
+ # Expire the alias so a lazy-load would have to hit the DB (no
in-memory cache).
+ session.expire(asam)
+
+ asset = Asset(uri="test://asset-nolazy", name="test_nolazy_asset")
+ asset_manager = AssetManager()
+
+ lazy_load_selects: list[str] = []
+ real_execute = session.execute
+
+ def tracking_execute(stmt, *args, **kwargs):
+ try:
+ compiled = str(stmt.compile(compile_kwargs={"literal_binds":
True}))
+ except Exception:
+ compiled = str(stmt)
+ # Detect a lazy-load SELECT joining asset_alias_asset_event with
asset_event
+ if (
+ "asset_alias_asset_event" in compiled.lower()
+ and "asset_event" in compiled.lower()
+ and compiled.strip().upper().startswith("SELECT")
+ ):
+ lazy_load_selects.append(compiled[:120])
+ return real_execute(stmt, *args, **kwargs)
+
+ with mock.patch.object(session, "execute",
side_effect=tracking_execute):
Review Comment:
Addressed in c3e3e7c. I replaced the `Session.execute` mock with a
`before_cursor_execute` listener on the session bind, so the test now captures
the actual SQL emitted by SQLAlchemy, including relationship-loader SELECTs. I
also confirmed locally that the test fails without the fix: I temporarily
swapped the production path back to
`asset_alias_model.asset_events.append(asset_event)` and reran the targeted
test. It failed with the lazy-load SELECT captured, then passed again after I
restored the direct association-table insert.
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -490,14 +499,40 @@ def ti_update_state(
task_id=task_id,
map_index=map_index,
)
+ session.commit()
except Exception:
+ session.rollback()
log.warning(
"Failed to clear task state on success",
dag_id=dag_id,
run_id=run_id,
task_id=task_id,
)
+ # Asset registration runs outside the TI row lock. Failures are logged and
counted;
+ # raising HTTP 500 here would be misleading because the task already
succeeded and
+ # would make the worker retry a state update that has already completed.
Durable
+ # retry/reconciliation for dropped asset events is out of scope for this
hot-path fix.
+ if isinstance(ti_patch_payload, TISuccessStatePayload) and
ti_patch_payload.task_outlets:
Review Comment:
Addressed in c3e3e7c. The post-commit asset registration guard now also
requires `updated_state == TaskInstanceState.SUCCESS`, so the registration path
only runs when the committed state is successful. I added a regression test
that forces `_create_ti_state_update_query_and_update_state` to raise, verifies
the handler still returns 204 after committing the TI as `FAILED`, and asserts
that asset registration is skipped: no `register_asset_changes_in_db` call, no
`AssetEvent`, and no `asset.registration_failures` metric, because registration
never ran.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]