safaehar opened a new pull request, #66868:
URL: https://github.com/apache/airflow/pull/66868

   ## Summary
   
   When the scheduler creates a `DagRun`, `DagRun._create_task_instances` 
bulk-inserts every `TaskInstance` for that run in a single call. On PostgreSQL 
this currently goes through SQLAlchemy ORM's `bulk_insert_mappings`, which emits
   
   ```sql
   INSERT INTO task_instance (...) VALUES (...), (...), ...
   ```
   
   — one bind tuple per row, ~35 columns each. The wire payload scales with 
`rows × columns`, which is costly for DagRuns with mapped-task expansion or 
wide DAGs.
   
   This PR adds a PostgreSQL-only fast path that emits instead
   
   ```sql
   INSERT INTO task_instance (<cols>)
   SELECT * FROM unnest(:c1::t1[], :c2::t2[], ...)
   ```
   
   — one typed array per column, so the payload scales with `columns + rows` 
and the planner sees a single static statement regardless of batch size.
   
   The dispatch follows the existing dialect-branch precedent in 
`airflow/dag_processing/collection.py::activate_assets_if_possible`. Other 
backends (MySQL, SQLite) and the `task_instance_mutation_hook` path (which 
needs per-object ORM access) are unchanged.
   
   ## Details
   
   - New private helpers in `airflow/models/dagrun.py`:
     - `_build_postgres_unnest_insert(keys)` — builds the `INSERT … SELECT * 
FROM unnest(…)` statement from `TaskInstance.__mapper__`. The SQL column list, 
ordering, and PG element types are all derived from the mapper (`UtcDateTime → 
TIMESTAMP WITH TIME ZONE[]`, `ExtendedJSON → JSONB[]`, `ExecutorConfigType → 
BYTEA[]`, etc.), so new columns flow through without code changes here. The 
cast is injected by SQLAlchemy via 
`bindparam(type_=postgresql.ARRAY(col.type))` rather than hand-rolled — 
avoiding a real footgun (`text()` placeholder parsing breaks on `:id::UUID[]` 
without an intervening space).
     - `_bulk_insert_task_instance_dicts_postgres(task_dicts, session)` — 
materializes the dict iterator, looks up the cached statement by 
`frozenset(keys)`, and executes with column-major arrays.
   - `_create_task_instances` now branches on `get_dialect_name(session)` 
inside the `hook_is_noop` arm.
   - `TaskInstance.insert_mapping` now fills `id` (via the same `uuid7` default 
as the column) and `updated_at` (via `timezone.utcnow()`) explicitly, so the 
unnest path does not need to replicate SQLAlchemy's column-default application. 
The pre-fill is behaviour-equivalent for non-Postgres backends because 
`bulk_insert_mappings` would have applied the same defaults.
   
   ## Tests
   
   Added in `airflow-core/tests/unit/models/test_dagrun.py`:
   
   - `TestPostgresUnnestBulkInsert`
     - drops dict keys that are not columns (mirrors `bulk_insert_mappings`)
     - emits the expected `BYTEA[] / JSONB[] / TIMESTAMP WITH TIME ZONE[] / 
VARCHAR(1000)[]` casts when compiled for postgres
     - uses the SQL column name (`task_display_name`) even when the Python attr 
is `_task_display_property_value`
     - is a no-op on empty input
     - emits column-major arrays as bind params
   - `test_create_task_instances_uses_unnest_path_on_postgres` — dispatch goes 
through the helper, not `bulk_insert_mappings`.
   - `test_create_task_instances_uses_bulk_insert_mappings_on_non_postgres` — 
sqlite/mysql keep the existing path.
   - `test_create_task_instances_mutation_hook_still_uses_bulk_save_objects` — 
non-noop hook stays on the ORM path even on PG.
   
   All 8 new tests + the 167 other tests in `test_dagrun.py` pass locally on 
SQLite. `mypy-airflow-core` and `ruff` are clean.
   
   ## Benchmarks
   
   Driving motivation is a measured speedup on a downstream fork at Datadog. 
Numbers from infra-staging to follow before flipping out of draft.
   
   ## Test plan
   
   - [ ] Validate in infra-staging with a real PostgreSQL — confirm scheduler 
throughput improves on DagRuns with large mapped-task expansion and no 
functional regression.
   - [ ] Run `breeze testing core-tests --backend postgres --test-type Core -k 
"TestPostgresUnnest or test_create_task_instances"` against postgres.
   - [ ] Rename `airflow-core/newsfragments/pr_number.improvement.rst` → 
`<this-PR-number>.improvement.rst` once the PR number is assigned.
   - [ ] Confirm CI is green.
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [X] Yes — Claude Code (Opus 4.7)
   
   Generated-by: Claude Code (Opus 4.7) following [the 
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to