This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit fd191bb32a0d2c5ce190c7af64e78344d9857216
Author: Jed Cunningham <[email protected]>
AuthorDate: Sun Oct 24 08:45:12 2021 -0600

    Faster PostgreSQL db migration to Airflow 2.2 (#19166)

    Migrating bigger Airflow databases can take a long time, particularly
    if they have a lot of task instances. On PostgreSQL, creating a new
    table is much faster than updating the existing table.

    (cherry picked from commit 559d6074d7012d5472a96fdeadc0e10f958d6134)
---
 UPDATING.md                                        |  2 +
 .../7b2661a43ba3_taskinstance_keyed_to_dagrun.py   | 91 ++++++++++++++++++----
 tests/executors/test_base_executor.py              |  2 +-
 tests/models/test_cleartasks.py                    | 18 ++---
 4 files changed, 86 insertions(+), 27 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index 443afe9..867a575 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -98,6 +98,8 @@ the scheduler can create in a dag. Now, the maximum number is controlled interna
 
 ## Airflow 2.2.0
 
+Note: Upgrading the database to `2.2.0` or later can take some time to complete, particularly if you have a large `task_instance` table.
+
 ### `worker_log_server_port` configuration has been moved to the ``logging`` section.
 
 The `worker_log_server_port` configuration option has been moved from `[celery]` section to `[logging]` section to allow for re-use between different executors.
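The note added to UPDATING.md is deliberately vague about how long "some time" is; the dominant factor is the row count of task_instance. A minimal way to check that count before upgrading, sketched under the assumption of a configured Airflow installation whose metadata database is reachable (this helper is illustrative, not part of the patch):

    # Hypothetical pre-upgrade sizing check; not part of this commit.
    # Assumes Airflow is installed and configured so the metadata DB is
    # reachable. Plain-string execute() works on the SQLAlchemy 1.3/1.4
    # series that Airflow 2.x uses.
    from airflow.settings import Session

    session = Session()
    row_count = session.execute("SELECT COUNT(*) FROM task_instance").scalar()
    print(f"task_instance rows: {row_count}")
    session.close()

The larger that count, the more the PostgreSQL fast path in the migration below pays off.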
diff --git a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
index 059144e..4676f4a 100644
--- a/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
+++ b/airflow/migrations/versions/7b2661a43ba3_taskinstance_keyed_to_dagrun.py
@@ -182,10 +182,6 @@ def upgrade():
     op.add_column('task_instance', sa.Column('run_id', type_=string_id_col_type, nullable=True))
     op.add_column('task_reschedule', sa.Column('run_id', type_=string_id_col_type, nullable=True))
 
-    # Then update the new column by selecting the right value from DagRun
-    update_query = _multi_table_update(dialect_name, task_instance, task_instance.c.run_id)
-    op.execute(update_query)
-
     #
     # TaskReschedule has a FK to TaskInstance, so we have to update that before
     # we can drop the TI.execution_date column
@@ -204,29 +200,81 @@ def upgrade():
         batch_op.drop_index('task_reschedule_dag_task_date_fkey')
         batch_op.drop_index('idx_task_reschedule_dag_task_date')
 
+    # Then update the new column by selecting the right value from DagRun
+    # But first we will drop and recreate indexes to make it faster
+    if dialect_name == 'postgresql':
+        # Recreate task_instance, without execution_date and with dagrun.run_id
+        op.execute(
+            """
+            CREATE TABLE new_task_instance AS SELECT
+                ti.task_id,
+                ti.dag_id,
+                dag_run.run_id,
+                ti.start_date,
+                ti.end_date,
+                ti.duration,
+                ti.state,
+                ti.try_number,
+                ti.hostname,
+                ti.unixname,
+                ti.job_id,
+                ti.pool,
+                ti.queue,
+                ti.priority_weight,
+                ti.operator,
+                ti.queued_dttm,
+                ti.pid,
+                ti.max_tries,
+                ti.executor_config,
+                ti.pool_slots,
+                ti.queued_by_job_id,
+                ti.external_executor_id,
+                ti.trigger_id,
+                ti.trigger_timeout,
+                ti.next_method,
+                ti.next_kwargs
+            FROM task_instance ti
+            INNER JOIN dag_run ON dag_run.dag_id = ti.dag_id AND dag_run.execution_date = ti.execution_date;
+            """
+        )
+        op.drop_table('task_instance')
+        op.rename_table('new_task_instance', 'task_instance')
+
+        # Fix up columns after the 'create table as select'
+        with op.batch_alter_table('task_instance', schema=None) as batch_op:
+            batch_op.alter_column(
+                'pool', existing_type=string_id_col_type, existing_nullable=True, nullable=False
+            )
+            batch_op.alter_column('max_tries', existing_type=sa.Integer(), server_default="-1")
+            batch_op.alter_column(
+                'pool_slots', existing_type=sa.Integer(), existing_nullable=True, nullable=False
+            )
+    else:
+        update_query = _multi_table_update(dialect_name, task_instance, task_instance.c.run_id)
+        op.execute(update_query)
+
     with op.batch_alter_table('task_instance', schema=None) as batch_op:
+        if dialect_name != 'postgresql':
+            # TODO: Is this right for non-postgres?
+            if dialect_name == 'mssql':
+                constraints = get_table_constraints(conn, "task_instance")
+                pk, _ = constraints['PRIMARY KEY'].popitem()
+                batch_op.drop_constraint(pk, type_='primary')
+            elif dialect_name not in ('sqlite'):
+                batch_op.drop_constraint('task_instance_pkey', type_='primary')
+            batch_op.drop_index('ti_dag_date')
+            batch_op.drop_index('ti_state_lkp')
+            batch_op.drop_column('execution_date')
+
         # Then make it non-nullable
         batch_op.alter_column(
             'run_id', existing_type=string_id_col_type, existing_nullable=True, nullable=False
         )
-
         batch_op.alter_column(
             'dag_id', existing_type=string_id_col_type, existing_nullable=True, nullable=False
         )
 
-        batch_op.alter_column('execution_date', existing_type=dt_type, existing_nullable=True, nullable=False)
-        # TODO: Is this right for non-postgres?
-        if dialect_name == 'mssql':
-            constraints = get_table_constraints(conn, "task_instance")
-            pk, _ = constraints['PRIMARY KEY'].popitem()
-            batch_op.drop_constraint(pk, type_='primary')
-        elif dialect_name not in ('sqlite'):
-            batch_op.drop_constraint('task_instance_pkey', type_='primary')
         batch_op.create_primary_key('task_instance_pkey', ['dag_id', 'task_id', 'run_id'])
-
-        batch_op.drop_index('ti_dag_date')
-        batch_op.drop_index('ti_state_lkp')
-        batch_op.drop_column('execution_date')
         batch_op.create_foreign_key(
             'task_instance_dag_run_fkey',
             'dag_run',
@@ -237,6 +285,15 @@ def upgrade():
         )
         batch_op.create_index('ti_dag_run', ['dag_id', 'run_id'])
         batch_op.create_index('ti_state_lkp', ['dag_id', 'task_id', 'run_id', 'state'])
+        if dialect_name == 'postgresql':
+            batch_op.create_index('ti_dag_state', ['dag_id', 'state'])
+            batch_op.create_index('ti_job_id', ['job_id'])
+            batch_op.create_index('ti_pool', ['pool', 'state', 'priority_weight'])
+            batch_op.create_index('ti_state', ['state'])
+            batch_op.create_foreign_key(
+                'task_instance_trigger_id_fkey', 'trigger', ['trigger_id'], ['id'], ondelete="CASCADE"
+            )
+            batch_op.create_index('ti_trigger_id', ['trigger_id'])
 
     with op.batch_alter_table('task_reschedule', schema=None) as batch_op:
         batch_op.drop_column('execution_date')
diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py
index 561d551..49d6c01 100644
--- a/tests/executors/test_base_executor.py
+++ b/tests/executors/test_base_executor.py
@@ -69,5 +69,5 @@ def test_try_adopt_task_instances(dag_maker):
     dagrun = dag_maker.create_dagrun(execution_date=date)
     tis = dagrun.task_instances
 
-    assert [ti.task_id for ti in tis] == ["task_1", "task_2", "task_3"]
+    assert {ti.task_id for ti in tis} == {"task_1", "task_2", "task_3"}
     assert BaseExecutor().try_adopt_task_instances(tis) == tis
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index b5ac22a..0f431ab 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -53,7 +53,7 @@ class TestClearTasks:
             state=State.RUNNING,
             run_type=DagRunType.SCHEDULED,
         )
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
@@ -117,7 +117,7 @@ class TestClearTasks:
             state=State.RUNNING,
             run_type=DagRunType.SCHEDULED,
         )
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         dr.last_scheduling_decision = DEFAULT_DATE
         ti0.state = TaskInstanceState.SUCCESS
         ti1.state = TaskInstanceState.SUCCESS
@@ -148,7 +148,7 @@ class TestClearTasks:
             run_type=DagRunType.SCHEDULED,
         )
 
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
@@ -187,7 +187,7 @@ class TestClearTasks:
             run_type=DagRunType.SCHEDULED,
        )
 
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
@@ -223,7 +223,7 @@ class TestClearTasks:
             run_type=DagRunType.SCHEDULED,
         )
 
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
         ti0.run()
@@ -263,7 +263,7 @@ class TestClearTasks:
         )
         session = dag_maker.session
 
-        ti0, ti1 = dr.task_instances
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti0.refresh_from_task(task0)
         ti1.refresh_from_task(task1)
 
@@ -380,8 +380,8 @@ class TestClearTasks:
             start_date=DEFAULT_DATE,
             end_date=DEFAULT_DATE + datetime.timedelta(days=10),
         ):
-            op1 = DummyOperator(task_id='bash_op')
-            op2 = DummyOperator(task_id='dummy_op', retries=1)
+            op1 = DummyOperator(task_id='test1')
+            op2 = DummyOperator(task_id='test2', retries=1)
             op1 >> op2
 
         dr = dag_maker.create_dagrun(
@@ -389,7 +389,7 @@ class TestClearTasks:
             run_type=DagRunType.SCHEDULED,
         )
 
-        ti1, ti2 = dr.task_instances
+        ti1, ti2 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
         ti1.task = op1
         ti2.task = op2
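The heart of the change above is swapping the multi-table UPDATE (still used for MySQL, MSSQL, and SQLite via _multi_table_update) for CREATE TABLE ... AS SELECT plus a rename on PostgreSQL, where an in-place UPDATE has to write a new version of every row. A minimal sketch of the same rebuild-and-swap pattern, with illustrative table and column names rather than Airflow's real schema:

    # Minimal sketch of the rebuild-and-swap pattern used by the migration
    # above; 'child', 'parent', and their columns are illustrative. The real
    # migration joins task_instance to dag_run to pick up run_id.
    from alembic import op


    def upgrade():
        # Each row is written exactly once, with no per-row index
        # maintenance and no dead tuples, unlike an in-place UPDATE
        # under PostgreSQL's MVCC.
        op.execute(
            """
            CREATE TABLE new_child AS
            SELECT c.id, p.run_id, c.payload
            FROM child c
            INNER JOIN parent p ON p.id = c.parent_id;
            """
        )
        op.drop_table('child')
        op.rename_table('new_child', 'child')
        # CREATE TABLE AS SELECT carries over none of the old table's
        # constraints, defaults, or indexes; they must be recreated here.

The trade-off is bookkeeping: everything the old table owned (NOT NULL constraints, server defaults, the primary key, foreign keys, and the ti_* indexes) has to be restored by hand, which is exactly what the batch_alter_table blocks in the migration do.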
