This is an automated email from the ASF dual-hosted git repository. eladkal pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new a15792dd42 Avoid considering EmptyOperator in mini scheduler (#29979) a15792dd42 is described below commit a15792dd4216a1ae8c83c8c18ab255d2c558636c Author: eladkal <45845474+elad...@users.noreply.github.com> AuthorDate: Thu Mar 9 00:28:08 2023 +0200 Avoid considering EmptyOperator in mini scheduler (#29979) * Avoid considering EmptyOperator in mini scheduler EmptyOperator should not be executed on workers thus it should not be considered for the mini scheduler optimization. closes: https://github.com/apache/airflow/issues/29974 --- airflow/models/taskinstance.py | 12 ++++++++++- tests/models/test_taskinstance.py | 44 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 3f6ebefd95..1a826bc918 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2651,7 +2651,17 @@ class TaskInstance(Base, LoggingMixin): task_id for task_id in partial_dag.task_ids if task_id not in task.downstream_task_ids } - schedulable_tis = [ti for ti in info.schedulable_tis if ti.task_id not in skippable_task_ids] + schedulable_tis = [ + ti + for ti in info.schedulable_tis + if ti.task_id not in skippable_task_ids + and not ( + ti.task.inherits_from_empty_operator + and not ti.task.on_execute_callback + and not ti.task.on_success_callback + and not ti.task.outlets + ) + ] for schedulable_ti in schedulable_tis: if not hasattr(schedulable_ti, "task"): schedulable_ti.task = task.dag.get_task(schedulable_ti.task_id) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 54884f633e..50cac05296 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -3859,6 +3859,50 @@ def test_mapped_task_does_not_error_in_mini_scheduler_if_upstreams_are_not_done( assert "0 downstream tasks scheduled from follow-on schedule" in caplog.text +def test_empty_operator_is_not_considered_in_mini_scheduler(dag_maker, caplog, session): + """ + This tests verify that operators with inherits_from_empty_operator are not considered by mini scheduler. + Such operators should not run on workers thus the mini scheduler optimization should skip them and not + submit them directly to worker. + """ + with dag_maker() as dag: + + @dag.task + def first_task(): + print(2) + + @dag.task + def second_task(): + print(2) + + third_task = EmptyOperator(task_id="third_task") + forth_task = EmptyOperator(task_id="forth_task", on_success_callback=lambda x: print("hi")) + + first_task() >> [second_task(), third_task, forth_task] + dag_run = dag_maker.create_dagrun() + first_ti = dag_run.get_task_instance(task_id="first_task") + second_ti = dag_run.get_task_instance(task_id="second_task") + third_ti = dag_run.get_task_instance(task_id="third_task") + forth_ti = dag_run.get_task_instance(task_id="forth_task") + first_ti.state = State.SUCCESS + second_ti.state = State.NONE + third_ti.state = State.NONE + forth_ti.state = State.NONE + session.merge(first_ti) + session.merge(second_ti) + session.merge(third_ti) + session.merge(forth_ti) + session.commit() + first_ti.schedule_downstream_tasks(session=session) + second_task = dag_run.get_task_instance(task_id="second_task") + third_task = dag_run.get_task_instance(task_id="third_task") + forth_task = dag_run.get_task_instance(task_id="forth_task") + assert second_task.state == State.SCHEDULED + assert third_task.state == State.NONE + assert forth_task.state == State.SCHEDULED + assert "2 downstream tasks scheduled from follow-on schedule" in caplog.text + + def test_mapped_task_expands_in_mini_scheduler_if_upstreams_are_done(dag_maker, caplog, session): """Test that mini scheduler expands mapped task""" with dag_maker() as dag: