ephraimbuddy commented on code in PR #66016:
URL: https://github.com/apache/airflow/pull/66016#discussion_r3259092604


##########
airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py:
##########
@@ -38,17 +43,199 @@
 airflow_version = "3.1.0"
 
 
+_ASYNC_CALLBACK_CLASSNAME = "airflow.sdk.definitions.deadline.AsyncCallback"
+# Maximum length of the callback VARCHAR column in the pre-0080 schema.
+_CALLBACK_MAX_LEN = 500
+
+
 def upgrade():
     """Replace deadline table's string callback and JSON callback_kwargs with 
JSON callback."""
+    if context.is_offline_mode():
+        print(
+            dedent("""
+            ------------
+            --  WARNING: Unable to migrate the data in the deadline table
+            --  while in offline mode!  All rows in the deadline table will
+            --  be deleted.
+            ------------
+            """)
+        )
+        op.execute("DELETE FROM deadline")
+        with op.batch_alter_table("deadline", schema=None) as batch_op:
+            batch_op.drop_column("callback")
+            batch_op.drop_column("callback_kwargs")
+            batch_op.add_column(sa.Column("callback", sa.JSON(), 
nullable=False))
+        return
+
+    conn = op.get_bind()
+    batch_size = conf.getint("database", "migration_batch_size", fallback=1000)
+
+    # Add the destination column alongside the existing ones so we can migrate
+    # in batches without loading the whole table into memory at once.
+    with op.batch_alter_table("deadline", schema=None) as batch_op:
+        batch_op.add_column(sa.Column("callback_new", sa.JSON(), 
nullable=True))
+
+    deadline_read = sa.table(
+        "deadline",
+        sa.column("id"),
+        sa.column("callback"),
+        sa.column("callback_kwargs", sa.JSON()),
+        sa.column("callback_new", sa.JSON()),
+    )
+    deadline_write = sa.table(
+        "deadline",
+        sa.column("id"),
+        sa.column("callback_new", sa.JSON()),
+    )
+
+    while True:
+        rows = conn.execute(
+            sa.select(
+                deadline_read.c.id,
+                deadline_read.c.callback,
+                deadline_read.c.callback_kwargs,
+            )
+            .where(deadline_read.c.callback_new.is_(None))
+            .limit(batch_size)
+        ).fetchall()
+
+        if not rows:
+            break

Review Comment:
   ```suggestion
   ```
   This `if not rows: break` check is not necessary: the `for row in rows` loop below already handles an empty `rows` result.



##########
airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py:
##########
@@ -38,17 +43,199 @@
 airflow_version = "3.1.0"
 
 
+_ASYNC_CALLBACK_CLASSNAME = "airflow.sdk.definitions.deadline.AsyncCallback"
+# Maximum length of the callback VARCHAR column in the pre-0080 schema.
+_CALLBACK_MAX_LEN = 500
+
+
 def upgrade():
     """Replace deadline table's string callback and JSON callback_kwargs with 
JSON callback."""
+    if context.is_offline_mode():
+        print(
+            dedent("""
+            ------------
+            --  WARNING: Unable to migrate the data in the deadline table
+            --  while in offline mode!  All rows in the deadline table will
+            --  be deleted.
+            ------------
+            """)
+        )
+        op.execute("DELETE FROM deadline")
+        with op.batch_alter_table("deadline", schema=None) as batch_op:
+            batch_op.drop_column("callback")
+            batch_op.drop_column("callback_kwargs")
+            batch_op.add_column(sa.Column("callback", sa.JSON(), 
nullable=False))
+        return
+
+    conn = op.get_bind()
+    batch_size = conf.getint("database", "migration_batch_size", fallback=1000)
+
+    # Add the destination column alongside the existing ones so we can migrate
+    # in batches without loading the whole table into memory at once.
+    with op.batch_alter_table("deadline", schema=None) as batch_op:
+        batch_op.add_column(sa.Column("callback_new", sa.JSON(), 
nullable=True))
+
+    deadline_read = sa.table(
+        "deadline",
+        sa.column("id"),
+        sa.column("callback"),
+        sa.column("callback_kwargs", sa.JSON()),
+        sa.column("callback_new", sa.JSON()),
+    )
+    deadline_write = sa.table(
+        "deadline",
+        sa.column("id"),
+        sa.column("callback_new", sa.JSON()),
+    )
+
+    while True:
+        rows = conn.execute(
+            sa.select(
+                deadline_read.c.id,
+                deadline_read.c.callback,
+                deadline_read.c.callback_kwargs,
+            )
+            .where(deadline_read.c.callback_new.is_(None))
+            .limit(batch_size)
+        ).fetchall()
+
+        if not rows:
+            break
+
+        batch = []
+        for row in rows:
+            path = row[1] or ""
+            kwargs = row[2]
+            if isinstance(kwargs, str):
+                kwargs = json.loads(kwargs) if kwargs else {}
+            if not isinstance(kwargs, dict):
+                kwargs = {}
+            batch.append(
+                {
+                    "row_id": row[0],
+                    "new_callback": {
+                        "__data__": {"path": path, "kwargs": kwargs},
+                        "__classname__": _ASYNC_CALLBACK_CLASSNAME,
+                        "__version__": 0,
+                    },
+                }
+            )
+
+        conn.execute(
+            sa.update(deadline_write)
+            .where(deadline_write.c.id == sa.bindparam("row_id"))
+            .values(callback_new=sa.bindparam("new_callback")),
+            batch,
+        )
+
+        if len(rows) < batch_size:
+            break
+
     with op.batch_alter_table("deadline", schema=None) as batch_op:
         batch_op.drop_column("callback")
         batch_op.drop_column("callback_kwargs")
-        batch_op.add_column(sa.Column("callback", sa.JSON(), nullable=False))
+        batch_op.alter_column(
+            "callback_new",
+            new_column_name="callback",
+            existing_type=sa.JSON(),
+            nullable=False,
+        )
 
 
 def downgrade():
     """Replace deadline table's JSON callback with string callback and JSON 
callback_kwargs."""
+    if context.is_offline_mode():
+        print(
+            dedent("""
+            ------------
+            --  WARNING: Unable to migrate the data in the deadline table
+            --  while in offline mode!  All rows in the deadline table will
+            --  be deleted.
+            ------------
+            """)
+        )
+        op.execute("DELETE FROM deadline")
+        with op.batch_alter_table("deadline", schema=None) as batch_op:
+            batch_op.drop_column("callback")
+            batch_op.add_column(sa.Column("callback_kwargs", sa.JSON(), 
nullable=True))
+            batch_op.add_column(sa.Column("callback", sa.String(length=500), 
nullable=False))
+        return
+
+    conn = op.get_bind()
+    batch_size = conf.getint("database", "migration_batch_size", fallback=1000)
+
+    # Add the restored columns alongside the existing JSON callback so we can
+    # back-fill in batches before dropping the JSON column.
     with op.batch_alter_table("deadline", schema=None) as batch_op:
-        batch_op.drop_column("callback")
+        batch_op.add_column(sa.Column("callback_old", sa.String(length=500), 
nullable=True))
         batch_op.add_column(sa.Column("callback_kwargs", sa.JSON(), 
nullable=True))
-        batch_op.add_column(sa.Column("callback", sa.String(length=500), 
nullable=False))
+
+    deadline_read = sa.table(
+        "deadline",
+        sa.column("id"),
+        sa.column("callback", sa.JSON()),
+        sa.column("callback_old", sa.String(500)),
+    )
+    deadline_write = sa.table(
+        "deadline",
+        sa.column("id"),
+        sa.column("callback_old", sa.String(500)),
+        sa.column("callback_kwargs", sa.JSON()),
+    )
+
+    while True:
+        rows = conn.execute(
+            sa.select(deadline_read.c.id, deadline_read.c.callback)
+            .where(deadline_read.c.callback_old.is_(None))
+            .limit(batch_size)
+        ).fetchall()
+
+        if not rows:
+            break

Review Comment:
   ```suggestion
   ```
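   Ditto: same as the previous comment, this `if not rows: break` check in `downgrade()` is not necessary either.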



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to