hkc-8010 opened a new issue, #66853:
URL: https://github.com/apache/airflow/issues/66853

   ## Problem
   
   Under high concurrency (80+ simultaneous task completions emitting asset 
events), the Airflow 3 API server dies with OOMKill. The root cause is a DB 
lock contention chain:
   
   1. `ti_update_state()` runs `SELECT ... FROM task_instance ... FOR UPDATE`, 
acquiring a PostgreSQL row lock on that task instance.
   2. While holding that lock, `register_asset_changes_in_db()` calls 
`AssetManager.register_asset_change()`, which runs several slow operations, 
including `asset_alias_model.asset_events.append(asset_event)`. This ORM 
`.append()` lazy-loads the **entire** `asset_events` collection for the alias -- 
potentially thousands of rows on long-running deployments.
   3. Each slow query leaves the connection `idle in transaction` while Python 
processes results. New workers needing `SELECT task_instance FOR UPDATE` on the 
same row queue up, each holding a FastAPI threadpool thread via Cadwyn's 
`run_in_threadpool`.
   4. With 80+ concurrent completions, thread count grows unbounded until 
OOMKill.
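
The queueing effect in steps 3 and 4 can be illustrated with a small stand-alone simulation. This is not Airflow code: a `threading.Lock` stands in for the `task_instance` row lock, `time.sleep` stands in for the slow asset queries, and `peak_waiters` is a hypothetical helper name chosen for this sketch.

```python
import threading
import time


def peak_waiters(n_workers, hold_seconds):
    """Return the largest number of threads seen queued on one shared lock."""
    row_lock = threading.Lock()       # stand-in for the task_instance row lock
    counter_lock = threading.Lock()   # protects the two counters below
    start = threading.Barrier(n_workers)
    waiting = 0
    peak = 0

    def worker():
        nonlocal waiting, peak
        start.wait()                  # all workers arrive at the same moment
        with counter_lock:
            waiting += 1
            peak = max(peak, waiting)
        with row_lock:
            with counter_lock:
                waiting -= 1
            time.sleep(hold_seconds)  # slow work done while the lock is held

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return peak


# Eight workers, each holding the lock for 50 ms of simulated query time.
peak = peak_waiters(8, 0.05)
print(f"peak threads queued on the lock: {peak}")
```

Every queued thread stays alive for as long as the holder ahead of it keeps the lock, so shortening the time spent under the lock is what reduces the pile-up.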
   
   ### DB evidence (confirmed from production, May 2026)
   
   ```
   pid 31104: idle in transaction, xact_age=3:35
     last query: SELECT asset_alias WHERE name IN ('fivetran-synced-table')
   pid 31303: idle in transaction, xact_age=3:31, same pattern
   -- both blocking: SELECT task_instance FOR UPDATE WHERE id = UUID(...)
   ```
   
   Disabling the trigger DAGs dropped API server memory from 5Gi+ to the MB 
range immediately. Re-enabling just one DAG reproduced 76 `QueryCanceled: 
canceling statement due to statement timeout` errors in 1.5 hours.
   
   Blocking query chain confirmed from `pg_stat_activity`:
   
   ```
   pid 31303: idle in transaction
     INSERT INTO asset_alias_asset_event VALUES (2, 1151718)
   pid 31104: idle in transaction
     SELECT asset_alias ... WHERE name IN ('fivetran-synced-table')
   -- both blocking 8 workers doing:
     SELECT task_instance ... FOR UPDATE WHERE id = UUID(...)
   ```
   
   ## Fix
   
   Two changes:
   
   **1. `AssetManager.register_asset_change()` (`assets/manager.py`)**: Replace 
`asset_alias_model.asset_events.append(asset_event)` + 
`session.add(asset_alias_model)` with a direct `INSERT INTO 
asset_alias_asset_event (alias_id, event_id)`. This eliminates the lazy-load of 
the existing events collection (which can be thousands of rows) while the 
task_instance row lock is held.
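
A minimal sketch of the difference, using the standard-library `sqlite3` module rather than SQLAlchemy. The table and column names come from the description above; `link_via_loaded_collection` and `link_directly` are hypothetical helpers, and the explicit `SELECT` only emulates what a lazy-loading `.append()` would trigger. It is not the ORM's actual code path.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE asset_alias_asset_event ("
    "alias_id INTEGER, event_id INTEGER, PRIMARY KEY (alias_id, event_id))"
)
# Pre-existing links for alias 2, as on a long-running deployment.
conn.executemany(
    "INSERT INTO asset_alias_asset_event (alias_id, event_id) VALUES (?, ?)",
    [(2, event_id) for event_id in range(1, 5001)],
)
conn.commit()

statements = []
conn.set_trace_callback(statements.append)  # record every statement SQLite runs


def link_via_loaded_collection(conn, alias_id, event_id):
    """Emulate append on an unloaded collection: read all rows, then insert."""
    existing = conn.execute(
        "SELECT event_id FROM asset_alias_asset_event WHERE alias_id = ?",
        (alias_id,),
    ).fetchall()
    conn.execute(
        "INSERT INTO asset_alias_asset_event (alias_id, event_id) VALUES (?, ?)",
        (alias_id, event_id),
    )
    return len(existing)


def link_directly(conn, alias_id, event_id):
    """Insert only the new association row; existing rows are never read."""
    conn.execute(
        "INSERT INTO asset_alias_asset_event (alias_id, event_id) VALUES (?, ?)",
        (alias_id, event_id),
    )


rows_loaded = link_via_loaded_collection(conn, 2, 5001)
conn.commit()

statements.clear()
link_directly(conn, 2, 5002)
conn.commit()
selects_in_direct_path = [
    s for s in statements if s.lstrip().upper().startswith("SELECT")
]

conn.set_trace_callback(None)
total = conn.execute("SELECT COUNT(*) FROM asset_alias_asset_event").fetchone()[0]
```

The emulated collection path reads every existing row for the alias before writing one, while the direct path issues no `SELECT` at all, which is the property the new `test_register_asset_change_with_alias_no_lazy_load` test is described as checking.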
   
   **2. `ti_update_state()` (`execution_api/routes/task_instances.py`)**: Add 
`session.commit()` after the TI state UPDATE and Log writes to release the 
`task_instance` row lock before running asset registration. Asset registration 
then runs in a fresh implicit transaction. Registration failures are logged and 
swallowed -- the task state is already durable at that point.
   
   Note on calling `session.commit()` inside a function that receives the 
session as a parameter: this intentionally deviates from the project 
convention. No code after the commit relies on rollback; the subsequent 
`session.get()` re-loads fresh state. Alternative approaches (second session, 
background task) were considered but have higher operational complexity for 
equivalent correctness.
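
The commit-then-register ordering can be sketched with the standard-library `sqlite3` module. This is an illustration under assumptions, not the Airflow implementation: `update_state_then_register` and `failing_registration` are hypothetical names, the two tables are simplified stand-ins, and SQLite does not take PostgreSQL-style row locks, so the comments mark where the lock would be released.

```python
import logging
import sqlite3

log = logging.getLogger(__name__)


def update_state_then_register(conn, ti_id, register_assets):
    """Commit the state change first, then run asset registration separately."""
    # Transaction 1: short, touches only the task_instance row.
    conn.execute(
        "UPDATE task_instance SET state = 'success' WHERE id = ?", (ti_id,)
    )
    conn.commit()  # in PostgreSQL, the row lock would be released here

    # Transaction 2: the slow part, no longer under the task_instance lock.
    try:
        register_assets(conn, ti_id)
        conn.commit()
    except Exception:
        conn.rollback()  # discard partial asset writes only
        log.exception("asset registration failed for %s", ti_id)
    return 204  # the task state is already durable either way


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (id TEXT PRIMARY KEY, state TEXT)")
conn.execute("CREATE TABLE asset_event (id INTEGER PRIMARY KEY, ti_id TEXT)")
conn.execute("INSERT INTO task_instance VALUES ('ti-1', 'running')")
conn.commit()


def failing_registration(conn, ti_id):
    """Write one row, then fail, to simulate an error after the first commit."""
    conn.execute("INSERT INTO asset_event (ti_id) VALUES (?)", (ti_id,))
    raise RuntimeError("simulated asset registration failure")


status = update_state_then_register(conn, "ti-1", failing_registration)
state = conn.execute(
    "SELECT state FROM task_instance WHERE id = 'ti-1'"
).fetchone()[0]
events = conn.execute("SELECT COUNT(*) FROM asset_event").fetchone()[0]
```

In this sketch the failed registration rolls back only its own writes: the task instance stays in `success` and the caller still gets a 204, matching the behaviour described for the new failure test.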
   
   ## Testing
   
   - New: `test_register_asset_change_with_alias_no_lazy_load` -- confirms no 
SELECT on `asset_alias_asset_event` collection during registration when 
pre-existing rows exist
   - New: 
`test_ti_update_state_to_success_asset_registration_failure_returns_204` -- 
confirms 204 + TI SUCCESS when asset registration raises after commit
   - All 22 existing `test_manager.py` tests pass
   - All 38 existing `TestTIUpdateState` tests pass
   - Breeze `--python 3.10 --db-reset` clean on both test files
   
   ## Are you willing to submit a PR?
   
   - [x] Yes I am willing to submit a PR!

