This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 247382fd0240c371a62748c67bc7a93700af98f0
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu Sep 9 14:24:24 2021 +0100

    Limit the number of queued dagruns created by the Scheduler (#18065)

    Currently there is no limit to the number of queued dagruns the scheduler
    can create, and this has become a concern, with issues raised against it.
    See #18023 and #17979

    Co-authored-by: Sam Wheating <[email protected]>
    (cherry picked from commit 0eb41b5952c2ce1884594c82bbf05835912b9812)
---
 airflow/config_templates/config.yml                |   8 +
 airflow/config_templates/default_airflow.cfg       |   4 +
 airflow/jobs/scheduler_job.py                      |  21 +-
 ...26fe78_add_index_on_state_dag_id_for_queued_.py |  52 +++
 airflow/models/dagrun.py                           |  10 +
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 tests/jobs/test_scheduler_job.py                   | 411 ++++++---------------
 7 files changed, 214 insertions(+), 296 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 9945213..7abcb06 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -191,6 +191,14 @@
       type: string
       example: ~
       default: "16"
+    - name: max_queued_runs_per_dag
+      description: |
+        The maximum number of queued dagruns for a single DAG. The scheduler will not create more DAG runs
+        if it reaches the limit. This is not configurable at the DAG level.
+      version_added: 2.1.4
+      type: string
+      example: ~
+      default: "16"
     - name: load_examples
       description: |
         Whether to load the DAG examples that ship with Airflow. It's good to
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 03d5e1f..56a1d90 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -127,6 +127,10 @@ dags_are_paused_at_creation = True
 # which is defaulted as ``max_active_runs_per_dag``.
 max_active_runs_per_dag = 16

+# The maximum number of queued dagruns for a single DAG. The scheduler will not create more DAG runs
+# if it reaches the limit. This is not configurable at the DAG level.
+max_queued_runs_per_dag = 16
+
 # Whether to load the DAG examples that ship with Airflow. It's good to
 # get started, but you probably want to set this to ``False`` in a production
 # environment
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 8d5f888..45083a4 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -985,14 +985,31 @@ class SchedulerJob(BaseJob):
         existing_dagruns = (
             session.query(DagRun.dag_id, DagRun.execution_date).filter(existing_dagruns_filter).all()
         )
+        max_queued_dagruns = conf.getint('core', 'max_queued_runs_per_dag')
+
+        queued_runs_of_dags = defaultdict(
+            int,
+            session.query(DagRun.dag_id, func.count('*'))
+            .filter(  # We use `list` here because SQLA doesn't accept a set
+                # We use a set to avoid duplicate dag_ids
+                DagRun.dag_id.in_(list({dm.dag_id for dm in dag_models})),
+                DagRun.state == State.QUEUED,
+            )
+            .group_by(DagRun.dag_id)
+            .all(),
+        )

         for dag_model in dag_models:
+            # Let's quickly check if we have exceeded the number of queued dagruns per dag
+            total_queued = queued_runs_of_dags[dag_model.dag_id]
+            if total_queued >= max_queued_dagruns:
+                continue
+
             try:
                 dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
             except SerializedDagNotFound:
                 self.log.exception("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
                 continue
-
             dag_hash = self.dagbag.dags_hash.get(dag.dag_id)
             # Explicitly check if the DagRun already exists. This is an edge case
             # where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
@@ -1003,6 +1020,7 @@ class SchedulerJob(BaseJob):
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
             if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+
                 dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
@@ -1012,6 +1030,7 @@ class SchedulerJob(BaseJob):
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
+                queued_runs_of_dags[dag_model.dag_id] += 1
             dag_model.calculate_dagrun_date_fields(dag, dag_model.next_dagrun)

         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
diff --git a/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py b/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py
new file mode 100644
index 0000000..7326d73
--- /dev/null
+++ b/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add index on state, dag_id for queued dagrun
+
+Revision ID: ccde3e26fe78
+Revises: 092435bf5d12
+Create Date: 2021-09-08 16:35:34.867711
+
+"""
+
+from alembic import op
+from sqlalchemy import text
+
+# revision identifiers, used by Alembic.
+revision = 'ccde3e26fe78'
+down_revision = '092435bf5d12'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add index on state, dag_id for queued dagrun"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.create_index(
+            'idx_dag_run_queued_dags',
+            ["state", "dag_id"],
+            postgres_where=text("state='queued'"),
+            mssql_where=text("state='queued'"),
+            sqlite_where=text("state='queued'"),
+        )
+
+
+def downgrade():
+    """Unapply Add index on state, dag_id for queued dagrun"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.drop_index('idx_dag_run_queued_dags')
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index c27942b..1e5c2c1 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -107,6 +107,16 @@ class DagRun(Base, LoggingMixin):
             mssql_where=text("state='running'"),
             sqlite_where=text("state='running'"),
         ),
+        # since mysql lacks filtered/partial indices, this creates a
+        # duplicate index on mysql. Not the end of the world
+        Index(
+            'idx_dag_run_queued_dags',
+            'state',
+            'dag_id',
+            postgres_where=text("state='queued'"),
+            mssql_where=text("state='queued'"),
+            sqlite_where=text("state='queued'"),
+        ),
     )

     task_instances = relationship(
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index e049603..d43689c 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                             |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``092435bf5d12`` (head)        | ``97cdd93827b8`` | ``2.1.4``       | Add ``max_active_runs`` column to ``dag_model`` table                                  |
+| ``ccde3e26fe78`` (head)        | ``092435bf5d12`` | ``2.1.4``       | Add index on state, dag_id for queued ``dagrun``                                       |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``092435bf5d12``               | ``97cdd93827b8`` | ``2.1.4``       | Add ``max_active_runs`` column to ``dag_model`` table                                  |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``97cdd93827b8``               | ``a13f7613ad25`` | ``2.1.3``       | Add ``queued_at`` column in ``dag_run`` table                                          |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 926a1fe..2364c80 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -799,21 +799,33 @@ class TestSchedulerJob(unittest.TestCase):
         assert 0 == len(self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session))
         session.rollback()

-    def test_tis_for_queued_dagruns_are_not_run(self, dag_maker):
+    def test_tis_for_queued_dagruns_are_not_run(self):
         """
         This tests that tis from queued dagruns are not queued
         """
         dag_id = "test_tis_for_queued_dagruns_are_not_run"
         task_id_1 = 'dummy'
+        session = settings.Session()

-        with dag_maker(dag_id) as dag:
+        with DAG(dag_id=dag_id, start_date=DEFAULT_DATE) as dag:
             task1 = DummyOperator(task_id=task_id_1)
-        dr1 = dag_maker.create_dagrun(state=State.QUEUED)
-        dr2 = dag_maker.create_dagrun(
-            run_id='test2', execution_date=dag.following_schedule(dr1.execution_date)
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
+        dr1 = dag.create_dagrun(run_id='test', run_type=DagRunType.SCHEDULED, state=State.QUEUED)
+        dr2 = dag.create_dagrun(
+            run_id='test2',
+            execution_date=dag.following_schedule(dr1.execution_date),
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
         )
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        session = settings.Session()
+
         ti1 = TaskInstance(task1, dr1.execution_date)
         ti2 = TaskInstance(task1, dr2.execution_date)
         ti1.state = State.SCHEDULED
@@ -1579,6 +1591,41 @@ class TestSchedulerJob(unittest.TestCase):
         self.scheduler_job.executor.end.assert_called_once()
         mock_processor_agent.return_value.end.reset_mock(side_effect=True)

+    def test_theres_limit_to_queued_dagruns_in_a_dag(self):
+        """This tests that there's a limit to the number of queued dagruns the scheduler can create in a dag"""
+        with DAG(dag_id='test_theres_limit_to_queued_dagruns_in_a_dag', start_date=DEFAULT_DATE) as dag:
+            DummyOperator(task_id='mytask')
+
+        session = settings.Session()
+
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor()
+        self.scheduler_job.processor_agent = mock.MagicMock()
+
+        self.scheduler_job.dagbag = dagbag
+
+        session = settings.Session()
+        orm_dag = session.query(DagModel).get(dag.dag_id)
+        assert orm_dag is not None
+        for _ in range(20):
+            self.scheduler_job._create_dag_runs([orm_dag], session)
+        assert session.query(DagRun).count() == 16
+
+        with conf_vars({('core', 'max_queued_runs_per_dag'): '5'}):
+            clear_db_runs()
+            for i in range(20):
+                self.scheduler_job._create_dag_runs([orm_dag], session)
+            assert session.query(DagRun).count() == 5
+
     def test_dagrun_timeout_verify_max_active_runs(self):
         """
         Test if a dagrun will not be scheduled if max_dag_runs
@@ -3729,19 +3776,38 @@ class TestSchedulerJob(unittest.TestCase):
         # Assert that the other one is queued
         assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 1

-    def test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags(self, dag_maker):
-        session = settings.Session()
-        with dag_maker('test_dag1', max_active_runs=1) as dag:
+    def test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags(self):
+
+        with DAG(
+            dag_id='test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags',
+            start_date=DEFAULT_DATE,
+            max_active_runs=1,
+        ) as dag:
             DummyOperator(task_id='mytask')
+
+        session = settings.Session()
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
         date = dag.following_schedule(DEFAULT_DATE)
         for _ in range(30):
             dr = dag.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
         date = timezone.datetime(2020, 1, 1)
-        with dag_maker('test_dag2', start_date=date) as dag2:
+        with DAG(
+            dag_id='test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags_2',
+            start_date=date,
+        ) as dag2:
             DummyOperator(task_id='mytask')
+        dagbag.bag_dag(dag=dag2, root_dag=dag2)
+        dagbag.sync_to_db(session=session)
         for _ in range(10):
             dr = dag2.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
@@ -3766,13 +3832,25 @@ class TestSchedulerJob(unittest.TestCase):
         )
         assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 11

-    def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker):
+    def test_start_queued_dagruns_do_follow_execution_date_order(self):
         session = settings.Session()
-        with dag_maker('test_dag1', max_active_runs=1) as dag:
+        with DAG(
+            dag_id='test_start_queued_dagruns_do_follow_execution_date_order',
+            start_date=DEFAULT_DATE,
+            max_active_runs=1,
+        ) as dag:
             DummyOperator(task_id='mytask')
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
         date = dag.following_schedule(DEFAULT_DATE)
         for i in range(30):
-            dr = dag_maker.create_dagrun(
+            dr = dag.create_dagrun(
                 run_id=f'dagrun_{i}', run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date
             )
             date = dr.execution_date + timedelta(hours=1)
@@ -3782,7 +3860,7 @@ class TestSchedulerJob(unittest.TestCase):

         self.scheduler_job._start_queued_dagruns(session)
         session.flush()
-        dr = DagRun.find(run_id='dagrun_0')
+        dr = DagRun.find(dag_id=dag.dag_id, run_id='dagrun_0')
         ti = dr[0].get_task_instance(task_id='mytask', session=session)
         ti.state = State.SUCCESS
         session.merge(ti)
@@ -3799,7 +3877,7 @@ class TestSchedulerJob(unittest.TestCase):

         assert dr[0].state == State.RUNNING

-    def test_no_dagruns_would_stuck_in_running(self, dag_maker):
+    def test_no_dagruns_would_stuck_in_running(self):
         # Test that running dagruns are not stuck in running.
         # Create one dagrun in 'running' state and 1 in 'queued' state from one dag(max_active_runs=1)
         # Create 16 dagruns in 'running' state and 16 in 'queued' state from another dag
@@ -3810,39 +3888,54 @@ class TestSchedulerJob(unittest.TestCase):
         session = settings.Session()
         # first dag and dagruns
         date = timezone.datetime(2016, 1, 1)
-        with dag_maker('test_dagrun_states_are_correct_1', max_active_runs=1, start_date=date) as dag:
+
+        with DAG(
+            dag_id='test_dagrun_states_are_correct_1',
+            start_date=date,
+            max_active_runs=1,
+        ) as dag:
             task1 = DummyOperator(task_id='dummy_task')

-        dr1_running = dag_maker.create_dagrun(run_id='dr1_run_1', execution_date=date)
-        dag_maker.create_dagrun(
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
+
+        dr1_running = dag.create_dagrun(run_id='dr1_run_1', execution_date=date, state=State.RUNNING)
+        dag.create_dagrun(
             run_id='dr1_run_2',
             state=State.QUEUED,
             execution_date=dag.following_schedule(dr1_running.execution_date),
         )
         # second dag and dagruns
         date = timezone.datetime(2020, 1, 1)
-        with dag_maker('test_dagrun_states_are_correct_2', start_date=date) as dag:
+
+        with DAG(dag_id='test_dagrun_states_are_correct_2', start_date=date) as dag2:
             DummyOperator(task_id='dummy_task')
         for i in range(16):
-            dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.RUNNING, execution_date=date)
+            dr = dag2.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.RUNNING, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
         dr16 = DagRun.find(run_id='dr2_run_16')
         date = dr16[0].execution_date + timedelta(hours=1)
         for i in range(16, 32):
-            dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
+            dr = dag2.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
         # third dag and dagruns
         date = timezone.datetime(2021, 1, 1)
-        with dag_maker('test_dagrun_states_are_correct_3', start_date=date) as dag:
+        with DAG(dag_id='test_dagrun_states_are_correct_3', start_date=date) as dag3:
             DummyOperator(task_id='dummy_task')
         for i in range(16):
-            dr = dag_maker.create_dagrun(run_id=f'dr3_run_{i+1}', state=State.RUNNING, execution_date=date)
+            dr = dag3.create_dagrun(run_id=f'dr3_run_{i+1}', state=State.RUNNING, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
         dr16 = DagRun.find(run_id='dr3_run_16')
         date = dr16[0].execution_date + timedelta(hours=1)
         for i in range(16, 32):
-            dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
+            dr = dag3.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)

         self.scheduler_job = SchedulerJob(subdir=os.devnull)
@@ -3862,276 +3955,6 @@ class TestSchedulerJob(unittest.TestCase):
         assert DagRun.find(run_id='dr1_run_1')[0].state == State.SUCCESS
         assert DagRun.find(run_id='dr1_run_2')[0].state == State.RUNNING

-    @pytest.mark.parametrize(
-        "state, start_date, end_date",
-        [
-            [State.NONE, None, None],
-            [
-                State.UP_FOR_RETRY,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-            [
-                State.UP_FOR_RESCHEDULE,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-        ],
-    )
-    def test_dag_file_processor_process_task_instances(self, state, start_date, end_date, dag_maker):
-        """
-        Test if _process_task_instances puts the right task instances into the
-        mock_list.
-        """
-        with dag_maker(dag_id='test_scheduler_process_execute_task'):
-            BashOperator(task_id='dummy', bash_command='echo hi')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-
-        dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
-        assert dr is not None
-
-        with create_session() as session:
-            ti = dr.get_task_instances(session=session)[0]
-            ti.state = state
-            ti.start_date = start_date
-            ti.end_date = end_date
-
-            self.scheduler_job._schedule_dag_run(dr, session)
-            assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 1
-
-            session.refresh(ti)
-            assert ti.state == State.SCHEDULED
-
-    @pytest.mark.parametrize(
-        "state,start_date,end_date",
-        [
-            [State.NONE, None, None],
-            [
-                State.UP_FOR_RETRY,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-            [
-                State.UP_FOR_RESCHEDULE,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-        ],
-    )
-    def test_dag_file_processor_process_task_instances_with_max_active_tis_per_dag(
-        self, state, start_date, end_date, dag_maker
-    ):
-        """
-        Test if _process_task_instances puts the right task instances into the
-        mock_list.
-        """
-        with dag_maker(dag_id='test_scheduler_process_execute_task_with_max_active_tis_per_dag'):
-            BashOperator(task_id='dummy', max_active_tis_per_dag=2, bash_command='echo Hi')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-
-        dr = dag_maker.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-        )
-        assert dr is not None
-
-        with create_session() as session:
-            ti = dr.get_task_instances(session=session)[0]
-            ti.state = state
-            ti.start_date = start_date
-            ti.end_date = end_date
-
-            self.scheduler_job._schedule_dag_run(dr, session)
-            assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 1
-
-            session.refresh(ti)
-            assert ti.state == State.SCHEDULED
-
-    @pytest.mark.parametrize(
-        "state, start_date, end_date",
-        [
-            [State.NONE, None, None],
-            [
-                State.UP_FOR_RETRY,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-            [
-                State.UP_FOR_RESCHEDULE,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-        ],
-    )
-    def test_dag_file_processor_process_task_instances_depends_on_past(
-        self, state, start_date, end_date, dag_maker
-    ):
-        """
-        Test if _process_task_instances puts the right task instances into the
-        mock_list.
-        """
-        with dag_maker(
-            dag_id='test_scheduler_process_execute_task_depends_on_past',
-            default_args={
-                'depends_on_past': True,
-            },
-        ):
-            BashOperator(task_id='dummy1', bash_command='echo hi')
-            BashOperator(task_id='dummy2', bash_command='echo hi')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-        dr = dag_maker.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-        )
-        assert dr is not None
-
-        with create_session() as session:
-            tis = dr.get_task_instances(session=session)
-            for ti in tis:
-                ti.state = state
-                ti.start_date = start_date
-                ti.end_date = end_date
-
-            self.scheduler_job._schedule_dag_run(dr, session)
-            assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 2
-
-            session.refresh(tis[0])
-            session.refresh(tis[1])
-            assert tis[0].state == State.SCHEDULED
-            assert tis[1].state == State.SCHEDULED
-
-    def test_scheduler_job_add_new_task(self, dag_maker):
-        """
-        Test if a task instance will be added if the dag is updated
-        """
-        with dag_maker(dag_id='test_scheduler_add_new_task') as dag:
-            BashOperator(task_id='dummy', bash_command='echo test')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.dagbag = dag_maker.dagbag
-
-        session = settings.Session()
-        orm_dag = dag_maker.dag_model
-        assert orm_dag is not None
-
-        if self.scheduler_job.processor_agent:
-            self.scheduler_job.processor_agent.end()
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-        dag = self.scheduler_job.dagbag.get_dag('test_scheduler_add_new_task', session=session)
-        self.scheduler_job._create_dag_runs([orm_dag], session)
-
-        drs = DagRun.find(dag_id=dag.dag_id, session=session)
-        assert len(drs) == 1
-        dr = drs[0]
-
-        tis = dr.get_task_instances()
-        assert len(tis) == 1
-
-        BashOperator(task_id='dummy2', dag=dag, bash_command='echo test')
-        SerializedDagModel.write_dag(dag=dag)
-
-        self.scheduler_job._schedule_dag_run(dr, session)
-        assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 2
-        session.flush()
-
-        drs = DagRun.find(dag_id=dag.dag_id, session=session)
-        assert len(drs) == 1
-        dr = drs[0]
-
-        tis = dr.get_task_instances()
-        assert len(tis) == 2
-
-    def test_runs_respected_after_clear(self, dag_maker):
-        """
-        Test dag after dag.clear, max_active_runs is respected
-        """
-        with dag_maker(
-            dag_id='test_scheduler_max_active_runs_respected_after_clear', max_active_runs=1
-        ) as dag:
-            BashOperator(task_id='dummy', bash_command='echo Hi')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-
-        session = settings.Session()
-        date = DEFAULT_DATE
-        dag_maker.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            state=State.QUEUED,
-        )
-        date = dag.following_schedule(date)
-        dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            execution_date=date,
-            state=State.QUEUED,
-        )
-        date = dag.following_schedule(date)
-        dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            execution_date=date,
-            state=State.QUEUED,
-        )
-        dag.clear()
-
-        assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 3
-
-        session = settings.Session()
-        self.scheduler_job._start_queued_dagruns(session)
-        session.flush()
-        # Assert that only 1 dagrun is active
-        assert len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)) == 1
-        # Assert that the other two are queued
-        assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 2
-
-    def test_timeout_triggers(self, dag_maker):
-        """
-        Tests that tasks in the deferred state, but whose trigger timeout
-        has expired, are correctly failed.
-
-        """
-
-        session = settings.Session()
-        # Create the test DAG and task
-        with dag_maker(
-            dag_id='test_timeout_triggers',
-            start_date=DEFAULT_DATE,
-            schedule_interval='@once',
-            max_active_runs=1,
-            session=session,
-        ):
-            DummyOperator(task_id='dummy1')
-
-        # Create a Task Instance for the task that is allegedly deferred
-        # but past its timeout, and one that is still good.
-        # We don't actually need a linked trigger here; the code doesn't check.
-        dr1 = dag_maker.create_dagrun()
-        dr2 = dag_maker.create_dagrun(
-            run_id="test2", execution_date=DEFAULT_DATE + datetime.timedelta(seconds=1)
-        )
-        ti1 = dr1.get_task_instance('dummy1', session)
-        ti2 = dr2.get_task_instance('dummy1', session)
-        ti1.state = State.DEFERRED
-        ti1.trigger_timeout = timezone.utcnow() - datetime.timedelta(seconds=60)
-        ti2.state = State.DEFERRED
-        ti2.trigger_timeout = timezone.utcnow() + datetime.timedelta(seconds=60)
-        session.flush()
-
-        # Boot up the scheduler and make it check timeouts
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.check_trigger_timeouts(session=session)
-
-        # Make sure that TI1 is now scheduled to fail, and 2 wasn't touched
-        session.refresh(ti1)
-        session.refresh(ti2)
-        assert ti1.state == State.SCHEDULED
-        assert ti1.next_method == "__fail__"
-        assert ti2.state == State.DEFERRED
-
 @pytest.mark.xfail(reason="Work out where this goes")
 def test_task_with_upstream_skip_process_task_instances():
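
For readers who want the core pattern of this patch outside the Airflow codebase: the
scheduler counts queued runs per dag with one grouped query, then skips run creation
for any dag already at the cap. Below is a minimal, self-contained sketch of that
pattern in plain SQLAlchemy; the Run model, table name, and MAX_QUEUED constant are
illustrative stand-ins (MAX_QUEUED plays the role of [core] max_queued_runs_per_dag),
not Airflow code.

from collections import defaultdict

from sqlalchemy import Column, Integer, String, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Run(Base):
    """Illustrative stand-in for a dag-run row: (dag_id, state)."""

    __tablename__ = 'run'
    id = Column(Integer, primary_key=True)
    dag_id = Column(String, nullable=False)
    state = Column(String, nullable=False)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

MAX_QUEUED = 2  # stand-in for conf.getint('core', 'max_queued_runs_per_dag')

with Session(engine) as session:
    # Seed some queued runs: dag 'a' is already at the cap, 'b' has one, 'c' none.
    session.add_all(
        [Run(dag_id='a', state='queued'), Run(dag_id='a', state='queued'), Run(dag_id='b', state='queued')]
    )
    session.commit()

    dag_ids = ['a', 'b', 'c']
    # One grouped COUNT for all dags of interest, instead of one query per dag;
    # defaultdict(int) makes dags with no queued rows read as zero.
    queued = defaultdict(
        int,
        session.query(Run.dag_id, func.count('*'))
        .filter(Run.dag_id.in_(dag_ids), Run.state == 'queued')
        .group_by(Run.dag_id)
        .all(),
    )
    for dag_id in dag_ids:
        if queued[dag_id] >= MAX_QUEUED:
            continue  # at the cap: create no more queued runs for this dag
        session.add(Run(dag_id=dag_id, state='queued'))
        queued[dag_id] += 1  # keep the in-memory tally consistent within the loop
    session.commit()

    # 'a' stays at 2 (capped); 'b' grows to 2; 'c' gets its first queued run.
    assert dict(queued) == {'a': 2, 'b': 2, 'c': 1}

The in-loop increment mirrors the patch: without it, a single scheduling pass could
create more runs for one dag than the cap allows before the next COUNT query runs.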

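The migration above pairs the limit with a filtered (partial) index so the scheduler's
per-loop COUNT only ever scans queued rows. Here is a sketch of the same idea in plain
SQLAlchemy; the model and index names are illustrative, not Airflow's. One hedge worth
noting: stock SQLAlchemy documents the PostgreSQL keyword as postgresql_where, and
backends without partial-index support (MySQL, as the commit's "duplicate index on
mysql" comment says) simply get an ordinary composite index.

from sqlalchemy import Column, Index, Integer, String, create_engine, text
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class RunSketch(Base):
    """Illustrative stand-in for the dag_run table."""

    __tablename__ = 'run_sketch'
    id = Column(Integer, primary_key=True)
    dag_id = Column(String, nullable=False)
    state = Column(String, nullable=False)


# Index only rows with state='queued': the grouped COUNT filters on exactly
# (state, dag_id), so the index stays small and the lookup stays cheap.
Index(
    'idx_run_sketch_queued',
    RunSketch.state,
    RunSketch.dag_id,
    postgresql_where=text("state='queued'"),  # partial index on PostgreSQL
    sqlite_where=text("state='queued'"),  # partial index on SQLite
)

# Emitting the DDL against SQLite shows the WHERE clause on the index.
Base.metadata.create_all(create_engine('sqlite://', echo=True))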