Copilot commented on code in PR #63978:
URL: https://github.com/apache/airflow/pull/63978#discussion_r3025333657
##########
airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py:
##########
@@ -421,31 +549,33 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
if dialect == "mysql":
# Avoid selecting large columns during ORDER BY to prevent sort
buffer overflow
result = conn.execute(
- sa.text("""
+ sa.text(f"""
SELECT sd.id, sd.dag_id, sd.data, sd.data_compressed,
sd.created_at
FROM serialized_dag sd
INNER JOIN (
- SELECT id, dag_id
+ SELECT id
FROM serialized_dag
- WHERE dag_id > :last_dag_id
- ORDER BY dag_id
+ WHERE id > :last_id
+ {deadline_filter}
+ ORDER BY id
LIMIT :batch_size
) AS subq ON sd.id = subq.id
"""),
- {"last_dag_id": last_dag_id, "batch_size": BATCH_SIZE},
+ {"last_id": last_id, "batch_size": BATCH_SIZE},
)
- batch_results = sorted(result, key=lambda r: r.dag_id)
+
+ batch_results = sorted(list(result), key=lambda r: r.id)
Review Comment:
The Python-side `sorted(list(result), ...)` adds extra memory and CPU
overhead per batch. Since ordering is needed only to ensure stable pagination,
prefer adding `ORDER BY sd.id` to the outer query and then use `batch_results =
list(result)` (removing the in-Python sort).
##########
airflow-core/tests/unit/migrations/test_0101_ui_improvements_for_deadlines.py:
##########
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import importlib.util
+from pathlib import Path
+
+from sqlalchemy.exc import OperationalError
+
+from tests_common.test_utils.paths import AIRFLOW_CORE_SOURCES_PATH
+
+MIGRATION_PATH = Path(
+ AIRFLOW_CORE_SOURCES_PATH,
+ "airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py",
+)
+
+
+def load_migration_module():
+ spec = importlib.util.spec_from_file_location(
+ "migration_0101_ui_improvements_for_deadlines", MIGRATION_PATH
+ )
+ module = importlib.util.module_from_spec(spec)
+ assert spec.loader is not None
Review Comment:
`importlib.util.spec_from_file_location(...)` can return `None`, in which
case `module_from_spec(spec)` (which dereferences `spec.loader`) would raise
an `AttributeError` before the assertion runs. Add an assertion that `spec is
not None` (or combine into a single `assert spec is not None and spec.loader
is not None`) before calling `module_from_spec`.
```suggestion
assert spec is not None and spec.loader is not None
module = importlib.util.module_from_spec(spec)
```
##########
airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py:
##########
@@ -396,18 +397,145 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
)
return
+ _migrate_deadline_alerts()
+
+
+def _backfill_deadline_alert_ids(
+ conn: Connection,
+ dag_first_alert_ids: dict[str, str],
+ batch_size: int,
+ dags_with_errors: ErrorDict,
+) -> int:
+ """Populate deadline.deadline_alert_id in primary-key order to avoid
dagrun_id table scans."""
+ if not dag_first_alert_ids:
+ log.info("No deadline alerts migrated; skipping deadline backfill")
+ return 0
+
+ last_deadline_id = "00000000-0000-0000-0000-000000000000"
+ updated_deadlines = 0
+ missing_alert_dag_ids: set[str] = set()
+
+ total_deadlines = conn.execute(
+ sa.text("SELECT COUNT(*) FROM deadline WHERE deadline_alert_id IS
NULL")
+ ).scalar()
+ total_batches = (total_deadlines + batch_size - 1) // batch_size
+ batch_num = 0
+
+ log.info(
+ "Starting deadline backfill",
+ batch_size=batch_size,
+ total_deadlines=total_deadlines,
+ total_batches=total_batches,
+ )
+
+ while True:
+ batch_num += 1
+ batch_rows = list(
+ conn.execute(
+ sa.text("""
+ SELECT d.id, dr.dag_id
+ FROM deadline d
+ JOIN dag_run dr ON dr.id = d.dagrun_id
+ WHERE d.deadline_alert_id IS NULL
+ AND d.id > :last_deadline_id
+ ORDER BY d.id
+ LIMIT :batch_size
+ """),
+ {"last_deadline_id": last_deadline_id, "batch_size":
batch_size},
+ )
+ )
+
+ if not batch_rows:
+ break
+
+ last_deadline_id = str(batch_rows[-1].id)
+ deadline_ids_by_alert: dict[str, list[Any]] = defaultdict(list)
Review Comment:
`deadline_ids_by_alert` is initialized as a `defaultdict(list)` but
annotated as a plain `dict[...]`; although `defaultdict` is a `dict` subclass,
the narrower annotation hides its auto-initializing behavior from static
checkers. Update the annotation to `defaultdict[str, list[...]]` (or change
the initializer to a plain dict plus `setdefault`), and narrow `Any` to the
actual type of `deadline.id` used here (often `str`/UUID).
##########
airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py:
##########
@@ -396,18 +397,145 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
)
return
+ _migrate_deadline_alerts()
+
+
+def _backfill_deadline_alert_ids(
+ conn: Connection,
+ dag_first_alert_ids: dict[str, str],
+ batch_size: int,
+ dags_with_errors: ErrorDict,
+) -> int:
+ """Populate deadline.deadline_alert_id in primary-key order to avoid
dagrun_id table scans."""
+ if not dag_first_alert_ids:
+ log.info("No deadline alerts migrated; skipping deadline backfill")
+ return 0
+
+ last_deadline_id = "00000000-0000-0000-0000-000000000000"
+ updated_deadlines = 0
+ missing_alert_dag_ids: set[str] = set()
+
+ total_deadlines = conn.execute(
+ sa.text("SELECT COUNT(*) FROM deadline WHERE deadline_alert_id IS
NULL")
+ ).scalar()
+ total_batches = (total_deadlines + batch_size - 1) // batch_size
+ batch_num = 0
+
+ log.info(
+ "Starting deadline backfill",
+ batch_size=batch_size,
+ total_deadlines=total_deadlines,
+ total_batches=total_batches,
+ )
+
+ while True:
+ batch_num += 1
+ batch_rows = list(
+ conn.execute(
+ sa.text("""
+ SELECT d.id, dr.dag_id
+ FROM deadline d
+ JOIN dag_run dr ON dr.id = d.dagrun_id
+ WHERE d.deadline_alert_id IS NULL
+ AND d.id > :last_deadline_id
+ ORDER BY d.id
+ LIMIT :batch_size
+ """),
+ {"last_deadline_id": last_deadline_id, "batch_size":
batch_size},
+ )
+ )
+
+ if not batch_rows:
+ break
+
+ last_deadline_id = str(batch_rows[-1].id)
+ deadline_ids_by_alert: dict[str, list[Any]] = defaultdict(list)
+
+ for deadline_id, dag_id in batch_rows:
+ alert_id = dag_first_alert_ids.get(dag_id)
+ if alert_id is None:
+ missing_alert_dag_ids.add(dag_id)
+ continue
+ deadline_ids_by_alert[alert_id].append(deadline_id)
+
+ with _begin_nested_transaction(conn) as batch_conn:
+ for alert_id, deadline_ids in deadline_ids_by_alert.items():
+ batch_conn.execute(
+ sa.text("""
+ UPDATE deadline
+ SET deadline_alert_id = :alert_id
+ WHERE deadline_alert_id IS NULL
+ AND id IN :deadline_ids
+ """).bindparams(sa.bindparam("deadline_ids",
expanding=True)),
+ {"alert_id": alert_id, "deadline_ids": deadline_ids},
+ )
+ updated_deadlines += len(deadline_ids)
+
+ log.info(
+ "Deadline backfill batch complete",
+ batch_num=batch_num,
+ total_batches=total_batches,
+ updated_deadlines=updated_deadlines,
+ )
+
+ for dag_id in sorted(missing_alert_dag_ids):
+ dags_with_errors[dag_id].append("Could not find migrated deadline
alert for historical deadlines")
+
+ return updated_deadlines
+
+
+def _migrate_deadline_alerts() -> None:
BATCH_SIZE = conf.getint("database", "migration_batch_size",
fallback=DEFAULT_BATCH_SIZE)
processed_dags: list[str] = []
dags_with_deadlines: set[str] = set()
migrated_alerts_count: int = 0
+ updated_deadlines_count: int = 0
dags_with_errors: ErrorDict = defaultdict(list)
+ dag_first_alert_ids: dict[str, str] = {}
batch_num = 0
- last_dag_id = ""
+ # Paginate by primary key (id) instead of dag_id because id is indexed (PK)
+ # while dag_id has no index — using dag_id would cause a full table scan +
sort
+ # on every batch. UUID7 ids are time-ordered so the pagination order is
stable.
+ last_id = "00000000-0000-0000-0000-000000000000"
conn = op.get_bind()
dialect = conn.dialect.name
+ # Build dialect-specific filter to skip rows without deadline data at the
SQL level.
+ # This avoids transferring and processing large data blobs for the
majority of DAGs
+ # that have no deadline configuration. Compressed rows are always included
since the
+ # DB cannot inspect their content.
+ if dialect == "postgresql":
+ deadline_filter = (
+ "AND ("
+ " data_compressed IS NOT NULL"
+ " OR (data IS NOT NULL"
+ " AND (data::jsonb -> 'dag' -> 'deadline') IS NOT NULL"
+ " AND (data::jsonb -> 'dag' -> 'deadline') != 'null'::jsonb"
+ " AND (data::jsonb -> 'dag' -> 'deadline') != '[]'::jsonb)"
+ ")"
+ )
+ elif dialect == "mysql":
+ deadline_filter = (
+ "AND ("
+ " data_compressed IS NOT NULL"
+ " OR (data IS NOT NULL"
+ " AND JSON_EXTRACT(data, '$.dag.deadline') IS NOT NULL"
+ " AND IFNULL(JSON_LENGTH(JSON_EXTRACT(data,
'$.dag.deadline')), 0) > 0)"
+ ")"
+ )
+ else:
+ deadline_filter = (
+ "AND ("
+ " data_compressed IS NOT NULL"
+ " OR (data IS NOT NULL"
+ " AND json_extract(data, '$.dag.deadline') IS NOT NULL"
+ " AND json_extract(data, '$.dag.deadline') != 'null'"
+ " AND json_extract(data, '$.dag.deadline') != '[]')"
+ ")"
+ )
+
total_dags = conn.execute(
sa.text("SELECT COUNT(*) FROM serialized_dag WHERE data IS NOT NULL OR
data_compressed IS NOT NULL")
Review Comment:
Progress logging (`total_dags`/`total_batches`) no longer reflects the
actual rows processed because the main batching queries now apply
`deadline_filter`. Consider updating this COUNT query to include the same
`deadline_filter` logic so batch counts and ETA/progress logs are accurate and
less confusing operationally.
```suggestion
sa.text(f"SELECT COUNT(*) FROM serialized_dag WHERE 1=1
{deadline_filter}")
```
##########
airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py:
##########
@@ -396,18 +397,145 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
)
return
+ _migrate_deadline_alerts()
+
+
+def _backfill_deadline_alert_ids(
+ conn: Connection,
+ dag_first_alert_ids: dict[str, str],
+ batch_size: int,
+ dags_with_errors: ErrorDict,
+) -> int:
+ """Populate deadline.deadline_alert_id in primary-key order to avoid
dagrun_id table scans."""
Review Comment:
`_backfill_deadline_alert_ids` introduces substantial new behavior (batched
PK pagination, per-alert grouping updates, and missing-alert error reporting)
but isn’t covered by the added migration unit tests (which currently focus on
`temporary_index`). Add unit tests that validate: (1) batched selection
advances by `deadline.id`, (2) deadlines are updated for dag_ids present in
`dag_first_alert_ids`, and (3) dag_ids missing from the mapping are recorded
in `dags_with_errors`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]