This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d41c859cf33 Remove schedule downstream tasks after execution (aka "mini scheduler") (#43741)
d41c859cf33 is described below

commit d41c859cf3391d5a918552e96542bccc5c3e2bef
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Wed Nov 6 15:29:50 2024 +0000

    Remove schedule downstream tasks after execution (aka "mini scheduler") (#43741)
    
    It has long been questionable how much benefit this actually provided, and with
    the move towards task DB isolation in Airflow 3 we won't be able to keep it anyway
    (just as we couldn't when AIP-44 DB isolation was enabled), so let's remove it now.
---
 airflow/config_templates/config.yml                |   9 -
 airflow/jobs/local_task_job_runner.py              |   6 +-
 airflow/models/taskinstance.py                     |  94 --------
 airflow/serialization/pydantic/taskinstance.py     |  10 -
 .../administration-and-deployment/scheduler.rst    |   5 -
 .../src/airflow/providers/edge/cli/edge_command.py |   2 -
 .../cncf/kubernetes/decorators/test_kubernetes.py  |  25 --
 tests/jobs/test_local_task_job.py                  | 158 ------------
 tests/models/test_taskinstance.py                  | 264 +--------------------
 9 files changed, 2 insertions(+), 571 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index cfadcb16fd2..837fedfac2b 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2418,15 +2418,6 @@ scheduler:
       type: integer
       default: "20"
       see_also: ":ref:`scheduler:ha:tunables`"
-    schedule_after_task_execution:
-      description: |
-        Should the Task supervisor process perform a "mini scheduler" to attempt to schedule more tasks of the
-        same DAG. Leaving this on will mean tasks in the same DAG execute quicker, but might starve out other
-        dags in some circumstances
-      example: ~
-      version_added: 2.0.0
-      type: boolean
-      default: "True"
     parsing_pre_import_modules:
       description: |
         The scheduler reads dag files to extract the airflow modules that are going to be used,
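
For context, the option removed above was usually flipped via Airflow's
environment-variable override convention rather than by editing airflow.cfg;
a minimal sketch of what such an override looked like before this commit (the
variable name mirrors the line deleted from edge_command.py further down):

    # Sketch only: disabling the "mini scheduler" prior to this commit, using
    # Airflow's standard AIRFLOW__<SECTION>__<KEY> environment-variable override.
    import os

    os.environ["AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION"] = "False"

After this commit the option no longer exists and scheduling of downstream
tasks is always left to the scheduler itself.
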
diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py
index 599493ea58c..5d1f15fe8f8 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -253,7 +253,7 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
         self.terminating = True
         self._log_return_code_metric(return_code)
 
-        if is_deferral := return_code == TaskReturnCode.DEFERRED.value:
+        if return_code == TaskReturnCode.DEFERRED.value:
             self.log.info("Task exited with return code %s (task deferral)", 
return_code)
             _set_task_deferred_context_var()
         else:
@@ -262,10 +262,6 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
                 message += ". For more information, see 
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#LocalTaskJob-killed";
             self.log.info(message)
 
-        if not (self.task_instance.test_mode or is_deferral):
-            if conf.getboolean("scheduler", "schedule_after_task_execution", fallback=True):
-                self.task_instance.schedule_downstream_tasks(max_tis_per_query=self.job.max_tis_per_query)
-
     def on_kill(self):
         self.task_runner.terminate()
         self.task_runner.on_finish()
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index c525a40a14a..1ca90e7f1c4 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -3639,100 +3639,6 @@ class TaskInstance(Base, LoggingMixin):
             return filters[0]
         return or_(*filters)
 
-    @classmethod
-    @provide_session
-    def _schedule_downstream_tasks(
-        cls,
-        ti: TaskInstance | TaskInstancePydantic,
-        session: Session = NEW_SESSION,
-        max_tis_per_query: int | None = None,
-    ):
-        from sqlalchemy.exc import OperationalError
-
-        from airflow.models.dagrun import DagRun
-
-        try:
-            # Re-select the row with a lock
-            dag_run = with_row_locks(
-                session.query(DagRun).filter_by(
-                    dag_id=ti.dag_id,
-                    run_id=ti.run_id,
-                ),
-                session=session,
-                skip_locked=True,
-            ).one_or_none()
-
-            if not dag_run:
-                cls.logger().debug("Skip locked rows, rollback")
-                session.rollback()
-                return
-
-            task = ti.task
-            if TYPE_CHECKING:
-                assert task
-                assert task.dag
-
-            # Previously, this section used task.dag.partial_subset to retrieve a partial DAG.
-            # However, this approach is unsafe as it can result in incomplete or incorrect task execution,
-            # leading to potential bad cases. As a result, the operation has been removed.
-            # For more details, refer to the discussion in PR #[https://github.com/apache/airflow/pull/42582].
-            dag_run.dag = task.dag
-            info = dag_run.task_instance_scheduling_decisions(session)
-
-            skippable_task_ids = {
-                task_id for task_id in task.dag.task_ids if task_id not in task.downstream_task_ids
-            }
-
-            schedulable_tis = [
-                ti
-                for ti in info.schedulable_tis
-                if ti.task_id not in skippable_task_ids
-                and not (
-                    ti.task.inherits_from_empty_operator
-                    and not ti.task.on_execute_callback
-                    and not ti.task.on_success_callback
-                    and not ti.task.outlets
-                )
-            ]
-            for schedulable_ti in schedulable_tis:
-                if getattr(schedulable_ti, "task", None) is None:
-                    schedulable_ti.task = task.dag.get_task(schedulable_ti.task_id)
-
-            num = dag_run.schedule_tis(schedulable_tis, session=session, max_tis_per_query=max_tis_per_query)
-            cls.logger().info("%d downstream tasks scheduled from follow-on schedule check", num)
-
-            session.flush()
-
-        except OperationalError as e:
-            # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
-            cls.logger().warning(
-                "Skipping mini scheduling run due to exception: %s",
-                e.statement,
-                exc_info=True,
-            )
-            session.rollback()
-
-    @provide_session
-    def schedule_downstream_tasks(self, session: Session = NEW_SESSION, max_tis_per_query: int | None = None):
-        """
-        Schedule downstream tasks of this task instance.
-
-        :meta: private
-        """
-        try:
-            return TaskInstance._schedule_downstream_tasks(
-                ti=self, session=session, max_tis_per_query=max_tis_per_query
-            )
-        except Exception:
-            self.log.exception(
-                "Error scheduling downstream tasks. Skipping it as this is 
entirely optional optimisation. "
-                "There might be various reasons for it, please take a look at 
the stack trace to figure "
-                "out if the root cause can be diagnosed and fixed. See the 
issue "
-                "https://github.com/apache/airflow/issues/39717 for details 
and an example problem. If you "
-                "would like to get help in solving root cause, open discussion 
with all details with your "
-                "managed service support or in Airflow repository."
-            )
-
     def get_relevant_upstream_map_indexes(
         self,
         upstream: Operator,
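
A note on the locking used by the deleted _schedule_downstream_tasks above:
with_row_locks(..., skip_locked=True) is Airflow's wrapper around
SELECT ... FOR UPDATE SKIP LOCKED, so the worker only took the follow-on
scheduling path when nothing else (usually the real scheduler) already held
the DagRun row, and simply rolled back otherwise. A minimal standalone sketch
of the same pattern in plain SQLAlchemy, with a hypothetical mapped class and
connection string standing in for Airflow's own models:

    from sqlalchemy import String, create_engine, select
    from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

    class Base(DeclarativeBase):
        pass

    class Run(Base):  # hypothetical stand-in for Airflow's DagRun table
        __tablename__ = "run"
        run_id: Mapped[str] = mapped_column(String, primary_key=True)

    engine = create_engine("postgresql+psycopg2://user:pass@localhost/example")  # hypothetical DSN

    with Session(engine) as session:
        run = session.scalars(
            select(Run)
            .where(Run.run_id == "manual__example")  # hypothetical run id
            .with_for_update(skip_locked=True)  # SELECT ... FOR UPDATE SKIP LOCKED
        ).one_or_none()
        if run is None:
            session.rollback()  # another session holds the lock; back off, as the removed code did
        else:
            ...  # safe to make scheduling decisions for this run while the lock is held

This is also why any OperationalError in that block was treated as non-fatal:
the whole thing was only ever a best-effort optimisation.
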
diff --git a/airflow/serialization/pydantic/taskinstance.py b/airflow/serialization/pydantic/taskinstance.py
index d5573922b83..b3b60383ea6 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -467,16 +467,6 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
             session=session,
         )
 
-    def schedule_downstream_tasks(self, session: Session | None = None, max_tis_per_query: int | None = None):
-        """
-        Schedule downstream tasks of this task instance.
-
-        :meta: private
-        """
-        # we should not schedule downstream tasks with Pydantic model because it will not be able to
-        # get the DAG object (we do not serialize it currently).
-        return
-
     def command_as_list(
         self,
         mark_success: bool = False,
diff --git a/docs/apache-airflow/administration-and-deployment/scheduler.rst b/docs/apache-airflow/administration-and-deployment/scheduler.rst
index e2697408199..476e2e66279 100644
--- a/docs/apache-airflow/administration-and-deployment/scheduler.rst
+++ b/docs/apache-airflow/administration-and-deployment/scheduler.rst
@@ -382,8 +382,3 @@ However, you can also look at other non-performance-related scheduler configurat
   in the loop. i.e. if it scheduled something then it will start the next loop
   iteration straight away. This parameter is badly named (historical reasons) and it will be
   renamed in the future with deprecation of the current name.
-
-- :ref:`config:scheduler__schedule_after_task_execution`
-  Should the Task supervisor process perform a "mini scheduler" to attempt to schedule more tasks of
-  the same DAG. Leaving this on will mean tasks in the same DAG execute quicker,
-  but might starve out other DAGs in some circumstances.
diff --git a/providers/src/airflow/providers/edge/cli/edge_command.py b/providers/src/airflow/providers/edge/cli/edge_command.py
index 487b3adde70..69f12acc3fa 100644
--- a/providers/src/airflow/providers/edge/cli/edge_command.py
+++ b/providers/src/airflow/providers/edge/cli/edge_command.py
@@ -77,8 +77,6 @@ def force_use_internal_api_on_edge_worker():
         os.environ["AIRFLOW_ENABLE_AIP_44"] = "True"
         os.environ["AIRFLOW__CORE__INTERNAL_API_URL"] = api_url
         InternalApiConfig.set_use_internal_api("edge-worker")
-        # Disable mini-scheduler post task execution and leave next task schedule to core scheduler
-        os.environ["AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION"] = "False"
 
 
 force_use_internal_api_on_edge_worker()
diff --git a/providers/tests/cncf/kubernetes/decorators/test_kubernetes.py b/providers/tests/cncf/kubernetes/decorators/test_kubernetes.py
index 9545e9adbf3..8f412c81b7f 100644
--- a/providers/tests/cncf/kubernetes/decorators/test_kubernetes.py
+++ b/providers/tests/cncf/kubernetes/decorators/test_kubernetes.py
@@ -215,28 +215,3 @@ def test_kubernetes_with_marked_as_teardown(
     assert len(dag.task_group.children) == 1
     teardown_task = dag.task_group.children["f"]
     assert teardown_task.is_teardown
-
-
-# Database isolation mode does not support mini-scheduler
-@pytest.mark.skip_if_database_isolation_mode
-def test_kubernetes_with_mini_scheduler(
-    dag_maker, session, mock_create_pod: mock.Mock, mock_hook: mock.Mock
-) -> None:
-    with dag_maker(session=session):
-
-        @task.kubernetes(
-            image="python:3.10-slim-buster",
-            in_cluster=False,
-            cluster_context="default",
-            config_file="/tmp/fake_file",
-        )
-        def f(arg1, arg2, kwarg1=None, kwarg2=None):
-            return {"key1": "value1", "key2": "value2"}
-
-        f1 = f.override(task_id="my_task_id", do_xcom_push=True)("arg1", "arg2", kwarg1="kwarg1")
-        f.override(task_id="my_task_id2", do_xcom_push=False)("arg1", "arg2", kwarg1=f1)
-
-    dr = dag_maker.create_dagrun()
-    (ti, _) = dr.task_instances
-    # check that mini-scheduler works
-    ti.schedule_downstream_tasks()
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 7ee03747883..936f773e55f 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -37,11 +37,9 @@ from airflow.exceptions import AirflowException, AirflowTaskTimeout
 from airflow.executors.sequential_executor import SequentialExecutor
 from airflow.jobs.job import Job, run_job
 from airflow.jobs.local_task_job_runner import SIGSEGV_MESSAGE, LocalTaskJobRunner
-from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
 from airflow.listeners.listener import get_listener_manager
 from airflow.models.dag import DAG
 from airflow.models.dagbag import DagBag
-from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.python import PythonOperator
@@ -845,162 +843,6 @@ class TestLocalTaskJob:
                 lines = f.readlines()
             assert len(lines) == 0
 
-    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
-    @pytest.mark.parametrize(
-        "conf, init_state, first_run_state, second_run_state, task_ids_to_run, 
error_message",
-        [
-            (
-                {("scheduler", "schedule_after_task_execution"): "True"},
-                {"A": State.QUEUED, "B": State.NONE, "C": State.NONE},
-                {"A": State.SUCCESS, "B": State.SCHEDULED, "C": State.NONE},
-                {"A": State.SUCCESS, "B": State.SUCCESS, "C": State.SCHEDULED},
-                ["A", "B"],
-                "A -> B -> C, with fast-follow ON when A runs, B should be 
QUEUED. Same for B and C.",
-            ),
-            (
-                {("scheduler", "schedule_after_task_execution"): "False"},
-                {"A": State.QUEUED, "B": State.NONE, "C": State.NONE},
-                {"A": State.SUCCESS, "B": State.NONE, "C": State.NONE},
-                None,
-                ["A", "B"],
-                "A -> B -> C, with fast-follow OFF, when A runs, B shouldn't 
be QUEUED.",
-            ),
-            (
-                {("scheduler", "schedule_after_task_execution"): "True"},
-                {"D": State.QUEUED, "E": State.NONE, "F": State.NONE, "G": 
State.NONE},
-                {"D": State.SUCCESS, "E": State.NONE, "F": State.NONE, "G": 
State.NONE},
-                None,
-                ["D", "E"],
-                "G -> F -> E & D -> E, when D runs but F isn't QUEUED yet, E 
shouldn't be QUEUED.",
-            ),
-            (
-                {("scheduler", "schedule_after_task_execution"): "True"},
-                {"H": State.QUEUED, "I": State.FAILED, "J": State.NONE},
-                {"H": State.SUCCESS, "I": State.FAILED, "J": 
State.UPSTREAM_FAILED},
-                None,
-                ["H", "I"],
-                "H -> J & I -> J, when H is QUEUED but I has FAILED, J is 
marked UPSTREAM_FAILED.",
-            ),
-        ],
-    )
-    def test_fast_follow(
-        self,
-        conf,
-        init_state,
-        first_run_state,
-        second_run_state,
-        task_ids_to_run,
-        error_message,
-        get_test_dag,
-    ):
-        with conf_vars(conf):
-            dag = get_test_dag(
-                "test_dagrun_fast_follow",
-            )
-
-            scheduler_job = Job()
-            scheduler_job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
-            scheduler_job_runner.dagbag.bag_dag(dag)
-            triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} 
if AIRFLOW_V_3_0_PLUS else {}
-
-            dag_run = dag.create_dagrun(
-                run_id="test_dagrun_fast_follow", state=State.RUNNING, 
**triggered_by_kwargs
-            )
-
-            ti_by_task_id = {}
-            with create_session() as session:
-                for task_id in init_state:
-                    ti = TaskInstance(dag.get_task(task_id), run_id=dag_run.run_id)
-                    ti.refresh_from_db()
-                    ti.state = init_state[task_id]
-                    session.merge(ti)
-                    ti_by_task_id[task_id] = ti
-
-            ti = TaskInstance(task=dag.get_task(task_ids_to_run[0]), run_id=dag_run.run_id)
-            ti.refresh_from_db()
-            job1 = Job(executor=SequentialExecutor(), dag_id=ti.dag_id)
-            job_runner = LocalTaskJobRunner(job=job1, task_instance=ti, ignore_ti_state=True)
-            job1.task_runner = StandardTaskRunner(job_runner)
-
-            run_job(job=job1, execute_callable=job_runner._execute)
-            self.validate_ti_states(dag_run, first_run_state, error_message)
-            if second_run_state:
-                ti = TaskInstance(task=dag.get_task(task_ids_to_run[1]), run_id=dag_run.run_id)
-                ti.refresh_from_db()
-                job2 = Job(dag_id=ti.dag_id, executor=SequentialExecutor())
-                job_runner = LocalTaskJobRunner(job=job2, task_instance=ti, ignore_ti_state=True)
-                job2.task_runner = StandardTaskRunner(job_runner)
-                run_job(job2, execute_callable=job_runner._execute)
-                self.validate_ti_states(dag_run, second_run_state, error_message)
-            if scheduler_job_runner.processor_agent:
-                scheduler_job_runner.processor_agent.end()
-
-    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
-    @conf_vars({("scheduler", "schedule_after_task_execution"): "True"})
-    def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, get_test_dag):
-        dag = get_test_dag("test_dagrun_fast_follow")
-        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
-        dag.catchup = False
-        SerializedDagModel.write_dag(dag)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-
-        dr = dag.create_dagrun(
-            run_id="test_1",
-            state=State.RUNNING,
-            execution_date=DEFAULT_DATE,
-            data_interval=data_interval,
-            **triggered_by_kwargs,
-        )
-        dr2 = dag.create_dagrun(
-            run_id="test_2",
-            state=State.RUNNING,
-            execution_date=DEFAULT_DATE + datetime.timedelta(hours=1),
-            data_interval=data_interval,
-            **triggered_by_kwargs,
-        )
-        task_k = dag.get_task("K")
-        task_l = dag.get_task("L")
-        with create_session() as session:
-            ti_k = dr.get_task_instance(task_k.task_id, session=session)
-            ti_k.refresh_from_task(task_k)
-            ti_k.state = State.SUCCESS
-
-            ti_b = dr.get_task_instance(task_l.task_id, session=session)
-            ti_b.refresh_from_task(task_l)
-            ti_b.state = State.SUCCESS
-
-            ti2_k = dr2.get_task_instance(task_k.task_id, session=session)
-            ti2_k.refresh_from_task(task_k)
-            ti2_k.state = State.NONE
-
-            ti2_l = dr2.get_task_instance(task_l.task_id, session=session)
-            ti2_l.refresh_from_task(task_l)
-            ti2_l.state = State.NONE
-
-            session.merge(ti_k)
-            session.merge(ti_b)
-
-            session.merge(ti2_k)
-            session.merge(ti2_l)
-
-        job1 = Job(
-            executor=SequentialExecutor(),
-            dag_id=ti2_k.dag_id,
-        )
-        job_runner = LocalTaskJobRunner(job=job1, task_instance=ti2_k, ignore_ti_state=True)
-        job1.task_runner = StandardTaskRunner(job_runner)
-        run_job(job=job1, execute_callable=job_runner._execute)
-
-        ti2_k.refresh_from_db()
-        ti2_l.refresh_from_db()
-        assert ti2_k.state == State.SUCCESS
-        assert ti2_l.state == State.NONE
-
-        failed_deps = list(ti2_l.get_failed_dep_statuses())
-        assert len(failed_deps) == 1
-        assert failed_deps[0].dep_name == "Previous Dagrun State"
-        assert not failed_deps[0].passed
-
     def test_process_sigsegv_error_message(self, caplog, dag_maker):
         """Test that shows error if process failed with segmentation fault."""
         caplog.set_level(logging.CRITICAL, logger="local_task_job.py")
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 8a1df0594e4..3a99ac39226 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -19,7 +19,6 @@ from __future__ import annotations
 
 import contextlib
 import datetime
-import logging
 import operator
 import os
 import pathlib
@@ -78,7 +77,7 @@ from airflow.models.xcom import LazyXComSelectSequence, XCom
 from airflow.notifications.basenotifier import BaseNotifier
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.bash import BashOperator
-from airflow.providers.standard.operators.python import BranchPythonOperator, PythonOperator
+from airflow.providers.standard.operators.python import PythonOperator
 from airflow.providers.standard.sensors.python import PythonSensor
 from airflow.sensors.base import BaseSensorOperator
 from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG
@@ -5121,252 +5120,6 @@ def test_expand_non_templated_field(dag_maker, session):
     assert "get_extra_env" in echo_task.upstream_task_ids
 
 
-@pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
-def test_mapped_task_does_not_error_in_mini_scheduler_if_upstreams_are_not_done(dag_maker, caplog, session):
-    """
-    This tests that when scheduling child tasks of a task and there's a mapped downstream task,
-    if the mapped downstream task has upstreams that are not yet done, the mapped downstream task is
-    not marked as `upstream_failed'
-    """
-    with dag_maker() as dag:
-
-        @dag.task
-        def second_task():
-            return [0, 1, 2]
-
-        @dag.task
-        def first_task():
-            print(2)
-
-        @dag.task
-        def middle_task(id):
-            return id
-
-        middle = middle_task.expand(id=second_task())
-
-        @dag.task
-        def last_task():
-            print(3)
-
-        [first_task(), middle] >> last_task()
-
-    dag_run = dag_maker.create_dagrun()
-    first_ti = dag_run.get_task_instance(task_id="first_task")
-    second_ti = dag_run.get_task_instance(task_id="second_task")
-    first_ti.state = State.SUCCESS
-    second_ti.state = State.RUNNING
-    session.merge(first_ti)
-    session.merge(second_ti)
-    session.commit()
-    first_ti.schedule_downstream_tasks(session=session)
-    middle_ti = dag_run.get_task_instance(task_id="middle_task")
-    assert middle_ti.state != State.UPSTREAM_FAILED
-    assert "0 downstream tasks scheduled from follow-on schedule" in 
caplog.text
-
-
-@pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
-def test_empty_operator_is_not_considered_in_mini_scheduler(dag_maker, caplog, session):
-    """
-    This tests verify that operators with inherits_from_empty_operator are not considered by mini scheduler.
-    Such operators should not run on workers thus the mini scheduler optimization should skip them and not
-    submit them directly to worker.
-    """
-    with dag_maker() as dag:
-
-        @dag.task
-        def first_task():
-            print(2)
-
-        @dag.task
-        def second_task():
-            print(2)
-
-        third_task = EmptyOperator(task_id="third_task")
-        forth_task = EmptyOperator(task_id="forth_task", 
on_success_callback=lambda x: print("hi"))
-
-        first_task() >> [second_task(), third_task, forth_task]
-        dag_run = dag_maker.create_dagrun()
-        first_ti = dag_run.get_task_instance(task_id="first_task")
-        second_ti = dag_run.get_task_instance(task_id="second_task")
-        third_ti = dag_run.get_task_instance(task_id="third_task")
-        forth_ti = dag_run.get_task_instance(task_id="forth_task")
-        first_ti.state = State.SUCCESS
-        second_ti.state = State.NONE
-        third_ti.state = State.NONE
-        forth_ti.state = State.NONE
-        session.merge(first_ti)
-        session.merge(second_ti)
-        session.merge(third_ti)
-        session.merge(forth_ti)
-        session.commit()
-        first_ti.schedule_downstream_tasks(session=session)
-        second_task = dag_run.get_task_instance(task_id="second_task")
-        third_task = dag_run.get_task_instance(task_id="third_task")
-        forth_task = dag_run.get_task_instance(task_id="forth_task")
-        assert second_task.state == State.SCHEDULED
-        assert third_task.state == State.NONE
-        assert forth_task.state == State.SCHEDULED
-        assert "2 downstream tasks scheduled from follow-on schedule" in 
caplog.text
-
-
-@pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
-def test_mapped_task_expands_in_mini_scheduler_if_upstreams_are_done(dag_maker, caplog, session):
-    """Test that mini scheduler expands mapped task"""
-    with dag_maker() as dag:
-
-        @dag.task
-        def second_task():
-            return [0, 1, 2]
-
-        @dag.task
-        def first_task():
-            print(2)
-
-        @dag.task
-        def middle_task(id):
-            return id
-
-        middle = middle_task.expand(id=second_task())
-
-        @dag.task
-        def last_task():
-            print(3)
-
-        [first_task(), middle] >> last_task()
-
-    dr = dag_maker.create_dagrun()
-
-    first_ti = dr.get_task_instance(task_id="first_task")
-    first_ti.state = State.SUCCESS
-    session.merge(first_ti)
-    session.commit()
-    second_task = dag.get_task("second_task")
-    second_ti = dr.get_task_instance(task_id="second_task")
-    second_ti.refresh_from_task(second_task)
-    second_ti.run()
-    second_ti.schedule_downstream_tasks(session=session)
-    for i in range(3):
-        middle_ti = dr.get_task_instance(task_id="middle_task", map_index=i)
-        assert middle_ti.state == State.SCHEDULED
-    assert "3 downstream tasks scheduled from follow-on schedule" in 
caplog.text
-
-
-@pytest.mark.skip_if_database_isolation_mode
-def test_one_success_task_in_mini_scheduler_if_upstreams_are_done(dag_maker, caplog, session):
-    """Test that mini scheduler with one_success task"""
-    with dag_maker() as dag:
-        branch = BranchPythonOperator(task_id="branch", 
python_callable=lambda: "task_run")
-        task_run = BashOperator(task_id="task_run", bash_command="echo 0")
-        task_skip = BashOperator(task_id="task_skip", bash_command="echo 0")
-        task_1 = BashOperator(task_id="task_1", bash_command="echo 0")
-        task_one_success = BashOperator(
-            task_id="task_one_success", bash_command="echo 0", 
trigger_rule="one_success"
-        )
-        task_2 = BashOperator(task_id="task_2", bash_command="echo 0")
-
-        task_1 >> task_2
-        branch >> task_skip
-        branch >> task_run
-        task_run >> task_one_success
-        task_skip >> task_one_success
-        task_one_success >> task_2
-        task_skip >> task_2
-
-    dr = dag_maker.create_dagrun()
-
-    branch = dr.get_task_instance(task_id="branch")
-    task_1 = dr.get_task_instance(task_id="task_1")
-    task_skip = dr.get_task_instance(task_id="task_skip")
-    branch.state = State.SUCCESS
-    task_1.state = State.SUCCESS
-    task_skip.state = State.SKIPPED
-    session.merge(branch)
-    session.merge(task_1)
-    session.merge(task_skip)
-    session.commit()
-    task_1.refresh_from_task(dag.get_task("task_1"))
-    task_1.schedule_downstream_tasks(session=session)
-
-    branch = dr.get_task_instance(task_id="branch")
-    task_run = dr.get_task_instance(task_id="task_run")
-    task_skip = dr.get_task_instance(task_id="task_skip")
-    task_1 = dr.get_task_instance(task_id="task_1")
-    task_one_success = dr.get_task_instance(task_id="task_one_success")
-    task_2 = dr.get_task_instance(task_id="task_2")
-    assert branch.state == State.SUCCESS
-    assert task_run.state == State.NONE
-    assert task_skip.state == State.SKIPPED
-    assert task_1.state == State.SUCCESS
-    # task_one_success should not be scheduled
-    assert task_one_success.state == State.NONE
-    assert task_2.state == State.SKIPPED
-    assert "0 downstream tasks scheduled from follow-on schedule" in 
caplog.text
-
-    task_run = dr.get_task_instance(task_id="task_run")
-    task_run.state = State.SUCCESS
-    session.merge(task_run)
-    session.commit()
-    task_run.refresh_from_task(dag.get_task("task_run"))
-    task_run.schedule_downstream_tasks(session=session)
-
-    branch = dr.get_task_instance(task_id="branch")
-    task_run = dr.get_task_instance(task_id="task_run")
-    task_skip = dr.get_task_instance(task_id="task_skip")
-    task_1 = dr.get_task_instance(task_id="task_1")
-    task_one_success = dr.get_task_instance(task_id="task_one_success")
-    task_2 = dr.get_task_instance(task_id="task_2")
-    assert branch.state == State.SUCCESS
-    assert task_run.state == State.SUCCESS
-    assert task_skip.state == State.SKIPPED
-    assert task_1.state == State.SUCCESS
-    # task_one_success should not be scheduled
-    assert task_one_success.state == State.SCHEDULED
-    assert task_2.state == State.SKIPPED
-    assert "1 downstream tasks scheduled from follow-on schedule" in 
caplog.text
-
-
-@pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
-def test_mini_scheduler_not_skip_mapped_downstream_until_all_upstreams_finish(dag_maker, session):
-    with dag_maker(session=session):
-
-        @task
-        def generate() -> list[list[int]]:
-            return []
-
-        @task
-        def a_sum(numbers: list[int]) -> int:
-            return sum(numbers)
-
-        @task
-        def b_double(summed: int) -> int:
-            return summed * 2
-
-        @task
-        def c_gather(result) -> None:
-            pass
-
-        static = EmptyOperator(task_id="static")
-
-        summed = a_sum.expand(numbers=generate())
-        doubled = b_double.expand(summed=summed)
-        static >> c_gather(doubled)
-
-    dr: DagRun = dag_maker.create_dagrun()
-    tis = {(ti.task_id, ti.map_index): ti for ti in dr.task_instances}
-
-    static_ti = tis[("static", -1)]
-    static_ti.run(session=session)
-    static_ti.schedule_downstream_tasks(session=session)
-    # No tasks should be skipped yet!
-    assert not dr.get_task_instances([TaskInstanceState.SKIPPED], session=session)
-
-    generate_ti = tis[("generate", -1)]
-    generate_ti.run(session=session)
-    generate_ti.schedule_downstream_tasks(session=session)
-    # Now downstreams can be skipped.
-    assert dr.get_task_instances([TaskInstanceState.SKIPPED], session=session)
-
-
 def test_taskinstance_with_note(create_task_instance, session):
     ti: TaskInstance = create_task_instance(session=session)
     ti.note = "ti with note"
@@ -5399,18 +5152,3 @@ def test__refresh_from_db_should_not_increment_try_number(dag_maker, session):
     assert ti.try_number == 1  # stays 1
     ti.refresh_from_db()
     assert ti.try_number == 1  # stays 1
-
-
-@mock.patch("airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks")
-def test_swallow_mini_scheduler_exceptions(_schedule_downstream_mock, create_task_instance, caplog):
-    _schedule_downstream_mock.side_effect = Exception("To be swallowed")
-    caplog.set_level(logging.ERROR)
-    ti = create_task_instance(
-        dag_id="dag_for_testing_swallowing_exception",
-        task_id="task_for_testing_swallowing_exception",
-        run_type=DagRunType.SCHEDULED,
-        execution_date=DEFAULT_DATE,
-    )
-    ti.schedule_downstream_tasks()
-    assert "Error scheduling downstream tasks." in caplog.text
-    assert "To be swallowed" in caplog.text
