kaxil opened a new pull request, #60956:
URL: https://github.com/apache/airflow/pull/60956

   The scheduler's `get_running_dag_runs_to_examine()` was using 
`joinedload(DagRun.task_instances)`, which eagerly loaded **all** TaskInstances 
for each DagRun into memory. This happened on every scheduler loop and could 
cause significant memory pressure with large DAGs.
   
   **Changes:**
   - Remove unnecessary `joinedload(task_instances)` from the query
   - Convert the TI loop in `_verify_integrity_if_dag_changed` to a bulk UPDATE 
statement
   - Add `session.expire()` to ensure relationship cache coherence
   
   ## Benchmark Results
   
   **Configuration:** 100 tasks × 20 DAG runs = 2,000 TaskInstances (matches 
`DEFAULT_DAGRUNS_TO_EXAMINE`)
   
   | Metric | OLD (joinedload) | NEW (bulk UPDATE) | Savings |
   |--------|------------------|-------------------|---------|
   | Memory used | 6.58 MB | 0.24 MB | **96.4%** |
   | Peak memory | 9.38 MB | 6.63 MB | 29.3% |
   | TIs loaded | 2,000 | 0 | 100% |
   
   **Scaling with DAG size:**
   
   | DAG Size | TIs per Loop | Memory Saved |
   |----------|--------------|--------------|
   | 50 tasks | 1,000 | ~3 MB |
   | 100 tasks | 2,000 | ~6 MB |
   | 500 tasks | 10,000 | ~30 MB |
   
   This memory churn happens **every scheduler loop** (default: every second), 
so eliminating it reduces GC pressure significantly.
   
   ### Benchmark Methodology
   
   Used `tracemalloc` for accurate Python memory measurement via pytest with 
`dag_maker` fixtures:
   
   ```python
   # OLD approach - loads all TIs into memory
   dag_runs = session.scalars(
       select(DagRun)
       .where(DagRun.state == DagRunState.RUNNING)
       .options(joinedload(DagRun.task_instances))  # Loads ALL TIs
   ).unique().all()
   
   # NEW approach - bulk UPDATE, no TIs loaded
   dag_runs = session.scalars(
       select(DagRun)
       .where(DagRun.state == DagRunState.RUNNING)
   ).all()
   
   for dag_run in dag_runs:
       session.execute(
           update(TI)
           .where(TI.dag_id == dag_run.dag_id, TI.run_id == dag_run.run_id)
           .where(TI.state.in_(State.unfinished))
           .values(dag_version_id=latest_dag_version.id),
           execution_options={"synchronize_session": False},
       )
       session.expire(dag_run, ["task_instances"])
   ```
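   
   The numbers above come from `tracemalloc`; a minimal sketch of the 
measurement harness (the real benchmark drives the scheduler queries through 
pytest with `dag_maker` fixtures, omitted here; the workloads below are 
stand-ins, not the real queries):
   
   ```python
   import tracemalloc

   def measure(fn):
       """Run fn and return (result, MB currently held, MB at peak)."""
       tracemalloc.start()
       result = fn()
       current, peak = tracemalloc.get_traced_memory()
       tracemalloc.stop()
       return result, current / 2**20, peak / 2**20

   # Stand-in workloads: materializing ~2 MB of objects at once (analogous
   # to joinedload) vs. streaming over them one at a time.
   rows, used_old, peak_old = measure(lambda: [bytearray(1024) for _ in range(2000)])
   total, used_new, peak_new = measure(lambda: sum(len(bytearray(1024)) for _ in range(2000)))
   ```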
   
   ## Technical Details
   
   ### Why both UPDATE and verify_integrity() are needed
   
   They serve different purposes:
   
   1. **Bulk UPDATE**: Sets `dag_version_id` on **existing** unfinished TIs
      - TIs created from older DAG version but haven't finished yet
      - They need to reference the new DAG version
   
   2. **verify_integrity()**: Creates **new** TIs for tasks added to DAG
      - When DAG v2 adds a new task, we create a TI for it
      - New TIs get the new `dag_version_id` via `task_creator`
   
   **Example:**
   - DAG v1: tasks A, B, C → TIs created with dag_version_id=v1
   - Tasks A, B complete; C still SCHEDULED
   - DAG v2 adds task D
   - **UPDATE**: Sets C's dag_version_id to v2 (existing unfinished TI)
   - **verify_integrity()**: Creates TI for D with dag_version_id=v2 (new TI)
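   
   That example can be worked through with a toy model (plain Python, made-up 
state names and version ids, purely to illustrate the division of labor between 
the two steps):
   
   ```python
   # Toy stand-in for the reconciliation above - not Airflow's real schema.
   UNFINISHED = {"scheduled", "queued", "running"}

   tis = {  # task_id -> TI fields
       "A": {"state": "success", "dag_version_id": "v1"},
       "B": {"state": "success", "dag_version_id": "v1"},
       "C": {"state": "scheduled", "dag_version_id": "v1"},
   }
   dag_v2_tasks = {"A", "B", "C", "D"}  # DAG v2 added task D

   # Step 1 - bulk UPDATE: re-point existing *unfinished* TIs at v2.
   # Finished TIs (A, B) keep the version they actually ran under.
   for ti in tis.values():
       if ti["state"] in UNFINISHED:
           ti["dag_version_id"] = "v2"

   # Step 2 - verify_integrity(): create TIs for tasks that have none yet.
   for task_id in dag_v2_tasks - tis.keys():
       tis[task_id] = {"state": "scheduled", "dag_version_id": "v2"}
   ```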
   
   ### Session synchronization
   
   Using `synchronize_session=False` because:
   - We handle cache coherence explicitly via `session.expire(dag_run, 
["task_instances"])`
   - The TIs aren't loaded in the session at this point (we removed the 
joinedload)
   - `verify_integrity()` does a fresh query via `get_task_instances()`, so it 
sees updated values
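   
   To make the coherence argument concrete, here is a minimal, self-contained 
SQLAlchemy sketch (in-memory SQLite, toy `Run`/`TI` models rather than 
Airflow's real ones) of the pattern: bulk UPDATE with 
`synchronize_session=False`, then an explicit `expire()` of the relationship:
   
   ```python
   from sqlalchemy import ForeignKey, create_engine, select, update
   from sqlalchemy.orm import (
       DeclarativeBase, Mapped, Session, mapped_column, relationship,
   )

   class Base(DeclarativeBase):
       pass

   class Run(Base):
       __tablename__ = "run"
       id: Mapped[int] = mapped_column(primary_key=True)
       tis: Mapped[list["TI"]] = relationship()

   class TI(Base):
       __tablename__ = "ti"
       id: Mapped[int] = mapped_column(primary_key=True)
       run_id: Mapped[int] = mapped_column(ForeignKey("run.id"))
       version: Mapped[str]

   engine = create_engine("sqlite://")
   Base.metadata.create_all(engine)

   with Session(engine) as seed:
       seed.add(Run(tis=[TI(version="v1"), TI(version="v1")]))
       seed.commit()

   with Session(engine) as session:
       # Mirror the PR: load the run WITHOUT joinedload, so no TI objects
       # enter the session's identity map.
       run = session.scalars(select(Run)).one()

       # Bulk UPDATE in SQL only; synchronize_session=False skips the scan
       # of in-session objects, which is safe since no TIs are loaded.
       session.execute(
           update(TI).where(TI.run_id == run.id).values(version="v2"),
           execution_options={"synchronize_session": False},
       )

       # Drop any cached "tis" collection so the next access re-queries.
       # A no-op when the collection was never loaded, as here; essential
       # if some earlier code had lazily loaded it.
       session.expire(run, ["tis"])

       versions = {ti.version for ti in run.tis}  # fresh SELECT sees v2
   ```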
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   <!--
   If generative AI tooling has been used in the process of authoring this PR, 
please
   change below checkbox to `[X]` followed by the name of the tool, uncomment 
the "Generated-by".
   -->
   
   - [x] Yes (please specify the tool below): Opus
   
   <!--
   Generated-by: [Tool Name] following [the 
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
   -->
   
   ---
   
   * Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
 for more information. Note: commit author/co-author name and email in commits 
become permanently public when merged.
   * For fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   * When adding dependency, check compliance with the [ASF 3rd Party License 
Policy](https://www.apache.org/legal/resolved.html#category-x).
   * For significant user-facing changes create newsfragment: 
`{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in 
[airflow-core/newsfragments](https://github.com/apache/airflow/tree/main/airflow-core/newsfragments).
   

