hkc-8010 commented on code in PR #66854:
URL: https://github.com/apache/airflow/pull/66854#discussion_r3267306164
##########
airflow-core/tests/unit/assets/test_manager.py:
##########
@@ -162,6 +163,90 @@ def test_register_asset_change_with_alias(
)
assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 2
+ def test_register_asset_change_with_alias_no_lazy_load(
+ self, session, mock_task_instance, testing_dag_bundle
+ ):
+ """Regression: alias-event association must use a direct INSERT, not ORM .append().
+
+ ORM .append() lazy-loads the entire asset_events collection before writing.
+ On long-running deployments with thousands of past events, this query runs
+ while the task_instance row lock is held in ti_update_state, causing idle-in-transaction
+ pile-up that exhausts API server memory and triggers OOMKill.
+ """
+ from sqlalchemy import insert as sa_insert
Review Comment:
Done. Moved `insert` into the top-level `from sqlalchemy import delete, func,
insert, select` and dropped the `sa_insert` alias. No name collision exists in
this file, so the alias served no purpose.
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -457,6 +457,12 @@ def ti_update_state(
extra=json.dumps({"host_name": hostname}) if hostname else None,
)
)
+ # Commit the TI state update now to release the task_instance row lock before
+ # running asset-event queries. Asset registration can hold the lock for seconds
+ # under high concurrency (many aliases with large event histories), causing
+ # idle-in-transaction pile-up that exhausts API server memory and triggers OOMKill.
+ # The task outcome is durable from this point on.
+ session.commit()
Review Comment:
The early `session.commit()` is still needed even with the direct-INSERT fix
in manager.py. That change only eliminates the O(n) lazy-load SELECT on the
alias-event association. `register_asset_changes_in_db` also queries scheduled
dags and inserts `AssetDagRunQueue` rows; without the early commit, all of that
work would run while the task_instance row lock is still held and cause the
same idle-in-transaction pile-up.
For the silent-drop concern: swallowing the exception here is intentional.
By the time `register_asset_changes_in_db` runs, the TI state update is already
committed and durable. Returning HTTP 500 at this point would cause the
task-SDK worker to retry a state update for a task that has already
completed successfully, which is worse than a silent failure. I've expanded the
comments on both blocks to make this design intent explicit.
--
This is an automated message from the Apache Git Service.