This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4869b941fd7 fix: replace savepoint-per-DAG with per-DAG transaction in
migration … (#63591)
4869b941fd7 is described below
commit 4869b941fd7fd1bc1fda8d7e45ac214bc37f34a3
Author: Pranay Kumar Karvi <[email protected]>
AuthorDate: Thu Mar 19 14:21:01 2026 +0530
fix: replace savepoint-per-DAG with per-DAG transaction in migration …
(#63591)
* fix: replace savepoint-per-DAG with per-DAG transaction in migration 0101
downgrade to prevent PostgreSQL lock table exhaustion
* fix: address review feedback - fix upgrade path, move validation before
transaction, restore error prefix, handle SQLite separately
* Simplify duplicate engine-specific session handling
* fix: yield new connection from _begin_nested_transaction and update
callers to use it
* Remove sending a flag to rollback
---------
Co-authored-by: Tzu-ping Chung <[email protected]>
---
.../0101_3_2_0_ui_improvements_for_deadlines.py | 319 +++++++++++----------
1 file changed, 168 insertions(+), 151 deletions(-)
diff --git
a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
index 7bd50818105..0233f48c9a9 100644
---
a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
+++
b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py
@@ -30,6 +30,7 @@ Create Date: 2025-10-17 16:04:55.016272
from __future__ import annotations
+import contextlib
import json
import zlib
from collections import defaultdict
@@ -363,6 +364,28 @@ def _sort_serialized_dag_dict(serialized_dag: Any):
return serialized_dag
+@contextlib.contextmanager
+def _begin_nested_transaction(conn):
+ """
+ Create a nested transaction.
+
+ On SQLite, uses ``conn.begin_nested()`` with commit/rollback.
+ On other backends, opens a new connection via ``conn.engine.begin()``
+ and yields it so callers use the new connection for writes.
+ """
+ if conn.dialect.name != "sqlite":
+ with conn.engine.begin() as new_conn:
+ yield new_conn
+ return
+ try:
+ savepoint = conn.begin_nested()
+ yield conn
+ except Exception:
+ savepoint.rollback()
+ raise
+ savepoint.commit()
+
+
def migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
"""Extract DeadlineAlert data from serialized Dag data and populate
deadline_alert table."""
if context.is_offline_mode():
@@ -411,8 +434,7 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
"""),
{"last_dag_id": last_dag_id, "batch_size": BATCH_SIZE},
)
-
- batch_results = sorted(list(result), key=lambda r: r.dag_id)
+ batch_results = sorted(result, key=lambda r: r.dag_id)
else:
result = conn.execute(
sa.text("""
@@ -425,7 +447,6 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
"""),
{"last_dag_id": last_dag_id, "batch_size": BATCH_SIZE},
)
-
batch_results = list(result)
if not batch_results:
break
@@ -436,100 +457,98 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
processed_dags.append(dag_id)
last_dag_id = dag_id
- # Create a savepoint for this Dag to allow rollback on error.
- savepoint = conn.begin_nested()
-
+ # Validation that does not need a DB connection.
try:
- dag_data = get_dag_data(data, data_compressed)
-
- if dag_deadline := dag_data[DAG_KEY][DEADLINE_KEY]:
- dags_with_deadlines.add(dag_id)
- deadline_alerts = dag_deadline if isinstance(dag_deadline,
list) else [dag_deadline]
-
- migrated_alert_ids = []
-
- for serialized_alert in deadline_alerts:
- if isinstance(serialized_alert, dict):
- try:
- alert_data =
serialized_alert.get(Encoding.VAR, serialized_alert)
-
- if not
DEADLINE_ALERT_REQUIRED_FIELDS.issubset(alert_data):
- dags_with_errors[dag_id].append(
- f"Invalid DeadlineAlert structure:
{serialized_alert}"
- )
- continue
-
- reference_data =
json.dumps(alert_data[REFERENCE_KEY], sort_keys=True)
- interval_data =
float(alert_data.get(INTERVAL_KEY))
- callback_data =
json.dumps(alert_data[CALLBACK_KEY], sort_keys=True)
- deadline_alert_id = str(uuid6.uuid7())
-
- conn.execute(
- sa.text("""
- INSERT INTO deadline_alert (
- id,
- created_at,
- serialized_dag_id,
- reference,
- interval,
- callback_def,
- name,
- description)
- VALUES (
- :id,
- :created_at,
- :serialized_dag_id,
- :reference,
- :interval,
- :callback_def,
- NULL,
- NULL)
- """),
- {
- "id": deadline_alert_id,
- "created_at": created_at or
timezone.utcnow(),
- "serialized_dag_id": serialized_dag_id,
- "reference": reference_data,
- "interval": interval_data,
- "callback_def": callback_data,
- },
- )
-
- if not validate_written_data(
- conn, deadline_alert_id, reference_data,
interval_data, callback_data
- ):
- dags_with_errors[dag_id].append(
- f"Invalid DeadlineAlert data:
{serialized_alert}"
- )
- continue
-
- migrated_alert_ids.append(deadline_alert_id)
- migrated_alerts_count += 1
-
- conn.execute(
- sa.text("""
- UPDATE deadline
- SET deadline_alert_id = :alert_id
- WHERE dagrun_id IN (
- SELECT dr.id
- FROM dag_run dr
- JOIN serialized_dag sd ON
dr.dag_id = sd.dag_id
- WHERE sd.id = :serialized_dag_id)
- AND deadline_alert_id IS NULL
+ dag_deadline = get_dag_data(data,
data_compressed)[DAG_KEY].get(DEADLINE_KEY)
+ except (json.JSONDecodeError, KeyError, TypeError) as e:
+ dags_with_errors[dag_id].append(f"Could not process serialized
Dag: {e}")
+ continue
+ if not dag_deadline:
+ continue
+
+ dags_with_deadlines.add(dag_id)
+ deadline_alerts = dag_deadline if isinstance(dag_deadline, list)
else [dag_deadline]
+
+ def _migrate_dag_deadlines(dag_conn: Connection) -> Iterable[str]:
+ for serialized_alert in deadline_alerts:
+ if not isinstance(serialized_alert, dict):
+ continue
+ try:
+ alert_data = serialized_alert.get(Encoding.VAR,
serialized_alert)
+
+ if not
DEADLINE_ALERT_REQUIRED_FIELDS.issubset(alert_data):
+ dags_with_errors[dag_id].append(
+ f"Invalid DeadlineAlert structure:
{serialized_alert}"
+ )
+ continue
+
+ reference_data = json.dumps(alert_data[REFERENCE_KEY],
sort_keys=True)
+ interval_data = float(alert_data.get(INTERVAL_KEY))
+ callback_data = json.dumps(alert_data[CALLBACK_KEY],
sort_keys=True)
+ deadline_alert_id = str(uuid6.uuid7())
+
+ dag_conn.execute(
+ sa.text("""
+ INSERT INTO deadline_alert (
+ id,
+ created_at,
+ serialized_dag_id,
+ reference,
+ interval,
+ callback_def,
+ name,
+ description)
+ VALUES (
+ :id,
+ :created_at,
+ :serialized_dag_id,
+ :reference,
+ :interval,
+ :callback_def,
+ NULL,
+ NULL)
"""),
- {"alert_id": deadline_alert_id,
"serialized_dag_id": serialized_dag_id},
- )
- except Exception as e:
- dags_with_errors[dag_id].append(f"Failed to
process {serialized_alert}: {e}")
- continue
+ {
+ "id": deadline_alert_id,
+ "created_at": created_at or timezone.utcnow(),
+ "serialized_dag_id": serialized_dag_id,
+ "reference": reference_data,
+ "interval": interval_data,
+ "callback_def": callback_data,
+ },
+ )
+
+ if not validate_written_data(
+ dag_conn, deadline_alert_id, reference_data,
interval_data, callback_data
+ ):
+ dags_with_errors[dag_id].append(f"Invalid
DeadlineAlert data: {serialized_alert}")
+ continue
+
+ yield deadline_alert_id
+ dag_conn.execute(
+ sa.text("""
+ UPDATE deadline
+ SET deadline_alert_id = :alert_id
+ WHERE dagrun_id IN (
+ SELECT dr.id
+ FROM dag_run dr
+ JOIN serialized_dag sd ON
dr.dag_id = sd.dag_id
+ WHERE sd.id = :serialized_dag_id)
+ AND deadline_alert_id IS NULL
+ """),
+ {"alert_id": deadline_alert_id,
"serialized_dag_id": serialized_dag_id},
+ )
+ except Exception as e:
+ dags_with_errors[dag_id].append(f"Failed to process
{serialized_alert}: {e}")
+ continue
+ try:
+ with _begin_nested_transaction(conn) as dag_conn:
+ migrated_alert_ids = list(_migrate_dag_deadlines(dag_conn))
if migrated_alert_ids:
uuid_strings = [str(uuid_id) for uuid_id in
migrated_alert_ids]
- update_dag_deadline_field(conn, serialized_dag_id,
uuid_strings, dialect)
-
- # Recalculate and update the dag_hash after modifying
the deadline data to ensure
- # it matches what write_dag() will compute later and
avoid re-serialization.
- updated_result = conn.execute(
+ update_dag_deadline_field(dag_conn, serialized_dag_id,
uuid_strings, dialect)
+ updated_result = dag_conn.execute(
sa.text(
"SELECT data, data_compressed "
"FROM serialized_dag "
@@ -537,15 +556,12 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
),
{"serialized_dag_id": serialized_dag_id},
).fetchone()
-
if updated_result:
updated_dag_data = get_dag_data(
updated_result.data,
updated_result.data_compressed
)
- # Import here to avoid a circular dependency issue
new_hash = hash_dag(updated_dag_data)
-
- conn.execute(
+ dag_conn.execute(
sa.text(
"UPDATE serialized_dag "
"SET dag_hash = :new_hash "
@@ -553,13 +569,10 @@ def
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
),
{"new_hash": new_hash, "serialized_dag_id":
serialized_dag_id},
)
-
- # Commit the savepoint if everything succeeded for this Dag.
- savepoint.commit()
-
+ migrated_alerts_count += len(migrated_alert_ids)
except (json.JSONDecodeError, KeyError, TypeError) as e:
- dags_with_errors[dag_id].append(f"Could not process serialized
Dag: {e}")
- savepoint.rollback()
+ log.exception("Could not migrate deadline for dag %s", dag_id)
+ dags_with_errors[dag_id].append(f"Could not migrate deadline:
{e}")
log.info("Batch complete", batch_num=batch_num,
total_batches=total_batches)
@@ -652,60 +665,64 @@ def migrate_deadline_alert_data_back_to_serialized_dag()
-> None:
processed_dags.append(dag_id)
last_dag_id = dag_id
- # Create a savepoint for this Dag to allow rollback on error.
- savepoint = conn.begin_nested()
-
+ # Validation that does not need a DB connection.
try:
dag_data = get_dag_data(data, data_compressed)
- deadline_uuids = dag_data[DAG_KEY][DEADLINE_KEY]
-
- if not isinstance(deadline_uuids, list) or not deadline_uuids:
- continue
-
- if not all(isinstance(uuid_val, str) for uuid_val in
deadline_uuids):
- log.warning("Dag has non-string deadline values,
skipping", dag_id=dag_id)
- continue
-
- dags_with_deadlines.add(dag_id)
- restored_deadline_objects = []
-
- alert_result = conn.execute(
- sa.select(
- deadline_alert_table.c.reference,
- deadline_alert_table.c.interval,
- deadline_alert_table.c.callback_def,
- ).where(deadline_alert_table.c.serialized_dag_id ==
sa.bindparam("serialized_dag_id")),
- {"serialized_dag_id": serialized_dag_id},
- ).fetchall()
-
- if not alert_result:
- dags_with_errors[dag_id].append(
- f"Could not find deadline_alert for serialized_dag
{serialized_dag_id}"
- )
- continue
-
- for alert in alert_result:
- deadline_object = {
- Encoding.TYPE: ENCODING_TYPE,
- Encoding.VAR: {
- REFERENCE_KEY: alert.reference,
- INTERVAL_KEY: float(alert.interval),
- CALLBACK_KEY: alert.callback_def,
- },
- }
- restored_deadline_objects.append(deadline_object)
- restored_alerts_count += 1
-
- # Replace the UUID array with the restored objects.
- if restored_deadline_objects:
- update_dag_deadline_field(conn, serialized_dag_id,
restored_deadline_objects, dialect)
-
- # Commit the savepoint if everything succeeded for this Dag.
- savepoint.commit()
+ except (json.JSONDecodeError, KeyError, TypeError):
+ continue
+ deadline_uuids = (
+ dag_data.get(DAG_KEY, {}).get(DEADLINE_KEY)
+ if isinstance(dag_data.get(DAG_KEY), dict)
+ else None
+ )
+
+ if not isinstance(deadline_uuids, list) or not deadline_uuids:
+ continue
+ if not all(isinstance(uuid_val, str) for uuid_val in
deadline_uuids):
+ log.warning("Dag has non-string deadline values, skipping",
dag_id=dag_id)
+ continue
+
+ dags_with_deadlines.add(dag_id)
+
+ try:
+ with _begin_nested_transaction(conn) as dag_conn:
+ alert_result = dag_conn.execute(
+ sa.select(
+ deadline_alert_table.c.reference,
+ deadline_alert_table.c.interval,
+ deadline_alert_table.c.callback_def,
+ ).where(
+ deadline_alert_table.c.serialized_dag_id ==
sa.bindparam("serialized_dag_id")
+ ),
+ {"serialized_dag_id": serialized_dag_id},
+ ).fetchall()
+
+ if not alert_result:
+ dags_with_errors[dag_id].append(
+ f"Could not find deadline_alert for serialized_dag
{serialized_dag_id}"
+ )
+ continue
+
+ restored_deadline_objects = []
+ for alert in alert_result:
+ deadline_object = {
+ Encoding.TYPE: ENCODING_TYPE,
+ Encoding.VAR: {
+ REFERENCE_KEY: alert.reference,
+ INTERVAL_KEY: float(alert.interval),
+ CALLBACK_KEY: alert.callback_def,
+ },
+ }
+ restored_deadline_objects.append(deadline_object)
+ restored_alerts_count += 1
+ if restored_deadline_objects:
+ update_dag_deadline_field(
+ dag_conn, serialized_dag_id,
restored_deadline_objects, dialect
+ )
except Exception as e:
+ log.exception("Could not restore deadline for dag %s", dag_id)
dags_with_errors[dag_id].append(f"Could not restore deadline:
{e}")
- savepoint.rollback()
log.info("Batch complete", batch_num=batch_num,
total_batches=total_batches)