ashb commented on a change in pull request #11589:
URL: https://github.com/apache/airflow/pull/11589#discussion_r513375796
##########
File path: tests/models/test_dagrun.py
##########
@@ -215,6 +218,110 @@ def test_dagrun_success_conditions(self):
dr.update_state()
self.assertEqual(State.SUCCESS, dr.state)
+ def validate_ti_states(self, dag_run, ti_state_mapping):
+ for task_id, expected_state in ti_state_mapping.items():
+ task_instance = dag_run.get_task_instance(task_id=task_id)
+ self.assertEqual(task_instance.state, expected_state)
+
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_dagrun_fast_follow(self):
+ session = settings.Session()
+
+ dag = DAG(
+ 'test_dagrun_fast_follow',
+ start_date=DEFAULT_DATE
+ )
+
+ dag_model = DagModel(
+ dag_id=dag.dag_id,
+ next_dagrun=dag.start_date,
+ is_active=True,
+ )
+ session.add(dag_model)
+ session.flush()
+
+ python_callable = lambda: True
+ with dag:
+ task_a = PythonOperator(task_id='A',
python_callable=python_callable)
+ task_b = PythonOperator(task_id='B',
python_callable=python_callable)
+ task_c = PythonOperator(task_id='C',
python_callable=python_callable)
+ task_a >> task_b
+ task_b >> task_c
+
+ scheduler = SchedulerJob()
+ scheduler.dagbag.bag_dag(dag, root_dag=dag)
+
+ dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow',
state=State.RUNNING)
+
+ task_instance_a = dag_run.get_task_instance(task_id=task_a.task_id)
+ task_instance_a.task = dag.get_task(task_a.task_id)
Review comment:
```suggestion
task_instance_a.task = task_a
```
##########
File path: tests/models/test_dagrun.py
##########
@@ -215,6 +218,110 @@ def test_dagrun_success_conditions(self):
dr.update_state()
self.assertEqual(State.SUCCESS, dr.state)
+ def validate_ti_states(self, dag_run, ti_state_mapping):
+ for task_id, expected_state in ti_state_mapping.items():
+ task_instance = dag_run.get_task_instance(task_id=task_id)
+ self.assertEqual(task_instance.state, expected_state)
+
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_dagrun_fast_follow(self):
+ session = settings.Session()
+
+ dag = DAG(
+ 'test_dagrun_fast_follow',
+ start_date=DEFAULT_DATE
+ )
+
+ dag_model = DagModel(
+ dag_id=dag.dag_id,
+ next_dagrun=dag.start_date,
+ is_active=True,
+ )
+ session.add(dag_model)
+ session.flush()
+
+ python_callable = lambda: True
+ with dag:
+ task_a = PythonOperator(task_id='A',
python_callable=python_callable)
+ task_b = PythonOperator(task_id='B',
python_callable=python_callable)
+ task_c = PythonOperator(task_id='C',
python_callable=python_callable)
+ task_a >> task_b
+ task_b >> task_c
+
+ scheduler = SchedulerJob()
+ scheduler.dagbag.bag_dag(dag, root_dag=dag)
+
+ dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow',
state=State.RUNNING)
+
+ task_instance_a = dag_run.get_task_instance(task_id=task_a.task_id)
+ task_instance_a.task = dag.get_task(task_a.task_id)
+
+ task_instance_b = dag_run.get_task_instance(task_id=task_b.task_id)
+ task_instance_b.task = dag.get_task(task_b.task_id)
Review comment:
```suggestion
task_instance_b.task = task_b
```
##########
File path: tests/models/test_dagrun.py
##########
@@ -215,6 +218,110 @@ def test_dagrun_success_conditions(self):
dr.update_state()
self.assertEqual(State.SUCCESS, dr.state)
+ def validate_ti_states(self, dag_run, ti_state_mapping):
+ for task_id, expected_state in ti_state_mapping.items():
+ task_instance = dag_run.get_task_instance(task_id=task_id)
+ self.assertEqual(task_instance.state, expected_state)
+
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_dagrun_fast_follow(self):
+ session = settings.Session()
+
+ dag = DAG(
+ 'test_dagrun_fast_follow',
+ start_date=DEFAULT_DATE
+ )
+
+ dag_model = DagModel(
+ dag_id=dag.dag_id,
+ next_dagrun=dag.start_date,
+ is_active=True,
+ )
+ session.add(dag_model)
+ session.flush()
+
+ python_callable = lambda: True
+ with dag:
+ task_a = PythonOperator(task_id='A',
python_callable=python_callable)
+ task_b = PythonOperator(task_id='B',
python_callable=python_callable)
+ task_c = PythonOperator(task_id='C',
python_callable=python_callable)
+ task_a >> task_b
+ task_b >> task_c
+
+ scheduler = SchedulerJob()
+ scheduler.dagbag.bag_dag(dag, root_dag=dag)
+
+ dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow',
state=State.RUNNING)
+
+ task_instance_a = dag_run.get_task_instance(task_id=task_a.task_id)
+ task_instance_a.task = dag.get_task(task_a.task_id)
+
+ task_instance_b = dag_run.get_task_instance(task_id=task_b.task_id)
+ task_instance_b.task = dag.get_task(task_b.task_id)
+
+ schedulable_tis, _ = dag_run.update_state()
+ self.assertEqual(len(schedulable_tis), 1)
+ self.assertEqual(schedulable_tis[0].task_id, task_a.task_id)
+ self.assertEqual(schedulable_tis[0].state, State.NONE)
+
+ dag_run.schedule_tis(schedulable_tis, session)
+ self.validate_ti_states(dag_run, {'A': State.SCHEDULED, 'B':
State.NONE, 'C': State.NONE})
+
+ scheduler._critical_section_execute_task_instances(session=session)
+ self.validate_ti_states(dag_run, {'A': State.QUEUED, 'B': State.NONE,
'C': State.NONE})
Review comment:
(For this to work, the TI would need to be attached to the session, i.e.
you would need to pass `session=session` to `dag_run.get_task_instance`.)
##########
File path: tests/models/test_dagrun.py
##########
@@ -215,6 +218,110 @@ def test_dagrun_success_conditions(self):
dr.update_state()
self.assertEqual(State.SUCCESS, dr.state)
+ def validate_ti_states(self, dag_run, ti_state_mapping):
+ for task_id, expected_state in ti_state_mapping.items():
+ task_instance = dag_run.get_task_instance(task_id=task_id)
+ self.assertEqual(task_instance.state, expected_state)
+
+ @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+ def test_dagrun_fast_follow(self):
+ session = settings.Session()
+
+ dag = DAG(
+ 'test_dagrun_fast_follow',
+ start_date=DEFAULT_DATE
+ )
+
+ dag_model = DagModel(
+ dag_id=dag.dag_id,
+ next_dagrun=dag.start_date,
+ is_active=True,
+ )
+ session.add(dag_model)
+ session.flush()
+
+ python_callable = lambda: True
+ with dag:
+ task_a = PythonOperator(task_id='A',
python_callable=python_callable)
+ task_b = PythonOperator(task_id='B',
python_callable=python_callable)
+ task_c = PythonOperator(task_id='C',
python_callable=python_callable)
+ task_a >> task_b
+ task_b >> task_c
+
+ scheduler = SchedulerJob()
+ scheduler.dagbag.bag_dag(dag, root_dag=dag)
+
+ dag_run = dag.create_dagrun(run_id='test_dagrun_fast_follow',
state=State.RUNNING)
+
+ task_instance_a = dag_run.get_task_instance(task_id=task_a.task_id)
+ task_instance_a.task = dag.get_task(task_a.task_id)
+
+ task_instance_b = dag_run.get_task_instance(task_id=task_b.task_id)
+ task_instance_b.task = dag.get_task(task_b.task_id)
+
+ schedulable_tis, _ = dag_run.update_state()
+ self.assertEqual(len(schedulable_tis), 1)
+ self.assertEqual(schedulable_tis[0].task_id, task_a.task_id)
+ self.assertEqual(schedulable_tis[0].state, State.NONE)
+
+ dag_run.schedule_tis(schedulable_tis, session)
+ self.validate_ti_states(dag_run, {'A': State.SCHEDULED, 'B':
State.NONE, 'C': State.NONE})
+
+ scheduler._critical_section_execute_task_instances(session=session)
+ self.validate_ti_states(dag_run, {'A': State.QUEUED, 'B': State.NONE,
'C': State.NONE})
Review comment:
Do we need this block? I think this test would be clearer if we instead
just set task_instance_a directly to a runnable state. For example:
```python
task_instance_a.state = State.QUEUED
session.commit()
```
My reason here is that this is the "pre-condition/setup" for the test, not
part of what we are actually testing here, so by having these asserts and calling
the scheduler job code we are not testing this feature in isolation.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]