ephraimbuddy commented on code in PR #63628:
URL: https://github.com/apache/airflow/pull/63628#discussion_r2958985826
##########
airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py:
##########
@@ -122,56 +63,159 @@ def migrate_all_data():
op.execute("DELETE FROM deadline")
return
- deadline_table = table(
+ conn = op.get_bind()
+ dialect = conn.dialect.name
+
+ if dialect == "postgresql":
+        # PostgreSQL: use gen_random_uuid() and jsonb operations to avoid Python
+ # deserialization. The callback JSON is serde-wrapped:
+        #   {"__data__": {"path": "...", "kwargs": {...}}, "__classname__": "...", ...}
+ # We extract __data__ fields and merge in prefix + dag_id.
+        # A writable CTE handles both the INSERT into callback and the UPDATE of
+ # deadline in a single statement, so the generated UUID is shared.
+ conn.execute(
+ sa.text("""
+ WITH new_callbacks AS (
+ SELECT
+ d.id AS deadline_id,
+ gen_random_uuid() AS callback_id,
+ jsonb_build_object(
+ 'path', d.callback->'__data__'->>'path',
+ 'kwargs', d.callback->'__data__'->'kwargs',
+ 'prefix', :prefix,
+ 'dag_id', dr.dag_id
+ )::json AS callback_data,
+ CASE
+                            WHEN d.callback_state IN ('failed', 'success') THEN d.callback_state
+ ELSE :pending
+ END AS cb_state,
+ CASE
+                            WHEN d.callback_state IN ('failed', 'success') THEN true
+ ELSE false
+ END AS is_missed
+ FROM deadline d
+ JOIN dag_run dr ON d.dagrun_id = dr.id
+ WHERE d.callback_id IS NULL
+ ),
+ inserted AS (
+                    INSERT INTO callback (id, type, fetch_method, data, state, priority_weight, created_at)
+ SELECT
+                        callback_id, :cb_type, :fetch_method, callback_data, cb_state, 1, NOW()
+ FROM new_callbacks
+ )
+ UPDATE deadline
+ SET callback_id = nc.callback_id, missed = nc.is_missed
+ FROM new_callbacks nc
+ WHERE deadline.id = nc.deadline_id
+ """),
+ {
+ "cb_type": _CALLBACK_TYPE_TRIGGERER,
+ "fetch_method": _CALLBACK_FETCH_METHOD_IMPORT_PATH,
+ "prefix": _CALLBACK_METRICS_PREFIX,
+ "pending": _CALLBACK_STATE_PENDING,
+ },
+ )
+ else:
+        # MySQL / SQLite: use batched Python approach with SQL JSON extraction.
+        # UUID generation requires Python (no reliable cross-dialect SQL UUID function
+ # that matches SQLAlchemy's Uuid column type).
+ _migrate_batched(conn, dialect)
+
+ def _migrate_batched(conn, dialect):
+        """Batch migration for MySQL/SQLite using Python UUIDs with SQL JSON extraction."""
+ import json
+
+ import uuid6
+
+ from airflow.utils.sqlalchemy import UtcDateTime
+
+ deadline_table = sa.table(
"deadline",
- column("id", sa.Uuid()),
- column("dagrun_id", sa.Integer()),
- column("deadline_time", UtcDateTime(timezone=True)),
- column("callback", sa.JSON()),
- column("callback_state", sa.String(20)),
- column("missed", sa.Boolean()),
- column("callback_id", sa.Uuid()),
+ sa.column("id", sa.Uuid()),
+ sa.column("dagrun_id", sa.Integer()),
+ sa.column("callback", sa.JSON()),
+ sa.column("callback_state", sa.String(20)),
+ sa.column("missed", sa.Boolean()),
+ sa.column("callback_id", sa.Uuid()),
)
- dag_run_table = table(
+ dag_run_table = sa.table(
"dag_run",
- column("id", sa.Integer()),
- column("dag_id", StringID()),
+ sa.column("id", sa.Integer()),
+ sa.column("dag_id", sa.String()),
)
- callback_table = table(
+ callback_table = sa.table(
"callback",
- column("id", sa.Uuid()),
- column("type", sa.String(20)),
- column("fetch_method", sa.String(20)),
- column("data", ExtendedJSON()),
- column("state", sa.String(10)),
- column("priority_weight", sa.Integer()),
- column("created_at", UtcDateTime(timezone=True)),
+ sa.column("id", sa.Uuid()),
+ sa.column("type", sa.String(20)),
+ sa.column("fetch_method", sa.String(20)),
+ sa.column("data", sa.JSON()),
Review Comment:
What about Airflow itself consuming this data? Won't Airflow try to call deserialize on
the data when running? I feel the migrated rows would become unreadable on normal
ORM access and on downgrade.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]