kaxil commented on code in PR #66854:
URL: https://github.com/apache/airflow/pull/66854#discussion_r3359448918
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -490,14 +499,40 @@ def ti_update_state(
task_id=task_id,
map_index=map_index,
)
+ session.commit()
except Exception:
+ session.rollback()
log.warning(
"Failed to clear task state on success",
dag_id=dag_id,
run_id=run_id,
task_id=task_id,
)
+ # Asset registration runs outside the TI row lock. Failures are logged and counted;
+ # raising HTTP 500 here would be misleading because the task already succeeded and
+ # would make the worker retry a state update that has already completed. Durable
+ # retry/reconciliation for dropped asset events is out of scope for this hot-path fix.
+ if isinstance(ti_patch_payload, TISuccessStatePayload) and ti_patch_payload.task_outlets:
Review Comment:
This gates on the payload type, but `updated_state` may already have been flipped to
`FAILED` by the time we reach here. If
`_create_ti_state_update_query_and_update_state` raises, the `except` block
commits the TI as failed (`query.values(state=(updated_state :=
TaskInstanceState.FAILED))`, line 445). The payload is still a
`TISuccessStatePayload`, so this block runs anyway and emits asset events +
queues downstream `AssetDagRunQueue` rows for a task that actually ended up
failed.
On `main` the registration ran inside
`_create_ti_state_update_query_and_update_state` under that same `try`, so a
raised exception skipped it. Moving it out here and switching the guard from
the committed state to the payload type drops that coupling.
The `state_store` clear block right above gates on `if updated_state ==
TaskInstanceState.SUCCESS:` (line 485). Suggest the same guard here so a forced
failure can't still register assets.
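A minimal, self-contained sketch of the difference between the two guards. `TaskInstanceState` and `TISuccessStatePayload` below are simplified stand-ins, not Airflow's real classes; the two `should_register_assets_*` helpers and the outlet URI are hypothetical. The sketch assumes the payload-type check is kept next to the state check so that `task_outlets` is still only read from a success payload.

```python
from dataclasses import dataclass, field
from enum import Enum


class TaskInstanceState(str, Enum):  # simplified stand-in for Airflow's enum
    SUCCESS = "success"
    FAILED = "failed"


@dataclass
class TISuccessStatePayload:  # simplified stand-in; only task_outlets is modeled
    task_outlets: list = field(default_factory=list)


def should_register_assets_payload_gate(payload, updated_state):
    """Guard as written in the PR: looks only at what the worker requested."""
    return isinstance(payload, TISuccessStatePayload) and bool(payload.task_outlets)


def should_register_assets_state_gate(payload, updated_state):
    """Suggested guard: the committed state must also be SUCCESS."""
    return (
        updated_state == TaskInstanceState.SUCCESS
        and isinstance(payload, TISuccessStatePayload)
        and bool(payload.task_outlets)
    )


# Hypothetical outlet: the worker reported success with one asset outlet.
payload = TISuccessStatePayload(task_outlets=["test://some-asset"])
# The state update raised, so the except block committed FAILED instead.
forced = TaskInstanceState.FAILED

print(should_register_assets_payload_gate(payload, forced))  # True: assets still registered
print(should_register_assets_state_gate(payload, forced))  # False: registration skipped
```

Gating on the committed state keeps asset registration coupled to the outcome that was actually persisted, which is the coupling the inline call on `main` had implicitly.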
##########
airflow-core/tests/unit/assets/test_manager.py:
##########
@@ -154,6 +155,88 @@ def test_register_asset_change_with_alias(
)
assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 2
+ def test_register_asset_change_with_alias_no_lazy_load(
+ self, session, mock_task_instance, testing_dag_bundle
+ ):
+ """Regression: alias-event association must use a direct INSERT, not ORM .append().
+
+ ORM .append() lazy-loads the entire asset_events collection before writing.
+ On long-running deployments with thousands of past events, this query runs
+ while the task_instance row lock is held in ti_update_state, causing idle-in-transaction
+ pile-up that exhausts API server memory and triggers OOMKill.
+ """
+ asm = AssetModel(uri="test://asset-nolazy/", name="test_nolazy_asset", group="asset")
+ session.add(asm)
+ asam = AssetAliasModel(name="test_nolazy_alias", group="test")
+ session.add(asam)
+ session.flush()
+
+ # Pre-populate existing alias-event rows to simulate a long-running deployment.
+ # If .append() is used, SQLAlchemy will lazy-load ALL of these before inserting the new one.
+ existing_events = [AssetEvent(asset_id=asm.id, extra={}) for _ in range(5)]
+ session.add_all(existing_events)
+ session.flush()
+ for ev in existing_events:
+ session.execute(
+ insert(asset_alias_asset_event_association_table).values(alias_id=asam.id, event_id=ev.id)
+ )
+ session.flush()
+
+ # Expire the alias so a lazy-load would have to hit the DB (no in-memory cache).
+ session.expire(asam)
+
+ asset = Asset(uri="test://asset-nolazy", name="test_nolazy_asset")
+ asset_manager = AssetManager()
+
+ lazy_load_selects: list[str] = []
+ real_execute = session.execute
+
+ def tracking_execute(stmt, *args, **kwargs):
+ try:
+ compiled = str(stmt.compile(compile_kwargs={"literal_binds": True}))
+ except Exception:
+ compiled = str(stmt)
+ # Detect a lazy-load SELECT joining asset_alias_asset_event with asset_event
+ if (
+ "asset_alias_asset_event" in compiled.lower()
+ and "asset_event" in compiled.lower()
+ and compiled.strip().upper().startswith("SELECT")
+ ):
+ lazy_load_selects.append(compiled[:120])
+ return real_execute(stmt, *args, **kwargs)
+
+ with mock.patch.object(session, "execute", side_effect=tracking_execute):
Review Comment:
Did you confirm this test fails when the production code is reverted to the
old `.append()` path? My concern is that ORM collection lazy-loads are emitted
by the relationship loader strategy and may not route through the public
`Session.execute` you patch here. In that case `lazy_load_selects` stays empty
even with `.append()`, the assertion at the bottom passes either way, and the
test wouldn't actually catch the regression it's named for.
The revert-and-watch-it-fail check would settle it. If lazy loads don't
surface through this wrapper, an `event.listen(engine, "before_cursor_execute",
...)` hook capturing the emitted SQL would be a more reliable probe.
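A sketch of such a probe, runnable against an in-memory SQLite database. It assumes SQLAlchemy is installed (the declarative API used here exists in both 1.4 and 2.0). `Alias` and `Event` are hypothetical stand-ins for `AssetAliasModel` and `AssetEvent`, simplified to a one-to-many foreign key rather than the real `asset_alias_asset_event_association_table`, and `capture_selects`/`run` are illustrative helpers. The listener sees SQL at the DBAPI cursor, below any `Session.execute` patch, so it records a collection load no matter how the loader strategy dispatches it.

```python
from sqlalchemy import Column, ForeignKey, Integer, create_engine, event, insert
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Alias(Base):  # hypothetical stand-in for AssetAliasModel
    __tablename__ = "alias"
    id = Column(Integer, primary_key=True)
    events = relationship("Event")  # default lazy="select" loading


class Event(Base):  # hypothetical stand-in for AssetEvent
    __tablename__ = "event"
    id = Column(Integer, primary_key=True)
    alias_id = Column(Integer, ForeignKey("alias.id"))


def capture_selects(engine):
    """Record every SELECT the engine hands to the DBAPI cursor."""
    captured = []

    def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        if statement.lstrip().upper().startswith("SELECT"):
            captured.append(statement)

    event.listen(engine, "before_cursor_execute", before_cursor_execute)
    return captured


def run(use_append):
    """Add a sixth event to alias 1; report whether the event collection was SELECTed."""
    engine = create_engine("sqlite://")  # fresh in-memory DB per run
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        alias = Alias(id=1)
        session.add(alias)
        session.add_all([Event(id=i, alias_id=1) for i in range(1, 6)])
        session.commit()  # expire_on_commit: alias.events is no longer loaded

        selects = capture_selects(engine)
        if use_append:
            # ORM path: reading alias.events lazy-loads all existing rows first.
            alias.events.append(Event(id=6))
        else:
            # Direct Core INSERT: touches only the new row.
            session.execute(insert(Event.__table__).values(id=6, alias_id=1))
        session.flush()
    return any("FROM event" in s for s in selects)


print(run(use_append=True), run(use_append=False))
```

Comparing the two calls is the revert-and-watch-it-fail check in miniature: only the `.append()` path should record a `SELECT ... FROM event`, so an assertion on the captured list distinguishes the two implementations.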