ashb commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r513375796



##########
File path: tests/models/test_dagrun.py
##########
@@ -215,6 +218,110 @@ def test_dagrun_success_conditions(self):
         dr.update_state()
         self.assertEqual(State.SUCCESS, dr.state)
 
+    def validate_ti_states(self, dag_run, ti_state_mapping):
+        for task_id, expected_state in ti_state_mapping.items():
+            task_instance = dag_run.get_task_instance(task_id=task_id)
+            self.assertEqual(task_instance.state, expected_state)
+
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_dagrun_fast_follow(self):
+        session = settings.Session()
+
+        dag = DAG(
+            'test_dagrun_fast_follow',
+            start_date=DEFAULT_DATE
+        )
+
+        dag_model = DagModel(
+            dag_id=dag.dag_id,
+            next_dagrun=dag.start_date,
+            is_active=True,
+        )
+        session.add(dag_model)
+        session.flush()
+
+        python_callable = lambda: True
+        with dag:
+            task_a = PythonOperator(task_id='A', 
python_callable=python_callable)
+            task_b = PythonOperator(task_id='B', 
python_callable=python_callable)
+            task_c = PythonOperator(task_id='C', 
python_callable=python_callable)
+            task_a >> task_b
+            task_b >> task_c
+
+        scheduler = SchedulerJob()
+        scheduler.dagbag.bag_dag(dag, root_dag=dag)
+
+        dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow', 
state=State.RUNNING)
+
+        task_instance_a = dag_run.get_task_instance(task_id=task_a.task_id)
+        task_instance_a.task = dag.get_task(task_a.task_id)

Review comment:
       ```suggestion
           task_instance_a.task = task_a
   ```

##########
File path: tests/models/test_dagrun.py
##########
@@ -215,6 +218,110 @@ def test_dagrun_success_conditions(self):
         dr.update_state()
         self.assertEqual(State.SUCCESS, dr.state)
 
+    def validate_ti_states(self, dag_run, ti_state_mapping):
+        for task_id, expected_state in ti_state_mapping.items():
+            task_instance = dag_run.get_task_instance(task_id=task_id)
+            self.assertEqual(task_instance.state, expected_state)
+
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_dagrun_fast_follow(self):
+        session = settings.Session()
+
+        dag = DAG(
+            'test_dagrun_fast_follow',
+            start_date=DEFAULT_DATE
+        )
+
+        dag_model = DagModel(
+            dag_id=dag.dag_id,
+            next_dagrun=dag.start_date,
+            is_active=True,
+        )
+        session.add(dag_model)
+        session.flush()
+
+        python_callable = lambda: True
+        with dag:
+            task_a = PythonOperator(task_id='A', 
python_callable=python_callable)
+            task_b = PythonOperator(task_id='B', 
python_callable=python_callable)
+            task_c = PythonOperator(task_id='C', 
python_callable=python_callable)
+            task_a >> task_b
+            task_b >> task_c
+
+        scheduler = SchedulerJob()
+        scheduler.dagbag.bag_dag(dag, root_dag=dag)
+
+        dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow', 
state=State.RUNNING)
+
+        task_instance_a = dag_run.get_task_instance(task_id=task_a.task_id)
+        task_instance_a.task = dag.get_task(task_a.task_id)
+
+        task_instance_b = dag_run.get_task_instance(task_id=task_b.task_id)
+        task_instance_b.task = dag.get_task(task_b.task_id)

Review comment:
       ```suggestion
           task_instance_b.task = task_b
   ```

##########
File path: tests/models/test_dagrun.py
##########
@@ -215,6 +218,110 @@ def test_dagrun_success_conditions(self):
         dr.update_state()
         self.assertEqual(State.SUCCESS, dr.state)
 
