kaxil commented on code in PR #66854:
URL: https://github.com/apache/airflow/pull/66854#discussion_r3321537294
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -457,6 +457,12 @@ def ti_update_state(
extra=json.dumps({"host_name": hostname}) if hostname else
None,
)
)
+ # Commit the TI state update now to release the task_instance row lock
before
+ # running asset-event queries. Asset registration can hold the lock
for seconds
+ # under high concurrency (many aliases with large event histories),
causing
+ # idle-in-transaction pile-up that exhausts API server memory and
triggers OOMKill.
+ # The task outcome is durable from this point on.
+ session.commit()
Review Comment:
Following up here rather than opening a new thread. Swallowing the exception
makes sense for task-state durability: a 500 would make the worker retry a task
that already succeeded. But I don't think that covers the event itself: once the
204 goes back, the supervisor treats the task as done and never retries, and nothing
reconciles a dropped event, so a failed `register_asset_changes_in_db` means an
asset-scheduled downstream DAG silently never fires. Could that failure be
surfaced (a metric, or a queued retry) instead of only `log.exception`? And
since the early `session.commit()` is what opens this window, has the
contention it fixes been measured, i.e. that the `AssetDagRunQueue` inserts
really need the lock released first?
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -1368,6 +1368,166 @@ def test_ti_update_state_running_errors(self, client,
session, create_task_insta
assert response.status_code == 422
+ def test_ti_update_state_to_success_asset_registration_failure_returns_204(
+ self, client, session, create_task_instance
+ ):
+ """Regression: asset registration failure after TI state commit must
return 204, not 500.
+
+ The TI state is committed (and the row lock released) before asset
registration runs.
+ If registration fails at that point, the task outcome is already
durable as SUCCESS,
+ so surfacing HTTP 500 would be misleading and cause unnecessary worker
retries.
+ """
+ asset = AssetModel(
+ id=42,
+ name="fail-asset",
+ uri="s3://bucket/fail-asset",
+ group="asset",
+ extra={},
+ )
+ asset_active = AssetActive.for_asset(asset)
+ session.add_all([asset, asset_active])
+
+ ti = create_task_instance(
+ task_id="test_asset_reg_failure",
+ start_date=DEFAULT_START_DATE,
+ state=State.RUNNING,
+ )
+ session.commit()
+
+ with mock.patch(
+
"airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db",
+ side_effect=Exception("simulated DB explosion during asset
registration"),
+ ):
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/state",
+ json={
+ "state": "success",
+ "end_date": DEFAULT_END_DATE.isoformat(),
+ "task_outlets": [
+ {"name": "fail-asset", "uri":
"s3://bucket/fail-asset", "type": "Asset"}
+ ],
+ "outlet_events": [],
+ },
+ )
+
+ assert response.status_code == 204, f"Expected 204, got
{response.status_code}: {response.text}"
+ session.expire_all()
+ ti_db = session.get(TaskInstance, ti.id)
+ assert ti_db is not None
+ assert ti_db.state == TaskInstanceState.SUCCESS
+
+ def test_ti_update_state_rolls_back_partial_asset_registration_on_failure(
+ self, client, session, create_task_instance
+ ):
+ asset = AssetModel(
+ id=43,
+ name="partial-asset",
+ uri="s3://bucket/partial-asset",
+ group="asset",
+ extra={},
+ )
+ session.add_all([asset, AssetActive.for_asset(asset)])
+
+ ti = create_task_instance(
+ task_id="test_partial_asset_registration_failure",
+ start_date=DEFAULT_START_DATE,
+ state=State.RUNNING,
+ )
+ session.commit()
+
+ def add_event_then_fail(ti, task_outlets, outlet_events, session):
+ session.add(
+ AssetEvent(
+ asset_id=asset.id,
+ extra={"partial": True},
+ source_task_id=ti.task_id,
+ source_dag_id=ti.dag_id,
+ source_run_id=ti.run_id,
+ source_map_index=ti.map_index,
+ )
+ )
+ session.flush()
+ raise RuntimeError("simulated failure after partial asset
registration")
+
+ with mock.patch(
+
"airflow.models.taskinstance.TaskInstance.register_asset_changes_in_db",
+ side_effect=add_event_then_fail,
+ ):
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/state",
+ json={
+ "state": "success",
+ "end_date": DEFAULT_END_DATE.isoformat(),
+ "task_outlets": [
+ {"name": "partial-asset", "uri":
"s3://bucket/partial-asset", "type": "Asset"}
+ ],
+ "outlet_events": [],
+ },
+ )
+
+ assert response.status_code == 204, f"Expected 204, got
{response.status_code}: {response.text}"
+ session.expire_all()
+ ti_db = session.get(TaskInstance, ti.id)
+ assert ti_db is not None
+ assert ti_db.state == TaskInstanceState.SUCCESS
+ assert session.scalars(select(AssetEvent).where(AssetEvent.asset_id ==
asset.id)).all() == []
+
+ def test_ti_update_state_swallow_asset_registration_commit_failure(
+ self, client, session, create_task_instance
+ ):
+ asset = AssetModel(
+ id=44,
+ name="commit-fail-asset",
+ uri="s3://bucket/commit-fail-asset",
+ group="asset",
+ extra={},
+ )
+ session.add_all([asset, AssetActive.for_asset(asset)])
+
+ ti = create_task_instance(
+ task_id="test_asset_registration_commit_failure",
+ start_date=DEFAULT_START_DATE,
+ state=State.RUNNING,
+ )
+ session.commit()
+
+ real_commit = Session.commit
+ commit_count = 0
+ failed_asset_commit = False
+
+ def fail_second_commit(session):
+ nonlocal commit_count, failed_asset_commit
+ commit_count += 1
+ if commit_count == 2:
Review Comment:
`commit_count == 2` assumes asset registration is always the second commit,
but the success path has a conditional commit between them: with
`clear_on_success` enabled the clear block (`task_instances.py:501`) commits
too, making asset registration the third commit. With that config on, this test
would inject the failure into the clear-on-success commit and still assert 204,
passing while exercising the wrong path. Matching on something other than
commit ordinal (e.g. patching `register_asset_changes_in_db` to commit then
raise, like the test just above) would hold up better.
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -327,8 +328,16 @@ def register_asset_change(
).unique()
for asset_alias_model in asset_alias_models:
- asset_alias_model.asset_events.append(asset_event)
- session.add(asset_alias_model)
+ # Use a direct INSERT rather than ORM .append() to avoid
lazy-loading the
+ # entire asset_events collection. On long-running deployments
that collection
+ # can contain thousands of rows; loading it on the
task-success hot path can
+ # leave DB connections idle-in-transaction for minutes,
blocking other workers.
+ session.execute(
Review Comment:
The direct `insert(...)` skips the ORM, so the in-memory
`asset_alias_model.asset_events` collection stays stale: the association row is
in the DB but not in the session's identity map. Nothing reads that collection
again before commit today so it's safe, but it's a quiet trap if future code in
the same request reads `asset_events` after this point. Worth a one-line note
that the relationship is intentionally left unsynced here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]