This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b57ae490002 fix: optimize migration 0094 upgrade to use SQL instead of
Python deserialization (#63628)
b57ae490002 is described below
commit b57ae49000263282812377a2c15d4d5c4123922d
Author: Yoann <[email protected]>
AuthorDate: Mon Mar 23 01:07:01 2026 -0700
fix: optimize migration 0094 upgrade to use SQL instead of Python
deserialization (#63628)
* fix: optimize migration 0094 upgrade to use SQL instead of Python
deserialization
Replace row-by-row Python serde deserialization with pure SQL JSON
manipulation for PostgreSQL (writable CTE with gen_random_uuid()
and jsonb operations). For MySQL/SQLite, use a batched approach with
Python UUID generation but direct JSON dict access instead of
importing and invoking serde.deserialize().
This eliminates the Python object instantiation bottleneck that
caused ~33 minute migration times for 10M deadline rows.
Also removes runtime module imports (airflow.serialization.serde,
airflow.models.callback, airflow.models.deadline) from the upgrade
path, hardcoding the constant values instead. This follows the
migration best practice of avoiding ORM/runtime imports.
Closes: #63532
* fix: remove extra blank line in migration downgrade function
* fix: address review — restore column/select/table imports, batch PG CTE,
BATCH_SIZE=5000
- Restore column, select, table imports instead of sa.* (amoghrajesh)
- Keep BATCH_SIZE constant, increase to 5000 (amoghrajesh)
- Move json import to top level (amoghrajesh)
- Batch PostgreSQL CTE with LIMIT to cap memory usage (amoghrajesh)
- Use plain JSON for upgrade data column (no deserialization needed)
* Move datetime import to top level
* Also avoid ExtendedJSON in downgrade to avoid decoding
* Simplify imports
* fix: refine 0094 deadline callback migration
---------
Co-authored-by: Amogh Desai <[email protected]>
Co-authored-by: Tzu-ping Chung <[email protected]>
Co-authored-by: vatsrahul1001 <[email protected]>
---
...0_replace_deadline_inline_callback_with_fkey.py | 594 +++++++++++++--------
1 file changed, 375 insertions(+), 219 deletions(-)
diff --git
a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
index 8c9eddec2cb..06d0f7b43ab 100644
---
a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
+++
b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py
@@ -34,8 +34,8 @@ import sqlalchemy as sa
from alembic import context, op
from sqlalchemy import column, select, table
-from airflow.serialization.serde import deserialize
-from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+from airflow.configuration import conf
+from airflow.utils.sqlalchemy import UtcDateTime
# revision identifiers, used by Alembic.
revision = "e812941398f4"
@@ -44,62 +44,189 @@ branch_labels = None
depends_on = None
airflow_version = "3.2.0"
-BATCH_SIZE = 1000
+_CALLBACK_TYPE_TRIGGERER = "triggerer"
+_CALLBACK_FETCH_METHOD_IMPORT_PATH = "import_path"
+_CALLBACK_METRICS_PREFIX = "deadline_alerts"
+_CALLBACK_STATE_PENDING = "pending"
+_CALLBACK_STATE_SUCCESS = "success"
+_CALLBACK_STATE_FAILED = "failed"
+_ASYNC_CALLBACK_CLASSNAME = "airflow.sdk.definitions.deadline.AsyncCallback"
-def upgrade():
- """Replace Deadline table's inline callback fields with callback_id
foreign key."""
- import uuid6
- from airflow.models.base import StringID
- from airflow.models.callback import CallbackFetchMethod, CallbackState,
CallbackType
- from airflow.models.deadline import CALLBACK_METRICS_PREFIX
+def _upgrade_postgresql(conn, batch_size):
+ """Writable CTE per batch: SELECT window → INSERT callback → UPDATE
deadline in one round-trip."""
+ timestamp = datetime.now(timezone.utc)
+ batch_num = 0
+ last_id = "00000000-0000-0000-0000-000000000000"
+
+ while True:
+ batch_num += 1
+ result = conn.execute(
+ sa.text("""
+ WITH batch AS (
+ SELECT
+ d.id AS deadline_id,
+ gen_random_uuid() AS callback_id,
+ COALESCE(dr.dag_id, '') AS dag_id,
+ d.callback::jsonb->'__data__'->>'path' AS cb_path,
+ d.callback::jsonb->'__data__'->'kwargs' AS cb_kwargs,
+ CASE
+ WHEN d.callback_state IN (:state_success,
:state_failed) THEN d.callback_state
+ ELSE :state_pending
+ END AS cb_state,
+ COALESCE(d.callback_state IN (:state_success,
:state_failed), FALSE) AS missed
+ FROM deadline d
+ LEFT JOIN dag_run dr ON d.dagrun_id = dr.id
+ WHERE d.id > :last_id
+ ORDER BY d.id
+ LIMIT :batch_size
+ ),
+ ins AS (
+ INSERT INTO callback (id, type, fetch_method, data, state,
priority_weight, created_at)
+ SELECT
+ b.callback_id,
+ :cb_type,
+ :fetch_method,
+ json_build_object(
+ '__var', json_build_object(
+ 'path', b.cb_path,
+ 'kwargs', b.cb_kwargs,
+ 'prefix', :prefix,
+ 'dag_id', b.dag_id
+ ),
+ '__type', 'dict'
+ )::json,
+ b.cb_state,
+ 1,
+ :ts
+ FROM batch b
+ )
+ UPDATE deadline d
+ SET callback_id = b.callback_id,
+ missed = b.missed
+ FROM batch b
+ WHERE d.id = b.deadline_id
+ RETURNING d.id
+ """),
+ {
+ "last_id": last_id,
+ "batch_size": batch_size,
+ "state_success": _CALLBACK_STATE_SUCCESS,
+ "state_failed": _CALLBACK_STATE_FAILED,
+ "state_pending": _CALLBACK_STATE_PENDING,
+ "cb_type": _CALLBACK_TYPE_TRIGGERER,
+ "fetch_method": _CALLBACK_FETCH_METHOD_IMPORT_PATH,
+ "prefix": _CALLBACK_METRICS_PREFIX,
+ "ts": timestamp,
+ },
+ )
+
+ rows = result.fetchall()
+ row_count = len(rows)
+ if row_count == 0:
+ break
+
+ last_id = str(max(r[0] for r in rows))
+ print(f"Migrated {row_count} deadline records in batch {batch_num}")
+
+ if row_count < batch_size:
+ break
+
+ remaining = conn.execute(
+ sa.text("SELECT COUNT(*) FROM deadline WHERE missed IS NULL OR
callback_id IS NULL")
+ ).scalar()
+ if remaining:
+ print(f"WARNING: {remaining} deadline rows still have NULL
missed/callback_id, fixing...")
+ conn.execute(sa.text("UPDATE deadline SET missed = FALSE WHERE missed
IS NULL"))
+
+
+def _upgrade_mysql_sqlite(conn, batch_size):
+ """Batched upgrade for MySQL/SQLite with Python UUID generation."""
+ import json
+
+ import uuid6
timestamp = datetime.now(timezone.utc)
- def migrate_batch(conn, deadline_table, callback_table, batch):
+ deadline_table = table(
+ "deadline",
+ column("id", sa.Uuid()),
+ column("dagrun_id", sa.Integer()),
+ column("callback", sa.JSON()),
+ column("callback_state", sa.String(20)),
+ column("missed", sa.Boolean()),
+ column("callback_id", sa.Uuid()),
+ )
+
+ dag_run_table = table(
+ "dag_run",
+ column("id", sa.Integer()),
+ column("dag_id", sa.String(250)),
+ )
+
+ callback_table = table(
+ "callback",
+ column("id", sa.Uuid()),
+ column("type", sa.String(20)),
+ column("fetch_method", sa.String(20)),
+ column("data", sa.JSON()),
+ column("state", sa.String(10)),
+ column("priority_weight", sa.Integer()),
+ column("created_at", UtcDateTime(timezone=True)),
+ )
+
+ batch_num = 0
+ while True:
+ batch_num += 1
+ batch = conn.execute(
+ select(
+ deadline_table.c.id,
+ deadline_table.c.callback,
+ deadline_table.c.callback_state,
+ dag_run_table.c.dag_id,
+ )
+ .outerjoin(dag_run_table, deadline_table.c.dagrun_id ==
dag_run_table.c.id)
+ .where(deadline_table.c.callback_id.is_(None))
+ .limit(batch_size)
+ ).fetchall()
+
+ if not batch:
+ break
+
callback_inserts = []
deadline_updates = []
- for deadline in batch:
- try:
- callback_id = uuid6.uuid7()
-
- # Transform serialized callback to the new representation
- callback_data = deserialize(deadline.callback).serialize() | {
- "prefix": CALLBACK_METRICS_PREFIX,
- "dag_id": deadline.dag_id,
+ for row in batch:
+ callback_id = uuid6.uuid7()
+ cb = row.callback if isinstance(row.callback, dict) else
json.loads(row.callback)
+ cb_inner = cb.get("__data__", cb)
+ cb_data = {
+ "path": cb_inner.get("path", ""),
+ "kwargs": cb_inner.get("kwargs", {}),
+ "prefix": _CALLBACK_METRICS_PREFIX,
+ "dag_id": row.dag_id or "",
+ }
+
+ if row.callback_state in (_CALLBACK_STATE_SUCCESS,
_CALLBACK_STATE_FAILED):
+ missed = True
+ cb_state = row.callback_state
+ else:
+ missed = False
+ cb_state = _CALLBACK_STATE_PENDING
+
+ callback_inserts.append(
+ {
+ "id": callback_id,
+ "type": _CALLBACK_TYPE_TRIGGERER,
+ "fetch_method": _CALLBACK_FETCH_METHOD_IMPORT_PATH,
+ "data": cb_data,
+ "state": cb_state,
+ "priority_weight": 1,
+ "created_at": timestamp,
}
-
- if deadline.callback_state and deadline.callback_state in {
- CallbackState.FAILED,
- CallbackState.SUCCESS,
- }:
- deadline_missed = True
- callback_state = deadline.callback_state
- else:
- # Mark the deadlines in non-terminal states as not missed
so the scheduler handles them
- deadline_missed = False
- callback_state = CallbackState.PENDING
-
- callback_inserts.append(
- {
- "id": callback_id,
- "type": CallbackType.TRIGGERER, # Past versions only
support triggerer callbacks
- "fetch_method": CallbackFetchMethod.IMPORT_PATH, #
Past versions only support import_path
- "data": callback_data,
- "state": callback_state,
- "priority_weight": 1, # Default priority weight
- "created_at": timestamp,
- }
- )
-
- deadline_updates.append(
- {"deadline_id": deadline.id, "callback_id": callback_id,
"missed": deadline_missed}
- )
- except Exception:
- print(f"Failed to migrate deadline: {deadline}")
- raise
+ )
+ deadline_updates.append({"deadline_id": row.id, "callback_id":
callback_id, "missed": missed})
conn.execute(callback_table.insert(), callback_inserts)
conn.execute(
@@ -108,81 +235,34 @@ def upgrade():
.values(callback_id=sa.bindparam("callback_id"),
missed=sa.bindparam("missed")),
deadline_updates,
)
+ print(f"Migrated {len(batch)} deadline records in batch {batch_num}")
- def migrate_all_data():
- if context.is_offline_mode():
- print(
- dedent("""
- ------------
- -- WARNING: Unable to migrate the data in the deadline table
while in offline mode!
- -- All the rows in the deadline table will be deleted in this
mode.
- ------------
- """)
- )
- op.execute("DELETE FROM deadline")
- return
-
- deadline_table = table(
- "deadline",
- column("id", sa.Uuid()),
- column("dagrun_id", sa.Integer()),
- column("deadline_time", UtcDateTime(timezone=True)),
- column("callback", sa.JSON()),
- column("callback_state", sa.String(20)),
- column("missed", sa.Boolean()),
- column("callback_id", sa.Uuid()),
- )
-
- dag_run_table = table(
- "dag_run",
- column("id", sa.Integer()),
- column("dag_id", StringID()),
- )
-
- callback_table = table(
- "callback",
- column("id", sa.Uuid()),
- column("type", sa.String(20)),
- column("fetch_method", sa.String(20)),
- column("data", ExtendedJSON()),
- column("state", sa.String(10)),
- column("priority_weight", sa.Integer()),
- column("created_at", UtcDateTime(timezone=True)),
- )
-
- conn = op.get_bind()
- batch_num = 0
- while True:
- batch_num += 1
- batch = conn.execute(
- select(
- deadline_table.c.id,
- deadline_table.c.dagrun_id,
- deadline_table.c.deadline_time,
- deadline_table.c.callback,
- deadline_table.c.callback_state,
- dag_run_table.c.dag_id,
- )
- .join(dag_run_table, deadline_table.c.dagrun_id ==
dag_run_table.c.id)
- .where(deadline_table.c.callback_id.is_(None)) # Only get
rows that haven't been migrated yet
- .limit(BATCH_SIZE)
- ).fetchall()
-
- if not batch:
- break
-
- migrate_batch(conn, deadline_table, callback_table, batch)
- print(f"Migrated {len(batch)} deadline records in batch
{batch_num}")
- # Add new columns (temporarily nullable until data has been migrated)
+def upgrade():
+ """Replace Deadline table's inline callback fields with callback_id
foreign key."""
with op.batch_alter_table("deadline") as batch_op:
batch_op.add_column(sa.Column("missed", sa.Boolean(), nullable=True))
batch_op.add_column(sa.Column("callback_id", sa.Uuid(), nullable=True))
- migrate_all_data()
+ if context.is_offline_mode():
+ print(
+ dedent("""
+ ------------
+ -- WARNING: Unable to migrate the data in the deadline table
while in offline mode!
+ -- All the rows in the deadline table will be deleted in this
mode.
+ ------------
+ """)
+ )
+ op.execute("DELETE FROM deadline")
+ else:
+ conn = op.get_bind()
+ batch_size = conf.getint("database", "migration_batch_size")
+ if conn.dialect.name == "postgresql":
+ _upgrade_postgresql(conn, batch_size)
+ else:
+ _upgrade_mysql_sqlite(conn, batch_size)
with op.batch_alter_table("deadline") as batch_op:
- # Data for `missed` and `callback_id` has been migrated so make them
non-nullable
batch_op.alter_column("missed", existing_type=sa.Boolean(),
nullable=False)
batch_op.alter_column("callback_id", existing_type=sa.Uuid(),
nullable=False)
@@ -198,51 +278,164 @@ def upgrade():
batch_op.drop_column("callback_state")
-def downgrade():
- """Restore Deadline table's inline callback fields from callback_id
foreign key."""
- from airflow.utils.state import CallbackState
+def _downgrade_postgresql(conn, batch_size):
+ """Batched keyset pagination: bounded UPDATE ... FROM callback per
batch."""
+ batch_num = 0
+ total_migrated = 0
+ last_id = "00000000-0000-0000-0000-000000000000"
+
+ while True:
+ batch_num += 1
+
+ upper_row = conn.execute(
+ sa.text("""
+ SELECT id FROM deadline
+ WHERE callback_id IS NOT NULL AND id > :last_id
+ ORDER BY id
+ OFFSET :offset LIMIT 1
+ """),
+ {"last_id": last_id, "offset": batch_size - 1},
+ ).fetchone()
+
+ if upper_row is None:
+ result = conn.execute(
+ sa.text("""
+ UPDATE deadline d
+ SET callback = json_build_object(
+ '__data__', json_build_object(
+ 'path', c.data::jsonb->'__var'->>'path',
+ 'kwargs', c.data::jsonb->'__var'->'kwargs'
+ ),
+ '__classname__', :classname,
+ '__version__', 0
+ )::json,
+ callback_state = CASE
+ WHEN c.state IN (:state_success, :state_failed)
THEN c.state
+ ELSE NULL
+ END,
+ trigger_id = NULL,
+ callback_id = NULL
+ FROM callback c
+ WHERE c.id = d.callback_id
+ AND d.callback_id IS NOT NULL
+ AND d.id > :last_id
+ """),
+ {
+ "classname": _ASYNC_CALLBACK_CLASSNAME,
+ "state_success": _CALLBACK_STATE_SUCCESS,
+ "state_failed": _CALLBACK_STATE_FAILED,
+ "last_id": last_id,
+ },
+ )
+ total_migrated += result.rowcount
+ if result.rowcount > 0:
+ print(f"Migrated {result.rowcount} deadline records in batch
{batch_num} (final)")
+ break
+
+ upper_id = str(upper_row[0])
+ result = conn.execute(
+ sa.text("""
+ UPDATE deadline d
+ SET callback = json_build_object(
+ '__data__', json_build_object(
+ 'path', c.data::jsonb->'__var'->>'path',
+ 'kwargs', c.data::jsonb->'__var'->'kwargs'
+ ),
+ '__classname__', :classname,
+ '__version__', 0
+ )::json,
+ callback_state = CASE
+ WHEN c.state IN (:state_success, :state_failed) THEN
c.state
+ ELSE NULL
+ END,
+ trigger_id = NULL,
+ callback_id = NULL
+ FROM callback c
+ WHERE c.id = d.callback_id
+ AND d.id > :last_id
+ AND d.id <= :upper_id
+ """),
+ {
+ "classname": _ASYNC_CALLBACK_CLASSNAME,
+ "state_success": _CALLBACK_STATE_SUCCESS,
+ "state_failed": _CALLBACK_STATE_FAILED,
+ "last_id": last_id,
+ "upper_id": upper_id,
+ },
+ )
+ total_migrated += result.rowcount
+ last_id = upper_id
+ print(f"Migrated {result.rowcount} deadline records in batch
{batch_num}")
+
+ print(f"Total migrated: {total_migrated} deadline records")
+
+
+def _downgrade_mysql_sqlite(conn, batch_size):
+ """Batched downgrade for MySQL/SQLite."""
+ import json
+
+ deadline_table = table(
+ "deadline",
+ column("id", sa.Uuid()),
+ column("callback_id", sa.Uuid()),
+ column("callback", sa.JSON()),
+ column("callback_state", sa.String(20)),
+ column("trigger_id", sa.Integer()),
+ )
+
+ callback_table = table(
+ "callback",
+ column("id", sa.Uuid()),
+ column("data", sa.JSON()),
+ column("state", sa.String(10)),
+ )
+
+ batch_num = 0
+ while True:
+ batch_num += 1
+ batch = conn.execute(
+ select(
+ deadline_table.c.id.label("deadline_id"),
+ deadline_table.c.callback_id,
+ callback_table.c.data.label("callback_data"),
+ callback_table.c.state.label("callback_state"),
+ )
+ .join(callback_table, deadline_table.c.callback_id ==
callback_table.c.id)
+ .where(deadline_table.c.callback.is_(None))
+ .limit(batch_size)
+ ).fetchall()
- def migrate_batch(conn, deadline_table, callback_table, batch):
- deadline_updates = []
- callback_ids_to_delete = []
+ if not batch:
+ break
+ deadline_updates = []
for row in batch:
- try:
- filtered_cb_data = {k: row.callback_data[k] for k in ("path",
"kwargs")}
-
- # Hard-coding the serialization to avoid SDK import.
- # Since only AsyncCallback was supported in the previous
versions, this is equivalent to:
- # from airflow.serialization.serde import serialize
- # from airflow.sdk.definitions.deadline import AsyncCallback
- # callback_serialized =
serialize(AsyncCallback.deserialize(filtered_data, 0))
- callback_serialized = {
- "__data__": filtered_cb_data,
- "__classname__":
"airflow.sdk.definitions.deadline.AsyncCallback",
- "__version__": 0,
- }
-
- # Mark the deadline as not handled if its callback is not in a
terminal state so that the
- # scheduler handles it appropriately
- if row.callback_state in {CallbackState.SUCCESS,
CallbackState.FAILED}:
- callback_state = row.callback_state
- else:
- callback_state = None
-
- deadline_updates.append(
- {
- "deadline_id": row.deadline_id,
- "callback": callback_serialized,
- "callback_state": callback_state,
- "trigger_id": None,
- "callback_id": None,
- }
- )
-
- callback_ids_to_delete.append(row.callback_id)
+ cb_data = (
+ row.callback_data if isinstance(row.callback_data, dict) else
json.loads(row.callback_data)
+ )
+ cb_inner = cb_data.get("__var", cb_data)
+
+ callback_serialized = {
+ "__data__": {"path": cb_inner.get("path", ""), "kwargs":
cb_inner.get("kwargs", {})},
+ "__classname__": _ASYNC_CALLBACK_CLASSNAME,
+ "__version__": 0,
+ }
+
+ cb_state = (
+ row.callback_state
+ if row.callback_state in (_CALLBACK_STATE_SUCCESS,
_CALLBACK_STATE_FAILED)
+ else None
+ )
- except Exception:
- print(f"Failed to migrate row: {row}")
- raise
+ deadline_updates.append(
+ {
+ "deadline_id": row.deadline_id,
+ "callback": callback_serialized,
+ "callback_state": cb_state,
+ "trigger_id": None,
+ "callback_id": None,
+ }
+ )
conn.execute(
deadline_table.update()
@@ -255,86 +448,49 @@ def downgrade():
),
deadline_updates,
)
-
conn.execute(callback_table.delete().where(callback_table.c.id.in_(callback_ids_to_delete)))
-
- def migrate_all_data():
- if context.is_offline_mode():
- print(
- dedent("""
- ------------
- -- WARNING: Unable to migrate the data in the
- -- deadline and callback tables while in offline mode!
- -- All the rows in the deadline table and the referenced rows
in
- -- the callback table will be deleted in this mode.
- ------------
- """)
- )
- op.execute("DELETE FROM deadline")
- return
-
- deadline_table = table(
- "deadline",
- column("id", sa.Uuid()),
- column("callback_id", sa.Uuid()),
- column("callback", sa.JSON()),
- column("callback_state", sa.String(20)),
- column("trigger_id", sa.Integer()),
- )
-
- callback_table = table(
- "callback",
- column("id", sa.Uuid()),
- column("data", ExtendedJSON()),
- column("state", sa.String(10)),
- )
-
- conn = op.get_bind()
- batch_num = 0
-
- while True:
- batch_num += 1
- batch = conn.execute(
- select(
- deadline_table.c.id.label("deadline_id"),
- deadline_table.c.callback_id,
- callback_table.c.data.label("callback_data"),
- callback_table.c.state.label("callback_state"),
- )
- .join(callback_table, deadline_table.c.callback_id ==
callback_table.c.id)
- .where(deadline_table.c.callback.is_(None)) # Only get rows
that haven't been downgraded yet
- .limit(BATCH_SIZE)
- ).fetchall()
-
- if not batch:
- break
+ print(f"Migrated {len(batch)} deadline records in batch {batch_num}")
- migrate_batch(conn, deadline_table, callback_table, batch)
- print(f"Migrated {len(batch)} deadline records in batch
{batch_num}")
+def downgrade():
+ """Restore Deadline table's inline callback fields from callback_id
foreign key."""
with op.batch_alter_table("deadline") as batch_op:
batch_op.add_column(sa.Column("callback_state", sa.VARCHAR(length=20),
nullable=True))
batch_op.add_column(sa.Column("trigger_id", sa.INTEGER(),
autoincrement=False, nullable=True))
-
- # Temporarily nullable until data has been migrated
batch_op.add_column(sa.Column("callback", sa.JSON(), nullable=True))
-
- # Make callback_id nullable so the associated callbacks can be cleared
during migration
batch_op.alter_column("callback_id", existing_type=sa.Uuid(),
nullable=True)
-
batch_op.drop_constraint(batch_op.f("deadline_callback_id_fkey"),
type_="foreignkey")
- # Note: deadline_callback_id_idx is kept here so it can speed up the
JOIN in migrate_all_data()
- migrate_all_data()
+ if context.is_offline_mode():
+ print(
+ dedent("""
+ ------------
+ -- WARNING: Unable to migrate the data in the
+ -- deadline and callback tables while in offline mode!
+ -- All the rows in the deadline table and the referenced rows in
+ -- the callback table will be deleted in this mode.
+ ------------
+ """)
+ )
+ op.execute("DELETE FROM deadline")
+ else:
+ conn = op.get_bind()
+ batch_size = conf.getint("database", "migration_batch_size")
+ if conn.dialect.name == "postgresql":
+ _downgrade_postgresql(conn, batch_size)
+ else:
+ _downgrade_mysql_sqlite(conn, batch_size)
+
+ op.execute(
+ "DELETE FROM callback WHERE id NOT IN (SELECT callback_id FROM
deadline WHERE callback_id IS NOT NULL)"
+ )
with op.batch_alter_table("deadline") as batch_op:
- # Data for `callback` has been migrated so make it non-nullable
batch_op.alter_column("callback", existing_type=sa.JSON(),
nullable=False)
batch_op.create_foreign_key(batch_op.f("deadline_trigger_id_fkey"),
"trigger", ["trigger_id"], ["id"])
batch_op.drop_index("deadline_missed_deadline_time_idx")
batch_op.create_index(
batch_op.f("deadline_callback_state_time_idx"), ["callback_state",
"deadline_time"], unique=False
)
- # Drop the index after data migration so it can speed up the JOIN
query in migrate_all_data()
batch_op.drop_index("deadline_callback_id_idx")
batch_op.drop_column("callback_id")
batch_op.drop_column("missed")