+    def validate_ti_states(self, dag_run, ti_state_mapping):
+        for task_id, expected_state in ti_state_mapping.items():
+            task_instance = dag_run.get_task_instance(task_id=task_id)
+            self.assertEqual(task_instance.state, expected_state)
+
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_dagrun_fast_follow(self):
+        session = settings.Session()
+
+        dag = DAG(
+            'test_dagrun_fast_follow',
+            start_date=DEFAULT_DATE
+        )
+
+        dag_model = DagModel(
+            dag_id=dag.dag_id,
+            next_dagrun=dag.start_date,
+            is_active=True,
+        )
+        session.add(dag_model)
+        session.flush()
+
+        python_callable = lambda: True
+        with dag:
+            task_a = PythonOperator(task_id='A', 
python_callable=python_callable)
+            task_b = PythonOperator(task_id='B', 
python_callable=python_callable)
+            task_c = PythonOperator(task_id='C', 
python_callable=python_callable)
+            task_a >> task_b
+            task_b >> task_c
+
+        scheduler = SchedulerJob()
+        scheduler.dagbag.bag_dag(dag, root_dag=dag)
+
+        dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow', 
state=State.RUNNING)
+
+        task_instance_a = dag_run.get_task_instance(task_id=task_a.task_id)
+        task_instance_a.task = dag.get_task(task_a.task_id)
+
+        task_instance_b = dag_run.get_task_instance(task_id=task_b.task_id)
+        task_instance_b.task = dag.get_task(task_b.task_id)
+
+        schedulable_tis, _ = dag_run.update_state()
+        self.assertEqual(len(schedulable_tis), 1)
+        self.assertEqual(schedulable_tis[0].task_id, task_a.task_id)
+        self.assertEqual(schedulable_tis[0].state, State.NONE)
+
+        dag_run.schedule_tis(schedulable_tis, session)
+        self.validate_ti_states(dag_run, {'A': State.SCHEDULED, 'B': 
State.NONE, 'C': State.NONE})
+
+        scheduler._critical_section_execute_task_instances(session=session)
+        self.validate_ti_states(dag_run, {'A': State.QUEUED, 'B': State.NONE, 
'C': State.NONE})

Review comment:
       (For this to work the TI would need to be attached to the session you 
would need to pass `session=session` to dag_run.get_task_instance)

##########
File path: tests/models/test_dagrun.py
##########
@@ -215,6 +218,110 @@ def test_dagrun_success_conditions(self):
         dr.update_state()
         self.assertEqual(State.SUCCESS, dr.state)
 
+    def validate_ti_states(self, dag_run, ti_state_mapping):
+        for task_id, expected_state in ti_state_mapping.items():
+            task_instance = dag_run.get_task_instance(task_id=task_id)
+            self.assertEqual(task_instance.state, expected_state)
+
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_dagrun_fast_follow(self):
+        session = settings.Session()
+
+        dag = DAG(
+            'test_dagrun_fast_follow',
+            start_date=DEFAULT_DATE
+        )
+
+        dag_model = DagModel(
+            dag_id=dag.dag_id,
+            next_dagrun=dag.start_date,
+            is_active=True,
+        )
+        session.add(dag_model)
+        session.flush()
+
+        python_callable = lambda: True
+        with dag:
+            task_a = PythonOperator(task_id='A', 
python_callable=python_callable)
+            task_b = PythonOperator(task_id='B', 
python_callable=python_callable)
+            task_c = PythonOperator(task_id='C', 
python_callable=python_callable)
+            task_a >> task_b
+            task_b >> task_c
+
+        scheduler = SchedulerJob()
+        scheduler.dagbag.bag_dag(dag, root_dag=dag)
+
+        dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow', 
state=State.RUNNING)
+
+        task_instance_a = dag_run.get_task_instance(task_id=task_a.task_id)
+        task_instance_a.task = dag.get_task(task_a.task_id)
+
+        task_instance_b = dag_run.get_task_instance(task_id=task_b.task_id)
+        task_instance_b.task = dag.get_task(task_b.task_id)
+
+        schedulable_tis, _ = dag_run.update_state()
+        self.assertEqual(len(schedulable_tis), 1)
+        self.assertEqual(schedulable_tis[0].task_id, task_a.task_id)
+        self.assertEqual(schedulable_tis[0].state, State.NONE)
+
+        dag_run.schedule_tis(schedulable_tis, session)
+        self.validate_ti_states(dag_run, {'A': State.SCHEDULED, 'B': 
State.NONE, 'C': State.NONE})
+
+        scheduler._critical_section_execute_task_instances(session=session)
+        self.validate_ti_states(dag_run, {'A': State.QUEUED, 'B': State.NONE, 
'C': State.NONE})

Review comment:
       Do we need this block? I think this test would be clearer if we instead 
just directly set task_instance_a to a runnable state: For example:
   
   
   ```python
           task_instance_a.state = State.QUEUED
           session.commit()
   ```
   
   My reason here is that this is the "pre-condition/setup" for the test, not 
part of what are actually testing here, so by having these asserts and calling 
the scheduler job code we are not testing this feature in isolation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to