YoannAbriel commented on code in PR #63628:
URL: https://github.com/apache/airflow/pull/63628#discussion_r2959007685
##########
airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py:
##########
@@ -122,56 +63,159 @@ def migrate_all_data():
op.execute("DELETE FROM deadline")
return
- deadline_table = table(
+ conn = op.get_bind()
+ dialect = conn.dialect.name
+
+ if dialect == "postgresql":
+        # PostgreSQL: use gen_random_uuid() and jsonb operations to avoid Python
+        # deserialization. The callback JSON is serde-wrapped:
+        #   {"__data__": {"path": "...", "kwargs": {...}}, "__classname__": "...", ...}
+ # We extract __data__ fields and merge in prefix + dag_id.
+        # A writable CTE handles both the INSERT into callback and the UPDATE of
+        # deadline in a single statement, so the generated UUID is shared.
+ conn.execute(
+ sa.text("""
+ WITH new_callbacks AS (
+ SELECT
+ d.id AS deadline_id,
+ gen_random_uuid() AS callback_id,
+ jsonb_build_object(
+ 'path', d.callback->'__data__'->>'path',
+ 'kwargs', d.callback->'__data__'->'kwargs',
+ 'prefix', :prefix,
+ 'dag_id', dr.dag_id
+ )::json AS callback_data,
+                    CASE
+                        WHEN d.callback_state IN ('failed', 'success') THEN d.callback_state
+                        ELSE :pending
+                    END AS cb_state,
+                    CASE
+                        WHEN d.callback_state IN ('failed', 'success') THEN true
+                        ELSE false
+                    END AS is_missed
+ FROM deadline d
+ JOIN dag_run dr ON d.dagrun_id = dr.id
+ WHERE d.callback_id IS NULL
+ ),
+ inserted AS (
+                    INSERT INTO callback (id, type, fetch_method, data, state, priority_weight, created_at)
+                    SELECT
+                        callback_id, :cb_type, :fetch_method, callback_data, cb_state, 1, NOW()
+ FROM new_callbacks
+ )
+ UPDATE deadline
+ SET callback_id = nc.callback_id, missed = nc.is_missed
+ FROM new_callbacks nc
+ WHERE deadline.id = nc.deadline_id
+ """),
+ {
+ "cb_type": _CALLBACK_TYPE_TRIGGERER,
+ "fetch_method": _CALLBACK_FETCH_METHOD_IMPORT_PATH,
+ "prefix": _CALLBACK_METRICS_PREFIX,
+ "pending": _CALLBACK_STATE_PENDING,
+ },
+ )
+ else:
+        # MySQL / SQLite: use batched Python approach with SQL JSON extraction.
+        # UUID generation requires Python (no reliable cross-dialect SQL UUID function
+        # that matches SQLAlchemy's Uuid column type).
+ _migrate_batched(conn, dialect)
+
+ def _migrate_batched(conn, dialect):
+    """Batch migration for MySQL/SQLite using Python UUIDs with SQL JSON extraction."""
+ import json
+
+ import uuid6
+
+ from airflow.utils.sqlalchemy import UtcDateTime
+
+ deadline_table = sa.table(
"deadline",
- column("id", sa.Uuid()),
- column("dagrun_id", sa.Integer()),
- column("deadline_time", UtcDateTime(timezone=True)),
- column("callback", sa.JSON()),
- column("callback_state", sa.String(20)),
- column("missed", sa.Boolean()),
- column("callback_id", sa.Uuid()),
+ sa.column("id", sa.Uuid()),
+ sa.column("dagrun_id", sa.Integer()),
+ sa.column("callback", sa.JSON()),
+ sa.column("callback_state", sa.String(20)),
+ sa.column("missed", sa.Boolean()),
+ sa.column("callback_id", sa.Uuid()),
)
- dag_run_table = table(
+ dag_run_table = sa.table(
"dag_run",
- column("id", sa.Integer()),
- column("dag_id", StringID()),
+ sa.column("id", sa.Integer()),
+ sa.column("dag_id", sa.String()),
)
- callback_table = table(
+ callback_table = sa.table(
"callback",
- column("id", sa.Uuid()),
- column("type", sa.String(20)),
- column("fetch_method", sa.String(20)),
- column("data", ExtendedJSON()),
- column("state", sa.String(10)),
- column("priority_weight", sa.Integer()),
- column("created_at", UtcDateTime(timezone=True)),
+ sa.column("id", sa.Uuid()),
+ sa.column("type", sa.String(20)),
+ sa.column("fetch_method", sa.String(20)),
+ sa.column("data", sa.JSON()),
Review Comment:
Valid point — callback.data uses ExtendedJSON, so raw-copying serde kwargs
breaks complex types at runtime. Will add proper deserialization before insert.
##########
airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py:
##########
@@ -136,42 +156,79 @@ def migrate_all_data():
dag_run_table = table(
"dag_run",
column("id", sa.Integer()),
- column("dag_id", StringID()),
+ column("dag_id", sa.String()),
)
callback_table = table(
"callback",
column("id", sa.Uuid()),
column("type", sa.String(20)),
column("fetch_method", sa.String(20)),
- column("data", ExtendedJSON()),
+ column("data", sa.JSON()),
column("state", sa.String(10)),
column("priority_weight", sa.Integer()),
column("created_at", UtcDateTime(timezone=True)),
)
- conn = op.get_bind()
+ from datetime import datetime, timezone
+
+ timestamp = datetime.now(timezone.utc)
batch_num = 0
+
while True:
batch_num += 1
batch = conn.execute(
select(
deadline_table.c.id,
- deadline_table.c.dagrun_id,
- deadline_table.c.deadline_time,
deadline_table.c.callback,
deadline_table.c.callback_state,
dag_run_table.c.dag_id,
)
             .join(dag_run_table, deadline_table.c.dagrun_id == dag_run_table.c.id)
-            .where(deadline_table.c.callback_id.is_(None))  # Only get rows that haven't been migrated yet
+ .where(deadline_table.c.callback_id.is_(None))
.limit(BATCH_SIZE)
).fetchall()
if not batch:
break
- migrate_batch(conn, deadline_table, callback_table, batch)
+ callback_inserts = []
+ deadline_updates = []
+
+ for row in batch:
+ callback_id = uuid6.uuid7()
+            cb = row.callback if isinstance(row.callback, dict) else json.loads(row.callback)
Review Comment:
Agreed, will deserialize kwargs through serde before re-inserting.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]