This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit fd191bb32a0d2c5ce190c7af64e78344d9857216
Author: Jed Cunningham <[email protected]>
AuthorDate: Sun Oct 24 08:45:12 2021 -0600

    Faster PostgreSQL db migration to Airflow 2.2 (#19166)
    
    Bigger Airflow databases can take a long time to migrate,
    particularly if they have a lot of task instances. On PostgreSQL,
    creating a new table is much faster than updating the existing one.
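    
    As a rough sketch of the pattern (hypothetical table and column
    names, not the actual migration statements):
    
        -- In place: every updated row also rewrites each index entry.
        UPDATE big_table b SET run_id = r.run_id
        FROM runs r WHERE r.id = b.run_fk;
    
        -- Rebuild: one sequential write with no index maintenance
        -- during the copy; indexes, keys, and defaults are recreated
        -- once the data is in place.
        CREATE TABLE new_big_table AS
            SELECT b.col_a, b.col_b, r.run_id
            FROM big_table b JOIN runs r ON r.id = b.run_fk;
        DROP TABLE big_table;
        ALTER TABLE new_big_table RENAME TO big_table;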
    
    (cherry picked from commit 559d6074d7012d5472a96fdeadc0e10f958d6134)
---
 UPDATING.md                                        |  2 +
 .../7b2661a43ba3_taskinstance_keyed_to_dagrun.py   | 91 ++++++++++++++++++----
 tests/executors/test_base_executor.py              |  2 +-
 tests/models/test_cleartasks.py                    | 18 ++---
 4 files changed, 86 insertions(+), 27 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index 443afe9..867a575 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -98,6 +98,8 @@ the scheduler can create in a dag. Now, the maximum number is controlled interna
 
 ## Airflow 2.2.0
 
+Note: Upgrading the database to `2.2.0` or later can take some time to complete, particularly if you have a large `task_instance` table.
+
 ### `worker_log_server_port` configuration has been moved to the ``logging`` section.
 
 The `worker_log_server_port` configuration option has been moved from `[celery]` section to `[logging]` section to allow for re-use between different executors.
diff --git a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
index 059144e..4676f4a 100644
--- a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
+++ b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
@@ -182,10 +182,6 @@ def upgrade():
     op.add_column('task_instance', sa.Column('run_id', type_=string_id_col_type, nullable=True))
     op.add_column('task_reschedule', sa.Column('run_id', type_=string_id_col_type, nullable=True))
 
-    # Then update the new column by selecting the right value from DagRun
-    update_query = _multi_table_update(dialect_name, task_instance, task_instance.c.run_id)
-    op.execute(update_query)
-
     #
     # TaskReschedule has a FK to TaskInstance, so we have to update that before
     # we can drop the TI.execution_date column
@@ -204,29 +200,81 @@ def upgrade():
             batch_op.drop_index('task_reschedule_dag_task_date_fkey')
         batch_op.drop_index('idx_task_reschedule_dag_task_date')
 
+    # Then update the new column by selecting the right value from DagRun
+    # But first we will drop and recreate indexes to make it faster
+    if dialect_name == 'postgresql':
+        # Recreate task_instance, without execution_date and with dagrun.run_id
+        op.execute(
+            """
+            CREATE TABLE new_task_instance AS SELECT
+                ti.task_id,
+                ti.dag_id,
+                dag_run.run_id,
+                ti.start_date,
+                ti.end_date,
+                ti.duration,
+                ti.state,
+                ti.try_number,
+                ti.hostname,
+                ti.unixname,
+                ti.job_id,
+                ti.pool,
+                ti.queue,
+                ti.priority_weight,
+                ti.operator,
+                ti.queued_dttm,
+                ti.pid,
+                ti.max_tries,
+                ti.executor_config,
+                ti.pool_slots,
+                ti.queued_by_job_id,
+                ti.external_executor_id,
+                ti.trigger_id,
+                ti.trigger_timeout,
+                ti.next_method,
+                ti.next_kwargs
+            FROM task_instance ti
+            INNER JOIN dag_run ON dag_run.dag_id = ti.dag_id AND dag_run.execution_date = ti.execution_date;
+        """
+        )
+        op.drop_table('task_instance')
+        op.rename_table('new_task_instance', 'task_instance')
+
+        # Fix up columns after the 'create table as select'
+        with op.batch_alter_table('task_instance', schema=None) as batch_op:
+            batch_op.alter_column(
+                'pool', existing_type=string_id_col_type, existing_nullable=True, nullable=False
+            )
+            batch_op.alter_column('max_tries', existing_type=sa.Integer(), server_default="-1")
+            batch_op.alter_column(
+                'pool_slots', existing_type=sa.Integer(), existing_nullable=True, nullable=False
+            )
+    else:
+        update_query = _multi_table_update(dialect_name, task_instance, task_instance.c.run_id)
+        op.execute(update_query)
+
     with op.batch_alter_table('task_instance', schema=None) as batch_op:
+        if dialect_name != 'postgresql':
+            # TODO: Is this right for non-postgres?
+            if dialect_name == 'mssql':
+                constraints = get_table_constraints(conn, "task_instance")
+                pk, _ = constraints['PRIMARY KEY'].popitem()
+                batch_op.drop_constraint(pk, type_='primary')
+            elif dialect_name not in ('sqlite',):
+                batch_op.drop_constraint('task_instance_pkey', type_='primary')
+            batch_op.drop_index('ti_dag_date')
+            batch_op.drop_index('ti_state_lkp')
+            batch_op.drop_column('execution_date')
+
         # Then make it non-nullable
         batch_op.alter_column(
             'run_id', existing_type=string_id_col_type, existing_nullable=True, nullable=False
         )
-
         batch_op.alter_column(
             'dag_id', existing_type=string_id_col_type, existing_nullable=True, nullable=False
         )
-        batch_op.alter_column('execution_date', existing_type=dt_type, existing_nullable=True, nullable=False)
 
-        # TODO: Is this right for non-postgres?
-        if dialect_name == 'mssql':
-            constraints = get_table_constraints(conn, "task_instance")
-            pk, _ = constraints['PRIMARY KEY'].popitem()
-            batch_op.drop_constraint(pk, type_='primary')
-        elif dialect_name not in ('sqlite'):
-            batch_op.drop_constraint('task_instance_pkey', type_='primary')
         batch_op.create_primary_key('task_instance_pkey', ['dag_id', 'task_id', 'run_id'])
-
-        batch_op.drop_index('ti_dag_date')
-        batch_op.drop_index('ti_state_lkp')
-        batch_op.drop_column('execution_date')
         batch_op.create_foreign_key(
             'task_instance_dag_run_fkey',
             'dag_run',
@@ -237,6 +285,15 @@ def upgrade():
 
         batch_op.create_index('ti_dag_run', ['dag_id', 'run_id'])
         batch_op.create_index('ti_state_lkp', ['dag_id', 'task_id', 'run_id', 'state'])
+        if dialect_name == 'postgresql':
+            batch_op.create_index('ti_dag_state', ['dag_id', 'state'])
+            batch_op.create_index('ti_job_id', ['job_id'])
+            batch_op.create_index('ti_pool', ['pool', 'state', 'priority_weight'])
+            batch_op.create_index('ti_state', ['state'])
+            batch_op.create_foreign_key(
+                'task_instance_trigger_id_fkey', 'trigger', ['trigger_id'], ['id'], ondelete="CASCADE"
+            )
+            batch_op.create_index('ti_trigger_id', ['trigger_id'])
 
     with op.batch_alter_table('task_reschedule', schema=None) as batch_op:
         batch_op.drop_column('execution_date')
diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py
index 561d551..49d6c01 100644
--- a/tests/executors/test_base_executor.py
+++ b/tests/executors/test_base_executor.py
@@ -69,5 +69,5 @@ def test_try_adopt_task_instances(dag_maker):
     dagrun = dag_maker.create_dagrun(execution_date=date)
     tis = dagrun.task_instances
 
-    assert [ti.task_id for ti in tis] == ["task_1", "task_2", "task_3"]
+    assert {ti.task_id for ti in tis} == {"task_1", "task_2", "task_3"}
     assert BaseExecutor().try_adopt_task_instances(tis) == tis
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index b5ac22a..0f431ab 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -53,7 +53,7 @@ class TestClearTasks:
             state=State.RUNNING,
             run_type=DagRunType.SCHEDULED,
         )
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
@@ -117,7 +117,7 @@ class TestClearTasks:
             state=State.RUNNING,
             run_type=DagRunType.SCHEDULED,
         )
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         dr.last_scheduling_decision = DEFAULT_DATE
         ti0.state = TaskInstanceState.SUCCESS
         ti1.state = TaskInstanceState.SUCCESS
@@ -148,7 +148,7 @@ class TestClearTasks:
             run_type=DagRunType.SCHEDULED,
         )
 
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
@@ -187,7 +187,7 @@ class TestClearTasks:
             run_type=DagRunType.SCHEDULED,
         )
 
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
@@ -223,7 +223,7 @@ class TestClearTasks:
             run_type=DagRunType.SCHEDULED,
         )
 
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
         ti0.run()
@@ -263,7 +263,7 @@ class TestClearTasks:
         )
         session = dag_maker.session
 
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
@@ -380,8 +380,8 @@ class TestClearTasks:
             start_date=DEFAULT_DATE,
             end_date=DEFAULT_DATE + datetime.timedelta(days=10),
         ):
-            op1 = DummyOperator(task_id='bash_op')
-            op2 = DummyOperator(task_id='dummy_op', retries=1)
+            op1 = DummyOperator(task_id='test1')
+            op2 = DummyOperator(task_id='test2', retries=1)
             op1 >> op2
 
         dr = dag_maker.create_dagrun(
@@ -389,7 +389,7 @@ class TestClearTasks:
             run_type=DagRunType.SCHEDULED,
         )
 
-        ti1, ti2 = dr.task_instances
+        ti1, ti2 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti1.task = op1
         ti2.task = op2
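
A note on the index handling above: CREATE TABLE ... AS SELECT copies only
the data, so the PostgreSQL path has to recreate the primary key, foreign
keys, defaults, and indexes afterwards. That is also part of why the copy is
fast, since no index entries are maintained while the rows are written. The
same idea applies to other bulk backfills; a generic sketch with hypothetical
names, not the migration's exact statements:

    -- Drop the index first so the bulk write skips incremental index
    -- maintenance, then rebuild it in one pass afterwards.
    DROP INDEX IF EXISTS idx_big_state;
    UPDATE big_table SET state = 'done' WHERE state IS NULL;
    CREATE INDEX idx_big_state ON big_table (state);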
 
