This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a35ef873d8a Improve 0101_3_2_0_ui_improvements_for_deadlines upgrade
migration (#63920)
a35ef873d8a is described below
commit a35ef873d8a578628139a6d4234fb65396258152
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Mon Mar 23 16:42:42 2026 +0800
Improve 0101_3_2_0_ui_improvements_for_deadlines upgrade migration (#63920)
* Use PK index for keyset pagination in 0101 migration
Switch batch pagination from unindexed dag_id to the primary key id
(UUID7, time-ordered) in both upgrade and downgrade loops. This
eliminates full table scans + sorts on every batch iteration.
* Add temporary index on deadline.dagrun_id during 0101 migration
Create a temporary index on deadline(dagrun_id) before the migration
loop and drop it after. Without this, the per-alert UPDATE that joins
dag_run to find matching deadline rows requires a full table scan of
the deadline table on every iteration.
* Add SQL-side deadline filtering to skip DAGs without deadlines in 0101
Use dialect-specific JSON path checks (PostgreSQL jsonb operators,
MySQL JSON_EXTRACT, SQLite json_extract) to filter out serialized_dag
rows that have no deadline data. This avoids transferring and
processing large data blobs for the majority of DAGs that have no
deadline configuration. Compressed rows are still included since the
DB cannot inspect their content.
* Eliminate triple decompression in 0101 migration upgrade path
For DAGs with deadline data, the migration previously decompressed the
serialized_dag blob 3 times: once to extract deadlines, once in
update_dag_deadline_field() to modify the data, and once to recompute
the dag_hash. Now the parsed dag_data is kept in memory, modified
once, the hash is computed from it, and the result is written back in
a single UPDATE combining data + dag_hash.
* Cache dagrun IDs per DAG to avoid repeated JOINs in 0101 migration
The correlated subquery joining dag_run and serialized_dag to find
matching deadline rows was executed for every individual alert within
a DAG. Now the dagrun IDs are fetched once per DAG and reused for
each alert's UPDATE via an expanding bind parameter.
* Use temporary_index context manager instead of inline try/finally
Extract the create-index/drop-index pattern into a reusable
context manager to reduce indentation churn and make the intent
clearer at the call site.
* Convert temporary_index to decorator to avoid indentation churn
Extract the migration loop body into _migrate_deadline_alerts()
decorated with @temporary_index, so the while loop stays at its
original indentation level. The outer function keeps only the
offline-mode guard.
* Add comments explaining why keyset pagination uses id instead of dag_id
* Move dagrun_ids select to where it is being referenced
* Add additional validation for existing temp index check
* Remove unit test for migration
* Speed up deadline alert UI migration updates
Move dagrun_ids outside of the inner loop
---
.../0101_3_2_0_ui_improvements_for_deadlines.py | 311 +++++++++++++++++----
1 file changed, 252 insertions(+), 59 deletions(-)
diff --git
a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
index 0233f48c9a9..3e1a5c5d54f 100644
---
a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
+++
b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
@@ -31,6 +31,7 @@ Create Date: 2025-10-17 16:04:55.016272
from __future__ import annotations
import contextlib
+import functools
import json
import zlib
from collections import defaultdict
@@ -74,6 +75,83 @@ DEADLINE_ALERT_REQUIRED_FIELDS = {REFERENCE_KEY,
CALLBACK_KEY, INTERVAL_KEY}
DEFAULT_BATCH_SIZE = 1000
ENCODING_TYPE = "deadline_alert"
+
+def _has_matching_index(conn: Connection, table_name: str, columns:
Iterable[str]) -> bool:
+ log.debug("Targeting index check", table=table_name, columns=list(columns))
+ target_columns = list(columns)
+ inspector = sa.inspect(conn)
+
+ for index in inspector.get_indexes(table_name):
+ index_columns = index.get("column_names") or []
+ log.debug("Checking index", table=table_name, index=index.get("name"),
columns=index_columns)
+ if index_columns[: len(target_columns)] == target_columns:
+ return True
+
+ primary_key = inspector.get_pk_constraint(table_name) or {}
+ primary_key_columns = primary_key.get("constrained_columns") or []
+ log.info("Checking primary key", table=table_name,
columns=primary_key_columns)
+ if primary_key_columns[: len(target_columns)] == target_columns:
+ return True
+
+ with contextlib.suppress(Exception):
+ for constraint in inspector.get_unique_constraints(table_name):
+ constraint_columns = constraint.get("column_names") or []
+ log.debug(
+ "Checking unique constraint",
+ table=table_name,
+ constraint=constraint.get("name"),
+ columns=constraint_columns,
+ )
+ if constraint_columns[: len(target_columns)] == target_columns:
+ return True
+
+ return False
+
+
+def _is_mysql_foreign_key_index_error(conn: Connection, exc:
sa.exc.OperationalError) -> bool:
+ if conn.dialect.name != "mysql":
+ return False
+
+ error_args = getattr(exc.orig, "args", ())
+ return bool(error_args) and error_args[0] == 1553
+
+
+def temporary_index(index_name, table_name, columns):
+ """Create an index before the wrapped function runs and drop it after,
even on error."""
+
+ def decorator(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ conn = op.get_bind()
+ if _has_matching_index(conn, table_name, columns):
+ log.info(
+ "Matching index already exists, skipping creation of
temporary index",
+ index_name=index_name,
+ table_name=table_name,
+ columns=columns,
+ )
+ return func(*args, **kwargs)
+
+ op.create_index(index_name, table_name, columns)
+ try:
+ return func(*args, **kwargs)
+ finally:
+ try:
+ op.drop_index(index_name, table_name=table_name)
+ except sa.exc.OperationalError as exc:
+ if not _is_mysql_foreign_key_index_error(conn, exc):
+ raise
+ log.warning(
+ "Keeping temporary index because MySQL bound it to a
foreign key",
+ index_name=index_name,
+ table_name=table_name,
+ )
+
+ return wrapper
+
+ return decorator
+
+
deadline_alert_table = sa.table(
"deadline_alert",
sa.column("reference"),
@@ -396,6 +474,13 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
)
return
+ _migrate_deadline_alerts()
+
+
+# Temporary index on deadline.dagrun_id speeds up the per-alert UPDATE that
finds
+# matching deadline rows. Without it, every UPDATE full-scans the deadline
table.
+@temporary_index("tmp_deadline_dagrun_id_idx", "deadline", ["dagrun_id"])
+def _migrate_deadline_alerts() -> None:
BATCH_SIZE = conf.getint("database", "migration_batch_size",
fallback=DEFAULT_BATCH_SIZE)
processed_dags: list[str] = []
@@ -403,11 +488,48 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
migrated_alerts_count: int = 0
dags_with_errors: ErrorDict = defaultdict(list)
batch_num = 0
- last_dag_id = ""
+ # Paginate by primary key (id) instead of dag_id because id is indexed (PK)
+ # while dag_id has no index — using dag_id would cause a full table scan +
sort
+ # on every batch. UUID7 ids are time-ordered so the pagination order is
stable.
+ last_id = "00000000-0000-0000-0000-000000000000"
conn = op.get_bind()
dialect = conn.dialect.name
+ # Build dialect-specific filter to skip rows without deadline data at the
SQL level.
+ # This avoids transferring and processing large data blobs for the
majority of DAGs
+ # that have no deadline configuration. Compressed rows are always included
since the
+ # DB cannot inspect their content.
+ if dialect == "postgresql":
+ deadline_filter = (
+ "AND ("
+ " data_compressed IS NOT NULL"
+ " OR (data IS NOT NULL"
+ " AND (data::jsonb -> 'dag' -> 'deadline') IS NOT NULL"
+ " AND (data::jsonb -> 'dag' -> 'deadline') != 'null'::jsonb"
+ " AND (data::jsonb -> 'dag' -> 'deadline') != '[]'::jsonb)"
+ ")"
+ )
+ elif dialect == "mysql":
+ deadline_filter = (
+ "AND ("
+ " data_compressed IS NOT NULL"
+ " OR (data IS NOT NULL"
+ " AND JSON_EXTRACT(data, '$.dag.deadline') IS NOT NULL"
+ " AND IFNULL(JSON_LENGTH(JSON_EXTRACT(data,
'$.dag.deadline')), 0) > 0)"
+ ")"
+ )
+ else:
+ deadline_filter = (
+ "AND ("
+ " data_compressed IS NOT NULL"
+ " OR (data IS NOT NULL"
+ " AND json_extract(data, '$.dag.deadline') IS NOT NULL"
+ " AND json_extract(data, '$.dag.deadline') != 'null'"
+ " AND json_extract(data, '$.dag.deadline') != '[]')"
+ ")"
+ )
+
total_dags = conn.execute(
sa.text("SELECT COUNT(*) FROM serialized_dag WHERE data IS NOT NULL OR
data_compressed IS NOT NULL")
).scalar()
@@ -421,31 +543,33 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
if dialect == "mysql":
# Avoid selecting large columns during ORDER BY to prevent sort
buffer overflow
result = conn.execute(
- sa.text("""
+ sa.text(f"""
SELECT sd.id, sd.dag_id, sd.data, sd.data_compressed,
sd.created_at
FROM serialized_dag sd
INNER JOIN (
- SELECT id, dag_id
+ SELECT id
FROM serialized_dag
- WHERE dag_id > :last_dag_id
- ORDER BY dag_id
+ WHERE id > :last_id
+ {deadline_filter}
+ ORDER BY id
LIMIT :batch_size
) AS subq ON sd.id = subq.id
"""),
- {"last_dag_id": last_dag_id, "batch_size": BATCH_SIZE},
+ {"last_id": last_id, "batch_size": BATCH_SIZE},
)
- batch_results = sorted(result, key=lambda r: r.dag_id)
+
+ batch_results = sorted(list(result), key=lambda r: r.id)
else:
result = conn.execute(
- sa.text("""
+ sa.text(f"""
SELECT id, dag_id, data, data_compressed, created_at
FROM serialized_dag
- WHERE (data IS NOT NULL OR data_compressed IS NOT NULL)
- AND dag_id > :last_dag_id
- ORDER BY dag_id
+ WHERE id > :last_id
+ {deadline_filter}
+ ORDER BY id
LIMIT :batch_size
"""),
- {"last_dag_id": last_dag_id, "batch_size": BATCH_SIZE},
+ {"last_id": last_id, "batch_size": BATCH_SIZE},
)
batch_results = list(result)
if not batch_results:
@@ -455,11 +579,12 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
for serialized_dag_id, dag_id, data, data_compressed, created_at in
batch_results:
processed_dags.append(dag_id)
- last_dag_id = dag_id
+ last_id = str(serialized_dag_id)
# Validation that does not need a DB connection.
try:
- dag_deadline = get_dag_data(data,
data_compressed)[DAG_KEY].get(DEADLINE_KEY)
+ dag_data = get_dag_data(data, data_compressed)
+ dag_deadline = dag_data[DAG_KEY].get(DEADLINE_KEY)
except (json.JSONDecodeError, KeyError, TypeError) as e:
dags_with_errors[dag_id].append(f"Could not process serialized
Dag: {e}")
continue
@@ -470,6 +595,18 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
deadline_alerts = dag_deadline if isinstance(dag_deadline, list)
else [dag_deadline]
def _migrate_dag_deadlines(dag_conn: Connection) -> Iterable[str]:
+ dagrun_ids = [
+ row[0]
+ for row in dag_conn.execute(
+ sa.text("""
+ SELECT dr.id
+ FROM dag_run dr
+ JOIN serialized_dag sd ON dr.dag_id = sd.dag_id
+ WHERE sd.id = :serialized_dag_id
+ """),
+ {"serialized_dag_id": serialized_dag_id},
+ ).fetchall()
+ ]
for serialized_alert in deadline_alerts:
if not isinstance(serialized_alert, dict):
continue
@@ -519,25 +656,29 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
)
if not validate_written_data(
- dag_conn, deadline_alert_id, reference_data,
interval_data, callback_data
+ dag_conn,
+ deadline_alert_id,
+ reference_data,
+ interval_data,
+ callback_data,
):
dags_with_errors[dag_id].append(f"Invalid
DeadlineAlert data: {serialized_alert}")
continue
yield deadline_alert_id
- dag_conn.execute(
- sa.text("""
- UPDATE deadline
- SET deadline_alert_id = :alert_id
- WHERE dagrun_id IN (
- SELECT dr.id
- FROM dag_run dr
- JOIN serialized_dag sd ON
dr.dag_id = sd.dag_id
- WHERE sd.id = :serialized_dag_id)
- AND deadline_alert_id IS NULL
- """),
- {"alert_id": deadline_alert_id,
"serialized_dag_id": serialized_dag_id},
- )
+ if dagrun_ids:
+ dag_conn.execute(
+ sa.text("""
+ UPDATE deadline
+ SET deadline_alert_id = :alert_id
+ WHERE dagrun_id IN :dagrun_ids
+ AND deadline_alert_id IS NULL
+ """).bindparams(sa.bindparam("dagrun_ids",
expanding=True)),
+ {
+ "alert_id": deadline_alert_id,
+ "dagrun_ids": dagrun_ids,
+ },
+ )
except Exception as e:
dags_with_errors[dag_id].append(f"Failed to process
{serialized_alert}: {e}")
continue
@@ -545,31 +686,80 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
try:
with _begin_nested_transaction(conn) as dag_conn:
migrated_alert_ids = list(_migrate_dag_deadlines(dag_conn))
- if migrated_alert_ids:
- uuid_strings = [str(uuid_id) for uuid_id in
migrated_alert_ids]
- update_dag_deadline_field(dag_conn, serialized_dag_id,
uuid_strings, dialect)
- updated_result = dag_conn.execute(
+ if not migrated_alert_ids:
+ continue
+
+ uuid_strings = [str(uuid_id) for uuid_id in
migrated_alert_ids]
+
+ # Update the deadline field in the already-parsed dag_data
and
+ # write back once, avoiding redundant
decompression/recompression.
+ dag_data[DAG_KEY][DEADLINE_KEY] = uuid_strings
+ new_hash = hash_dag(dag_data)
+
+ if data_compressed:
+ new_compressed =
zlib.compress(json.dumps(dag_data).encode("utf-8"))
+ dag_conn.execute(
sa.text(
- "SELECT data, data_compressed "
- "FROM serialized_dag "
+ "UPDATE serialized_dag "
+ "SET data_compressed = :data, dag_hash =
:new_hash "
"WHERE id = :serialized_dag_id"
),
- {"serialized_dag_id": serialized_dag_id},
- ).fetchone()
- if updated_result:
- updated_dag_data = get_dag_data(
- updated_result.data,
updated_result.data_compressed
- )
- new_hash = hash_dag(updated_dag_data)
- dag_conn.execute(
- sa.text(
- "UPDATE serialized_dag "
- "SET dag_hash = :new_hash "
- "WHERE id = :serialized_dag_id"
+ {
+ "data": new_compressed,
+ "new_hash": new_hash,
+ "serialized_dag_id": serialized_dag_id,
+ },
+ )
+ elif dialect == "postgresql":
+ dag_conn.execute(
+ sa.text("""
+ UPDATE serialized_dag
+ SET data = jsonb_set(
+ data::jsonb,
+ '{dag,deadline}',
+ CAST(:deadline_data AS jsonb)
+ )::json,
+ dag_hash = :new_hash
+ WHERE id = :serialized_dag_id
+ """),
+ {
+ "serialized_dag_id": serialized_dag_id,
+ "deadline_data": json.dumps(uuid_strings),
+ "new_hash": new_hash,
+ },
+ )
+ elif dialect == "mysql":
+ dag_conn.execute(
+ sa.text("""
+ UPDATE serialized_dag
+ SET data = JSON_SET(
+ data,
+ '$.dag.deadline',
+ CAST(:deadline_data AS JSON)
),
- {"new_hash": new_hash, "serialized_dag_id":
serialized_dag_id},
- )
- migrated_alerts_count += len(migrated_alert_ids)
+ dag_hash = :new_hash
+ WHERE id = :serialized_dag_id
+ """),
+ {
+ "serialized_dag_id": serialized_dag_id,
+ "deadline_data": json.dumps(uuid_strings),
+ "new_hash": new_hash,
+ },
+ )
+ else:
+ dag_conn.execute(
+ sa.text(
+ "UPDATE serialized_dag "
+ "SET data = :data, dag_hash = :new_hash "
+ "WHERE id = :serialized_dag_id"
+ ),
+ {
+ "data": json.dumps(dag_data),
+ "new_hash": new_hash,
+ "serialized_dag_id": serialized_dag_id,
+ },
+ )
+ migrated_alerts_count += len(migrated_alert_ids)
except (json.JSONDecodeError, KeyError, TypeError) as e:
log.exception("Could not migrate deadline for dag %s", dag_id)
dags_with_errors[dag_id].append(f"Could not migrate deadline:
{e}")
@@ -608,7 +798,10 @@ def migrate_deadline_alert_data_back_to_serialized_dag()
-> None:
restored_alerts_count: int = 0
dags_with_errors: ErrorDict = defaultdict(list)
batch_num = 0
- last_dag_id = ""
+ # Paginate by primary key (id) instead of dag_id because id is indexed (PK)
+ # while dag_id has no index — using dag_id would cause a full table scan +
sort
+ # on every batch. UUID7 ids are time-ordered so the pagination order is
stable.
+ last_id = "00000000-0000-0000-0000-000000000000"
conn = op.get_bind()
dialect = conn.dialect.name
@@ -632,16 +825,16 @@ def migrate_deadline_alert_data_back_to_serialized_dag()
-> None:
SELECT sd.id, sd.dag_id, sd.data, sd.data_compressed
FROM serialized_dag sd
INNER JOIN (
- SELECT id, dag_id
+ SELECT id
FROM serialized_dag
WHERE (data IS NOT NULL OR data_compressed IS NOT NULL)
- AND dag_id > :last_dag_id
- ORDER BY dag_id
+ AND id > :last_id
+ ORDER BY id
LIMIT :batch_size
) AS subq ON sd.id = subq.id
- ORDER BY sd.dag_id
+ ORDER BY sd.id
"""),
- {"last_dag_id": last_dag_id, "batch_size": BATCH_SIZE},
+ {"last_id": last_id, "batch_size": BATCH_SIZE},
)
else:
result = conn.execute(
@@ -649,11 +842,11 @@ def migrate_deadline_alert_data_back_to_serialized_dag()
-> None:
SELECT id, dag_id, data, data_compressed
FROM serialized_dag
WHERE (data IS NOT NULL OR data_compressed IS NOT NULL)
- AND dag_id > :last_dag_id
- ORDER BY dag_id
+ AND id > :last_id
+ ORDER BY id
LIMIT :batch_size
"""),
- {"last_dag_id": last_dag_id, "batch_size": BATCH_SIZE},
+ {"last_id": last_id, "batch_size": BATCH_SIZE},
)
batch_results = list(result)
@@ -663,7 +856,7 @@ def migrate_deadline_alert_data_back_to_serialized_dag() ->
None:
for serialized_dag_id, dag_id, data, data_compressed in batch_results:
processed_dags.append(dag_id)
- last_dag_id = dag_id
+ last_id = str(serialized_dag_id)
# Validation that does not need a DB connection.
try: