This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 18fef9b Fix mini scheduler not respecting wait_for_downstream (#18310) 18fef9b is described below commit 18fef9bb13a2cbd8cc09484082648aec78610d52 Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Fri Sep 17 15:07:18 2021 +0100 Fix mini scheduler not respecting wait_for_downstream (#18310) When wait_for_downstream is set on a task, the mini scheduler doesn't respect it and goes ahead and schedules task instances that are not yet runnable. This PR fixes it by checking the dependency in the mini scheduler. Co-authored-by: Kaxil Naik <kaxiln...@gmail.com> --- airflow/jobs/local_task_job.py | 12 ++++++++++++ tests/jobs/test_local_task_job.py | 41 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py index 6b53bd9..ed7c723 100644 --- a/airflow/jobs/local_task_job.py +++ b/airflow/jobs/local_task_job.py @@ -226,6 +226,18 @@ class LocalTaskJob(BaseJob): @Sentry.enrich_errors def _run_mini_scheduler_on_child_tasks(self, session=None) -> None: try: + + if ( + self.task_instance.task.wait_for_downstream + and self.task_instance.get_previous_ti() + and not self.task_instance.are_dependents_done() + ): + self.log.info( + "No downstream tasks scheduled because task instance " + "dependents have not completed yet and wait_for_downstream is true" + ) + return + # Re-select the row with a lock dag_run = with_row_locks( session.query(DagRun).filter_by( diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index 75124db..487b98b 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -707,6 +707,47 @@ class TestLocalTaskJob: if scheduler_job.processor_agent: scheduler_job.processor_agent.end() + @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'}) + def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker): + session = 
settings.Session() + with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag: + task_a = PythonOperator(task_id='A', python_callable=lambda: True) + task_b = PythonOperator(task_id='B', python_callable=lambda: True) + task_c = PythonOperator(task_id='C', python_callable=lambda: True) + task_a >> task_b >> task_c + + scheduler_job = SchedulerJob(subdir=os.devnull) + scheduler_job.dagbag.bag_dag(dag, root_dag=dag) + + dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE) + dr2 = dag.create_dagrun( + run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1) + ) + ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS) + ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS) + ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING) + ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE) + ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE) + ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE) + session.merge(ti_a) + session.merge(ti_b) + session.merge(ti_c) + session.merge(ti2_a) + session.merge(ti2_b) + session.merge(ti2_c) + session.flush() + + job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor()) + job1.task_runner = StandardTaskRunner(job1) + job1.run() + ti2_a.refresh_from_db() + assert ti2_a.state == State.SUCCESS + assert ti2_b.state == State.NONE + assert ( + "No downstream tasks scheduled because task instance " + "dependents have not completed yet and wait_for_downstream is true" + ) in caplog.text + @patch('airflow.utils.process_utils.subprocess.check_call') def test_task_sigkill_works_with_retries(self, _check_call, caplog, dag_maker): """