safaehar opened a new pull request, #66868:
URL: https://github.com/apache/airflow/pull/66868
## Summary
When the scheduler creates a `DagRun`, `DagRun._create_task_instances`
bulk-inserts every `TaskInstance` for that run in a single call. On PostgreSQL
this currently goes through SQLAlchemy ORM's `bulk_insert_mappings`, which emits
```sql
INSERT INTO task_instance (...) VALUES (...), (...), ...
```
— one bind tuple per row, ~35 columns each. The wire payload scales with
`rows × columns`, which is costly for DagRuns with mapped-task expansion or
wide DAGs.
This PR adds a PostgreSQL-only fast path that instead emits
```sql
INSERT INTO task_instance (<cols>)
SELECT * FROM unnest(:c1::t1[], :c2::t2[], ...)
```
That is one typed array per column, so the statement binds one parameter per
column instead of one per cell, and the planner sees a single static statement
regardless of batch size.
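As a rough illustration of the two statement shapes above, the following sketch builds both forms as plain strings and counts their placeholders. The function names and the two-column subset with its types are illustrative assumptions, not the PR's code; the real helper derives columns and types from the mapper and lets SQLAlchemy render the casts.

```python
def values_insert_sql(table: str, columns: list[str], n_rows: int) -> str:
    """Multi-row VALUES form: one placeholder per cell (rows x columns)."""
    col_list = ", ".join(columns)
    tuples = ", ".join(
        "(" + ", ".join(f":{col}_{i}" for col in columns) + ")"
        for i in range(n_rows)
    )
    return f"INSERT INTO {table} ({col_list}) VALUES {tuples}"


def unnest_insert_sql(table: str, column_types: dict[str, str]) -> str:
    """unnest form: one array placeholder per column, whatever the row count."""
    col_list = ", ".join(column_types)
    # CAST(... AS type[]) is used here instead of the ::type[] shorthand so the
    # placeholder name is never followed directly by a second colon.
    arrays = ", ".join(
        f"CAST(:{col} AS {pg_type}[])" for col, pg_type in column_types.items()
    )
    return f"INSERT INTO {table} ({col_list}) SELECT * FROM unnest({arrays})"


# Illustrative subset of columns; the types are assumptions for the example.
cols = {"task_id": "VARCHAR", "map_index": "INTEGER"}
values_sql = values_insert_sql("task_instance", list(cols), n_rows=3)
unnest_sql = unnest_insert_sql("task_instance", cols)
```

With these inputs the `VALUES` string contains six placeholders (3 rows × 2 columns) while the `unnest` string contains two. At 1,000 rows and ~35 columns the same arithmetic gives roughly 35,000 placeholders against 35.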
The dispatch follows the existing dialect-branch precedent in
`airflow/dag_processing/collection.py::activate_assets_if_possible`. Other
backends (MySQL, SQLite) and the `task_instance_mutation_hook` path (which
needs per-object ORM access) are unchanged.
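The dispatch rule described above can be summarised as a small decision function. This is a sketch only: `choose_insert_path` and its return labels are hypothetical names, and it assumes the dialect name reported for PostgreSQL is `"postgresql"`.

```python
def choose_insert_path(dialect_name: str, hook_is_noop: bool) -> str:
    """Return a label for the insert path this PR description implies."""
    if not hook_is_noop:
        # A real task_instance_mutation_hook needs per-object ORM access,
        # so this path stays on the ORM regardless of backend.
        return "orm_objects"
    if dialect_name == "postgresql":
        # New PostgreSQL-only fast path.
        return "postgres_unnest"
    # MySQL, SQLite and any other backend keep the existing behaviour.
    return "bulk_insert_mappings"
```

Checking the hook first keeps the fast path from ever bypassing a user-defined mutation hook, which matches the ordering described in the PR (the dialect branch lives inside the `hook_is_noop` arm).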
## Details
- New private helpers in `airflow/models/dagrun.py`:
  - `_build_postgres_unnest_insert(keys)`: builds the
    `INSERT … SELECT * FROM unnest(…)` statement from `TaskInstance.__mapper__`.
    The SQL column list, ordering, and PG element types are all derived from
    the mapper (`UtcDateTime → TIMESTAMP WITH TIME ZONE[]`,
    `ExtendedJSON → JSONB[]`, `ExecutorConfigType → BYTEA[]`, etc.), so new
    columns flow through without code changes here. The cast is injected by
    SQLAlchemy via `bindparam(type_=postgresql.ARRAY(col.type))` rather than
    hand-rolled, which avoids a real footgun: `text()` placeholder parsing
    breaks on `:id::UUID[]` without an intervening space.
  - `_bulk_insert_task_instance_dicts_postgres(task_dicts, session)`:
    materializes the dict iterator, looks up the cached statement by
    `frozenset(keys)`, and executes with column-major arrays.
- `_create_task_instances` now branches on `get_dialect_name(session)`
inside the `hook_is_noop` arm.
- `TaskInstance.insert_mapping` now fills `id` (via the same `uuid7` default
as the column) and `updated_at` (via `timezone.utcnow()`) explicitly, so the
unnest path does not need to replicate SQLAlchemy's column-default application.
The pre-fill is behaviour-equivalent for non-Postgres backends because
`bulk_insert_mappings` would have applied the same defaults.
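The helper flow described in the list above (materialize the iterator, drop non-column keys, pivot to column-major arrays, and reuse a statement cached by `frozenset(keys)`) might look roughly like this. Everything here is a standalone sketch: `KNOWN_COLUMNS`, `statement_for`, and `to_column_major` are hypothetical names, the column set is a tiny illustrative subset, the statement omits the array casts the real helper adds, and all row dicts are assumed to share the same keys.

```python
from functools import lru_cache

# Hypothetical stand-in for the column names derived from the mapper.
KNOWN_COLUMNS = ("task_id", "map_index", "try_number")


@lru_cache(maxsize=None)
def statement_for(keys: frozenset) -> str:
    """Build (once per distinct key set) an untyped unnest INSERT string."""
    cols = [c for c in KNOWN_COLUMNS if c in keys]
    col_list = ", ".join(cols)
    binds = ", ".join(f":{c}" for c in cols)
    return f"INSERT INTO task_instance ({col_list}) SELECT * FROM unnest({binds})"


def to_column_major(task_dicts):
    """Pivot row dicts into one list per column, ignoring unknown keys."""
    rows = list(task_dicts)  # materialize: the input may be a one-shot iterator
    if not rows:
        return frozenset(), {}  # empty input means nothing to insert
    cols = [c for c in KNOWN_COLUMNS if c in rows[0]]
    arrays = {c: [row[c] for row in rows] for c in cols}
    return frozenset(cols), arrays


rows = iter([
    {"task_id": "a", "map_index": -1, "not_a_column": 1},
    {"task_id": "b", "map_index": 0, "not_a_column": 2},
])
keys, arrays = to_column_major(rows)
sql = statement_for(keys)
```

Keying the cache on a `frozenset` means the order in which keys appear in the dicts does not matter; the column order in the statement comes from the fixed column list instead.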
## Tests
Added in `airflow-core/tests/unit/models/test_dagrun.py`:
- `TestPostgresUnnestBulkInsert`
  - drops dict keys that are not columns (mirrors `bulk_insert_mappings`)
  - emits the expected
    `BYTEA[] / JSONB[] / TIMESTAMP WITH TIME ZONE[] / VARCHAR(1000)[]` casts
    when compiled for postgres
  - uses the SQL column name (`task_display_name`) even when the Python attr
    is `_task_display_property_value`
  - is a no-op on empty input
  - emits column-major arrays as bind params
- `test_create_task_instances_uses_unnest_path_on_postgres` — dispatch goes
through the helper, not `bulk_insert_mappings`.
- `test_create_task_instances_uses_bulk_insert_mappings_on_non_postgres` —
sqlite/mysql keep the existing path.
- `test_create_task_instances_mutation_hook_still_uses_bulk_save_objects` —
non-noop hook stays on the ORM path even on PG.
All 8 new tests + the 167 other tests in `test_dagrun.py` pass locally on
SQLite. `mypy-airflow-core` and `ruff` are clean.
## Benchmarks
The driving motivation is a speedup measured on a downstream fork at Datadog.
Numbers from infra-staging will follow before this PR leaves draft.
## Test plan
- [ ] Validate in infra-staging with a real PostgreSQL — confirm scheduler
throughput improves on DagRuns with large mapped-task expansion and no
functional regression.
- [ ] Run the following against postgres:
`breeze testing core-tests --backend postgres --test-type Core -k "TestPostgresUnnest or test_create_task_instances"`
- [ ] Rename `airflow-core/newsfragments/pr_number.improvement.rst` →
`<this-PR-number>.improvement.rst` once the PR number is assigned.
- [ ] Confirm CI is green.
---
##### Was generative AI tooling used to co-author this PR?
- [X] Yes — Claude Code (Opus 4.7)
Generated-by: Claude Code (Opus 4.7) following [the
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)