This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 247382fd0240c371a62748c67bc7a93700af98f0
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu Sep 9 14:24:24 2021 +0100

    Limit the number of queued dagruns created by the Scheduler (#18065)
    
    There's currently no limit to the number of queued dagruns the scheduler
    can create, and this has become a concern. See #18023 and #17979
    
    Co-authored-by: Sam Wheating <[email protected]>
    (cherry picked from commit 0eb41b5952c2ce1884594c82bbf05835912b9812)
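
For context, the behaviour being fixed is observable by counting a DAG's
queued DagRun rows: a DAG that cannot start new runs (for example, one
already at max_active_runs) kept accumulating queued runs without bound.
A minimal inspection sketch, assuming an initialized Airflow metadata
database and a hypothetical dag_id 'my_dag':

    from airflow import settings
    from airflow.models import DagRun
    from airflow.utils.state import State

    session = settings.Session()
    # Count the queued runs for one DAG; before this commit, this number
    # could grow without bound while the DAG was blocked from starting them.
    n_queued = (
        session.query(DagRun)
        .filter(DagRun.dag_id == 'my_dag', DagRun.state == State.QUEUED)
        .count()
    )
    print(n_queued)
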
---
 airflow/config_templates/config.yml                |   8 +
 airflow/config_templates/default_airflow.cfg       |   4 +
 airflow/jobs/scheduler_job.py                      |  21 +-
 ...26fe78_add_index_on_state_dag_id_for_queued_.py |  52 +++
 airflow/models/dagrun.py                           |  10 +
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 tests/jobs/test_scheduler_job.py                   | 411 ++++++---------------
 7 files changed, 214 insertions(+), 296 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 9945213..7abcb06 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -191,6 +191,14 @@
       type: string
       example: ~
       default: "16"
+    - name: max_queued_runs_per_dag
+      description: |
+        The maximum number of queued dagruns for a single DAG. The scheduler will not create more DAG runs
+        if it reaches the limit. This is not configurable at the DAG level.
+      version_added: 2.1.4
+      type: string
+      example: ~
+      default: "16"
     - name: load_examples
       description: |
         Whether to load the DAG examples that ship with Airflow. It's good to
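
As a usage note: the new option lives under [core] and, like any Airflow
option, can be overridden with the standard AIRFLOW__{SECTION}__{KEY}
environment variable. A minimal sketch (the value 32 is arbitrary):

    import os

    # Overrides [core] max_queued_runs_per_dag without editing airflow.cfg;
    # environment variables take precedence over the config file.
    os.environ['AIRFLOW__CORE__MAX_QUEUED_RUNS_PER_DAG'] = '32'

    from airflow.configuration import conf

    # The same lookup the scheduler performs in this commit.
    assert conf.getint('core', 'max_queued_runs_per_dag') == 32
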
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 03d5e1f..56a1d90 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -127,6 +127,10 @@ dags_are_paused_at_creation = True
 # which is defaulted as ``max_active_runs_per_dag``.
 max_active_runs_per_dag = 16
 
+# The maximum number of queued dagruns for a single DAG. The scheduler will not create more DAG runs
+# if it reaches the limit. This is not configurable at the DAG level.
+max_queued_runs_per_dag = 16
+
 # Whether to load the DAG examples that ship with Airflow. It's good to
 # get started, but you probably want to set this to ``False`` in a production
 # environment
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 8d5f888..45083a4 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -985,14 +985,31 @@ class SchedulerJob(BaseJob):
         existing_dagruns = (
             session.query(DagRun.dag_id, DagRun.execution_date).filter(existing_dagruns_filter).all()
         )
+        max_queued_dagruns = conf.getint('core', 'max_queued_runs_per_dag')
+
+        queued_runs_of_dags = defaultdict(
+            int,
+            session.query(DagRun.dag_id, func.count('*'))
+            .filter(  # We use `list` here because SQLA doesn't accept a set
+                # We use set to avoid duplicate dag_ids
+                DagRun.dag_id.in_(list({dm.dag_id for dm in dag_models})),
+                DagRun.state == State.QUEUED,
+            )
+            .group_by(DagRun.dag_id)
+            .all(),
+        )
 
         for dag_model in dag_models:
+            # Let's quickly check if we have exceeded the number of queued dagruns per dag
+            total_queued = queued_runs_of_dags[dag_model.dag_id]
+            if total_queued >= max_queued_dagruns:
+                continue
+
             try:
                 dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
             except SerializedDagNotFound:
                 self.log.exception("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
                 continue
-
             dag_hash = self.dagbag.dags_hash.get(dag.dag_id)
             # Explicitly check if the DagRun already exists. This is an edge case
             # where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
@@ -1003,6 +1020,7 @@ class SchedulerJob(BaseJob):
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
             if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+
                 dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
@@ -1012,6 +1030,7 @@ class SchedulerJob(BaseJob):
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
+                queued_runs_of_dags[dag_model.dag_id] += 1
             dag_model.calculate_dagrun_date_fields(dag, dag_model.next_dagrun)
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
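
The hunk above is a count-then-skip pattern: one GROUP BY query fetches a
COUNT(*) of queued runs per dag_id, the defaultdict makes DAGs absent from
the result read as zero, and the counter is incremented as runs are created
so the cap also holds within a single scheduler loop. A self-contained
sketch of the same pattern in plain Python (all names invented for
illustration):

    from collections import defaultdict

    def create_runs(dag_ids, queued_counts, limit, create_run):
        # queued_counts mimics the (dag_id, COUNT(*)) rows returned by the
        # GROUP BY query; dag_ids missing from it default to 0 queued runs.
        queued = defaultdict(int, queued_counts)
        for dag_id in dag_ids:
            if queued[dag_id] >= limit:
                continue  # already at the cap, skip this DAG
            create_run(dag_id)
            queued[dag_id] += 1  # keep the in-memory count current

    created = []
    create_runs(['a', 'a', 'b'], [('a', 15)], 16, created.append)
    assert created == ['a', 'b']  # 'a' hits the cap after one more run
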
diff --git a/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py b/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py
new file mode 100644
index 0000000..7326d73
--- /dev/null
+++ b/airflow/migrations/versions/ccde3e26fe78_add_index_on_state_dag_id_for_queued_.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add index on state, dag_id for queued dagrun
+
+Revision ID: ccde3e26fe78
+Revises: 092435bf5d12
+Create Date: 2021-09-08 16:35:34.867711
+
+"""
+
+from alembic import op
+from sqlalchemy import text
+
+# revision identifiers, used by Alembic.
+revision = 'ccde3e26fe78'
+down_revision = '092435bf5d12'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Add index on state, dag_id for queued dagrun"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.create_index(
+            'idx_dag_run_queued_dags',
+            ["state", "dag_id"],
+            postgresql_where=text("state='queued'"),
+            mssql_where=text("state='queued'"),
+            sqlite_where=text("state='queued'"),
+        )
+
+
+def downgrade():
+    """Unapply Add index on state, dag_id for queued dagrun"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.drop_index('idx_dag_run_queued_dags')
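
The migration creates a partial (filtered) index covering only rows with
state='queued', which matches the shape of the scheduler's new per-DAG
count query; dialects without partial indexes (for example MySQL) ignore
the *_where arguments and build a plain composite index instead. A
standalone sketch of the equivalent index outside Alembic, against a
pared-down stand-in for the dag_run table (on PostgreSQL this compiles to
CREATE INDEX idx_dag_run_queued_dags ON dag_run (state, dag_id) WHERE
state='queued'):

    from sqlalchemy import (
        Column, Index, Integer, MetaData, String, Table, text,
    )

    metadata = MetaData()
    # Pared-down stand-in for the real dag_run table.
    dag_run = Table(
        'dag_run', metadata,
        Column('id', Integer, primary_key=True),
        Column('dag_id', String(250)),
        Column('state', String(50)),
    )
    Index(
        'idx_dag_run_queued_dags',
        dag_run.c.state,
        dag_run.c.dag_id,
        postgresql_where=text("state='queued'"),
        mssql_where=text("state='queued'"),
        sqlite_where=text("state='queued'"),
    )

As usual, the migration itself is applied with "airflow db upgrade".
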
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index c27942b..1e5c2c1 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -107,6 +107,16 @@ class DagRun(Base, LoggingMixin):
             mssql_where=text("state='running'"),
             sqlite_where=text("state='running'"),
         ),
+        # since mysql lacks filtered/partial indices, this creates a
+        # duplicate index on mysql. Not the end of the world
+        Index(
+            'idx_dag_run_queued_dags',
+            'state',
+            'dag_id',
+            postgresql_where=text("state='queued'"),
+            mssql_where=text("state='queued'"),
+            sqlite_where=text("state='queued'"),
+        ),
     )
 
     task_instances = relationship(
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index e049603..d43689c 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed when you run ``airflow db upgrade``.
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                             |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``092435bf5d12`` (head)        | ``97cdd93827b8`` | ``2.1.4``       | Add ``max_active_runs`` column to ``dag_model`` table                                   |
+| ``ccde3e26fe78`` (head)        | ``092435bf5d12`` | ``2.1.4``       | Add index on state, dag_id for queued ``dagrun``                                        |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``092435bf5d12``               | ``97cdd93827b8`` | ``2.1.4``       | Add ``max_active_runs`` column to ``dag_model`` table                                   |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``97cdd93827b8``               | ``a13f7613ad25`` | ``2.1.3``       | Add ``queued_at`` column in ``dag_run`` table                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 926a1fe..2364c80 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -799,21 +799,33 @@ class TestSchedulerJob(unittest.TestCase):
         assert 0 == len(self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session))
         session.rollback()
 
-    def test_tis_for_queued_dagruns_are_not_run(self, dag_maker):
+    def test_tis_for_queued_dagruns_are_not_run(self):
         """
         This tests that tis from queued dagruns are not queued
         """
         dag_id = "test_tis_for_queued_dagruns_are_not_run"
         task_id_1 = 'dummy'
+        session = settings.Session()
 
-        with dag_maker(dag_id) as dag:
+        with DAG(dag_id=dag_id, start_date=DEFAULT_DATE) as dag:
             task1 = DummyOperator(task_id=task_id_1)
-        dr1 = dag_maker.create_dagrun(state=State.QUEUED)
-        dr2 = dag_maker.create_dagrun(
-            run_id='test2', execution_date=dag.following_schedule(dr1.execution_date)
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
+        dr1 = dag.create_dagrun(run_id='test', run_type=DagRunType.SCHEDULED, state=State.QUEUED)
+        dr2 = dag.create_dagrun(
+            run_id='test2',
+            execution_date=dag.following_schedule(dr1.execution_date),
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
         )
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        session = settings.Session()
+
         ti1 = TaskInstance(task1, dr1.execution_date)
         ti2 = TaskInstance(task1, dr2.execution_date)
         ti1.state = State.SCHEDULED
@@ -1579,6 +1591,41 @@ class TestSchedulerJob(unittest.TestCase):
         self.scheduler_job.executor.end.assert_called_once()
         mock_processor_agent.return_value.end.reset_mock(side_effect=True)
 
+    def test_theres_limit_to_queued_dagruns_in_a_dag(self):
+        """This tests that there's a limit to the number of queued dagruns the scheduler can create in a dag"""
+        with DAG(dag_id='test_theres_limit_to_queued_dagruns_in_a_dag', start_date=DEFAULT_DATE) as dag:
+            DummyOperator(task_id='mytask')
+
+        session = settings.Session()
+
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor()
+        self.scheduler_job.processor_agent = mock.MagicMock()
+
+        self.scheduler_job.dagbag = dagbag
+
+        session = settings.Session()
+        orm_dag = session.query(DagModel).get(dag.dag_id)
+        assert orm_dag is not None
+        for _ in range(20):
+            self.scheduler_job._create_dag_runs([orm_dag], session)
+        assert session.query(DagRun).count() == 16
+
+        with conf_vars({('core', 'max_queued_runs_per_dag'): '5'}):
+            clear_db_runs()
+            for i in range(20):
+                self.scheduler_job._create_dag_runs([orm_dag], session)
+        assert session.query(DagRun).count() == 5
+
     def test_dagrun_timeout_verify_max_active_runs(self):
         """
         Test if a dagrun will not be scheduled if max_dag_runs
@@ -3729,19 +3776,38 @@ class TestSchedulerJob(unittest.TestCase):
         # Assert that the other one is queued
         assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 1
 
-    def test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags(self, dag_maker):
-        session = settings.Session()
-        with dag_maker('test_dag1', max_active_runs=1) as dag:
+    def test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags(self):
+
+        with DAG(
+            dag_id='test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags',
+            start_date=DEFAULT_DATE,
+            max_active_runs=1,
+        ) as dag:
             DummyOperator(task_id='mytask')
+
+        session = settings.Session()
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
         date = dag.following_schedule(DEFAULT_DATE)
         for _ in range(30):
             dr = dag.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
 
         date = timezone.datetime(2020, 1, 1)
-        with dag_maker('test_dag2', start_date=date) as dag2:
+        with DAG(
+            dag_id='test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags_2',
+            start_date=date,
+        ) as dag2:
             DummyOperator(task_id='mytask')
 
+        dagbag.bag_dag(dag=dag2, root_dag=dag2)
+        dagbag.sync_to_db(session=session)
         for _ in range(10):
             dr = dag2.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
@@ -3766,13 +3832,25 @@ class TestSchedulerJob(unittest.TestCase):
         )
         assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 11
 
-    def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker):
+    def test_start_queued_dagruns_do_follow_execution_date_order(self):
         session = settings.Session()
-        with dag_maker('test_dag1', max_active_runs=1) as dag:
+        with DAG(
+            dag_id='test_start_queued_dagruns_do_follow_execution_date_order',
+            start_date=DEFAULT_DATE,
+            max_active_runs=1,
+        ) as dag:
             DummyOperator(task_id='mytask')
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
         date = dag.following_schedule(DEFAULT_DATE)
         for i in range(30):
-            dr = dag_maker.create_dagrun(
+            dr = dag.create_dagrun(
                 run_id=f'dagrun_{i}', run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date
             )
             date = dr.execution_date + timedelta(hours=1)
@@ -3782,7 +3860,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         self.scheduler_job._start_queued_dagruns(session)
         session.flush()
-        dr = DagRun.find(run_id='dagrun_0')
+        dr = DagRun.find(dag_id=dag.dag_id, run_id='dagrun_0')
         ti = dr[0].get_task_instance(task_id='mytask', session=session)
         ti.state = State.SUCCESS
         session.merge(ti)
@@ -3799,7 +3877,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         assert dr[0].state == State.RUNNING
 
-    def test_no_dagruns_would_stuck_in_running(self, dag_maker):
+    def test_no_dagruns_would_stuck_in_running(self):
         # Test that running dagruns are not stuck in running.
         # Create one dagrun in 'running' state and 1 in 'queued' state from one dag(max_active_runs=1)
         # Create 16 dagruns in 'running' state and 16 in 'queued' state from another dag
@@ -3810,39 +3888,54 @@ class TestSchedulerJob(unittest.TestCase):
         session = settings.Session()
         # first dag and dagruns
         date = timezone.datetime(2016, 1, 1)
-        with dag_maker('test_dagrun_states_are_correct_1', max_active_runs=1, start_date=date) as dag:
+
+        with DAG(
+            dag_id='test_dagrun_states_are_correct_1',
+            start_date=date,
+            max_active_runs=1,
+        ) as dag:
             task1 = DummyOperator(task_id='dummy_task')
 
-        dr1_running = dag_maker.create_dagrun(run_id='dr1_run_1', execution_date=date)
-        dag_maker.create_dagrun(
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        dagbag.sync_to_db(session=session)
+
+        dr1_running = dag.create_dagrun(run_id='dr1_run_1', execution_date=date, state=State.RUNNING)
+        dag.create_dagrun(
             run_id='dr1_run_2',
             state=State.QUEUED,
             execution_date=dag.following_schedule(dr1_running.execution_date),
         )
         # second dag and dagruns
         date = timezone.datetime(2020, 1, 1)
-        with dag_maker('test_dagrun_states_are_correct_2', start_date=date) as dag:
+
+        with DAG(dag_id='test_dagrun_states_are_correct_2', start_date=date) as dag2:
             DummyOperator(task_id='dummy_task')
         for i in range(16):
-            dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.RUNNING, execution_date=date)
+            dr = dag2.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.RUNNING, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
         dr16 = DagRun.find(run_id='dr2_run_16')
         date = dr16[0].execution_date + timedelta(hours=1)
         for i in range(16, 32):
-            dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
+            dr = dag2.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
 
         # third dag and dagruns
         date = timezone.datetime(2021, 1, 1)
-        with dag_maker('test_dagrun_states_are_correct_3', start_date=date) as dag:
+        with DAG(dag_id='test_dagrun_states_are_correct_3', start_date=date) as dag3:
             DummyOperator(task_id='dummy_task')
         for i in range(16):
-            dr = dag_maker.create_dagrun(run_id=f'dr3_run_{i+1}', state=State.RUNNING, execution_date=date)
+            dr = dag3.create_dagrun(run_id=f'dr3_run_{i+1}', state=State.RUNNING, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
         dr16 = DagRun.find(run_id='dr3_run_16')
         date = dr16[0].execution_date + timedelta(hours=1)
         for i in range(16, 32):
-            dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
+            dr = dag3.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date)
             date = dr.execution_date + timedelta(hours=1)
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
@@ -3862,276 +3955,6 @@ class TestSchedulerJob(unittest.TestCase):
         assert DagRun.find(run_id='dr1_run_1')[0].state == State.SUCCESS
         assert DagRun.find(run_id='dr1_run_2')[0].state == State.RUNNING
 
-    @pytest.mark.parametrize(
-        "state, start_date, end_date",
-        [
-            [State.NONE, None, None],
-            [
-                State.UP_FOR_RETRY,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-            [
-                State.UP_FOR_RESCHEDULE,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-        ],
-    )
-    def test_dag_file_processor_process_task_instances(self, state, start_date, end_date, dag_maker):
-        """
-        Test if _process_task_instances puts the right task instances into the
-        mock_list.
-        """
-        with dag_maker(dag_id='test_scheduler_process_execute_task'):
-            BashOperator(task_id='dummy', bash_command='echo hi')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-
-        dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
-        assert dr is not None
-
-        with create_session() as session:
-            ti = dr.get_task_instances(session=session)[0]
-            ti.state = state
-            ti.start_date = start_date
-            ti.end_date = end_date
-
-            self.scheduler_job._schedule_dag_run(dr, session)
-            assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 1
-
-            session.refresh(ti)
-            assert ti.state == State.SCHEDULED
-
-    @pytest.mark.parametrize(
-        "state,start_date,end_date",
-        [
-            [State.NONE, None, None],
-            [
-                State.UP_FOR_RETRY,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-            [
-                State.UP_FOR_RESCHEDULE,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-        ],
-    )
-    def test_dag_file_processor_process_task_instances_with_max_active_tis_per_dag(
-        self, state, start_date, end_date, dag_maker
-    ):
-        """
-        Test if _process_task_instances puts the right task instances into the
-        mock_list.
-        """
-        with dag_maker(dag_id='test_scheduler_process_execute_task_with_max_active_tis_per_dag'):
-            BashOperator(task_id='dummy', max_active_tis_per_dag=2, bash_command='echo Hi')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-
-        dr = dag_maker.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-        )
-        assert dr is not None
-
-        with create_session() as session:
-            ti = dr.get_task_instances(session=session)[0]
-            ti.state = state
-            ti.start_date = start_date
-            ti.end_date = end_date
-
-            self.scheduler_job._schedule_dag_run(dr, session)
-            assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 1
-
-            session.refresh(ti)
-            assert ti.state == State.SCHEDULED
-
-    @pytest.mark.parametrize(
-        "state, start_date, end_date",
-        [
-            [State.NONE, None, None],
-            [
-                State.UP_FOR_RETRY,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-            [
-                State.UP_FOR_RESCHEDULE,
-                timezone.utcnow() - datetime.timedelta(minutes=30),
-                timezone.utcnow() - datetime.timedelta(minutes=15),
-            ],
-        ],
-    )
-    def test_dag_file_processor_process_task_instances_depends_on_past(
-        self, state, start_date, end_date, dag_maker
-    ):
-        """
-        Test if _process_task_instances puts the right task instances into the
-        mock_list.
-        """
-        with dag_maker(
-            dag_id='test_scheduler_process_execute_task_depends_on_past',
-            default_args={
-                'depends_on_past': True,
-            },
-        ):
-            BashOperator(task_id='dummy1', bash_command='echo hi')
-            BashOperator(task_id='dummy2', bash_command='echo hi')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-        dr = dag_maker.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-        )
-        assert dr is not None
-
-        with create_session() as session:
-            tis = dr.get_task_instances(session=session)
-            for ti in tis:
-                ti.state = state
-                ti.start_date = start_date
-                ti.end_date = end_date
-
-            self.scheduler_job._schedule_dag_run(dr, session)
-            assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 2
-
-            session.refresh(tis[0])
-            session.refresh(tis[1])
-            assert tis[0].state == State.SCHEDULED
-            assert tis[1].state == State.SCHEDULED
-
-    def test_scheduler_job_add_new_task(self, dag_maker):
-        """
-        Test if a task instance will be added if the dag is updated
-        """
-        with dag_maker(dag_id='test_scheduler_add_new_task') as dag:
-            BashOperator(task_id='dummy', bash_command='echo test')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.dagbag = dag_maker.dagbag
-
-        session = settings.Session()
-        orm_dag = dag_maker.dag_model
-        assert orm_dag is not None
-
-        if self.scheduler_job.processor_agent:
-            self.scheduler_job.processor_agent.end()
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-        dag = self.scheduler_job.dagbag.get_dag('test_scheduler_add_new_task', session=session)
-        self.scheduler_job._create_dag_runs([orm_dag], session)
-
-        drs = DagRun.find(dag_id=dag.dag_id, session=session)
-        assert len(drs) == 1
-        dr = drs[0]
-
-        tis = dr.get_task_instances()
-        assert len(tis) == 1
-
-        BashOperator(task_id='dummy2', dag=dag, bash_command='echo test')
-        SerializedDagModel.write_dag(dag=dag)
-
-        self.scheduler_job._schedule_dag_run(dr, session)
-        assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 2
-        session.flush()
-
-        drs = DagRun.find(dag_id=dag.dag_id, session=session)
-        assert len(drs) == 1
-        dr = drs[0]
-
-        tis = dr.get_task_instances()
-        assert len(tis) == 2
-
-    def test_runs_respected_after_clear(self, dag_maker):
-        """
-        Test dag after dag.clear, max_active_runs is respected
-        """
-        with dag_maker(
-            dag_id='test_scheduler_max_active_runs_respected_after_clear', max_active_runs=1
-        ) as dag:
-            BashOperator(task_id='dummy', bash_command='echo Hi')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-
-        session = settings.Session()
-        date = DEFAULT_DATE
-        dag_maker.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            state=State.QUEUED,
-        )
-        date = dag.following_schedule(date)
-        dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            execution_date=date,
-            state=State.QUEUED,
-        )
-        date = dag.following_schedule(date)
-        dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            execution_date=date,
-            state=State.QUEUED,
-        )
-        dag.clear()
-
-        assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 3
-
-        session = settings.Session()
-        self.scheduler_job._start_queued_dagruns(session)
-        session.flush()
-        # Assert that only 1 dagrun is active
-        assert len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)) == 1
-        # Assert that the other two are queued
-        assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 2
-
-    def test_timeout_triggers(self, dag_maker):
-        """
-        Tests that tasks in the deferred state, but whose trigger timeout
-        has expired, are correctly failed.
-
-        """
-
-        session = settings.Session()
-        # Create the test DAG and task
-        with dag_maker(
-            dag_id='test_timeout_triggers',
-            start_date=DEFAULT_DATE,
-            schedule_interval='@once',
-            max_active_runs=1,
-            session=session,
-        ):
-            DummyOperator(task_id='dummy1')
-
-        # Create a Task Instance for the task that is allegedly deferred
-        # but past its timeout, and one that is still good.
-        # We don't actually need a linked trigger here; the code doesn't check.
-        dr1 = dag_maker.create_dagrun()
-        dr2 = dag_maker.create_dagrun(
-            run_id="test2", execution_date=DEFAULT_DATE + datetime.timedelta(seconds=1)
-        )
-        ti1 = dr1.get_task_instance('dummy1', session)
-        ti2 = dr2.get_task_instance('dummy1', session)
-        ti1.state = State.DEFERRED
-        ti1.trigger_timeout = timezone.utcnow() - datetime.timedelta(seconds=60)
-        ti2.state = State.DEFERRED
-        ti2.trigger_timeout = timezone.utcnow() + datetime.timedelta(seconds=60)
-        session.flush()
-
-        # Boot up the scheduler and make it check timeouts
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.check_trigger_timeouts(session=session)
-
-        # Make sure that TI1 is now scheduled to fail, and 2 wasn't touched
-        session.refresh(ti1)
-        session.refresh(ti2)
-        assert ti1.state == State.SCHEDULED
-        assert ti1.next_method == "__fail__"
-        assert ti2.state == State.DEFERRED
-
 
 @pytest.mark.xfail(reason="Work out where this goes")
 def test_task_with_upstream_skip_process_task_instances():
