This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 70b8e50ddc Remove dag.run() method (#42761)
70b8e50ddc is described below

commit 70b8e50ddce305582802b6d08b1c948fa3ef086a
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Oct 10 14:39:33 2024 -0700

    Remove dag.run() method (#42761)
    
    This method uses BackfillJobRunner internally. Before we can remove
    BackfillJobRunner, we need to remove DAG.run. But before we can remove
    DAG.run, we need to update some old tests that use it. So this is the
    first step towards removing BackfillJobRunner.
    
    There were some very old tests that came from Airflow GitHub issue #1225.
    These appeared to test the scheduler, but really they tested the backfill
    job runner. Just to be cautious, I kept most of them rather than removing
    them (which probably would have been fine, since they essentially tested
    code that we'll be removing anyway). Where appropriate, I changed them to
    run via dag.test or the scheduler. The ones dealing with
    ignore_first_depends_on_past will have to be added back when that
    functionality is [...]
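    
    For illustration, the docs changes below standardize on this pattern in
    place of the old backfill-based `__main__` block. A minimal sketch,
    assuming a hypothetical example dag (not part of this commit):
    
        import pendulum
    
        from airflow.models.dag import DAG
        from airflow.operators.empty import EmptyOperator
    
        with DAG(
            dag_id="my_example_dag",
            start_date=pendulum.datetime(2024, 1, 1),
            schedule=None,
        ) as dag:
            EmptyOperator(task_id="noop")
    
        if __name__ == "__main__":
            # DAG.test() runs the whole dag in a single local process,
            # replacing the removed BackfillJobRunner-based dag.run().
            dag.test()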
---
 airflow/executors/executor_loader.py               |   2 +
 airflow/models/dag.py                              |  83 +--------
 .../contributors_quick_start_pycharm.rst           |  27 +--
 .../contributors_quick_start_vscode.rst            |   3 +-
 .../images/pycharm_add_configuration.png           | Bin 97805 -> 0 bytes
 .../images/pycharm_add_env_variable.png            | Bin 86158 -> 0 bytes
 contributing-docs/testing/dag_testing.rst          |  15 +-
 dev/tests_common/test_utils/system_tests.py        |   7 +-
 dev/tests_common/test_utils/system_tests_class.py  |  26 ---
 docs/apache-airflow/core-concepts/debug.rst        |  13 +-
 .../example_dags/example_display_video.py          |   4 +-
 .../google/cloud/operators/test_dataprep_system.py |   8 +-
 .../cloud/operators/test_datastore_system.py       |   4 +-
 .../transfers/test_facebook_ads_to_gcs_system.py   |   6 +-
 .../transfers/test_salesforce_to_gcs_system.py     |   7 +-
 .../operators/test_display_video_system.py         |  15 +-
 .../google/cloud/dataprep/example_dataprep.py      |   1 +
 tests/cli/commands/test_dag_command.py             |   2 +-
 tests/core/test_example_dags_system.py             | 118 ++++++++++++-
 .../test_future_start_date.py}                     |  29 +--
 tests/dags/test_issue_1225.py                      | 149 ----------------
 tests/jobs/test_backfill_job.py                    |  30 ----
 tests/jobs/test_scheduler_job.py                   | 195 ++++-----------------
 tests/models/test_dag.py                           |  19 --
 tests/models/test_xcom_arg.py                      |   7 +-
 25 files changed, 218 insertions(+), 552 deletions(-)

diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py
index 1eeee1ff68..ec79860918 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -173,6 +173,8 @@ class ExecutorLoader:
 
         This is used in rare cases such as dag.run which allows, as a user convenience, to provide
         the executor by cli/argument instead of Airflow configuration
+
+        todo: given comments above, is this needed anymore since DAG.run is removed?
         """
         exec_class_name = executor.__class__.__qualname__
         exec_name = ExecutorName(f"{executor.__module__}.{exec_class_name}")
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 2dc425daa0..5cc5cf4431 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -28,7 +28,6 @@ import pickle
 import sys
 import time
 import traceback
-import warnings
 import weakref
 from collections import abc, defaultdict, deque
 from contextlib import ExitStack
@@ -88,13 +87,11 @@ from airflow.exceptions import (
     DuplicateTaskIdFound,
     FailStopDagInvalidTriggerRule,
     ParamValidationError,
-    RemovedInAirflow3Warning,
     TaskDeferred,
     TaskNotFound,
     UnknownExecutorException,
 )
 from airflow.executors.executor_loader import ExecutorLoader
-from airflow.jobs.job import run_job
 from airflow.models.abstractoperator import AbstractOperator, TaskStateChangeCallback
 from airflow.models.asset import (
     AssetDagRunQueue,
@@ -2296,84 +2293,8 @@ class DAG(LoggingMixin):
 
         self.task_count = len(self.task_dict)
 
-    def run(
-        self,
-        start_date=None,
-        end_date=None,
-        mark_success=False,
-        local=False,
-        donot_pickle=airflow_conf.getboolean("core", "donot_pickle"),
-        ignore_task_deps=False,
-        ignore_first_depends_on_past=True,
-        pool=None,
-        delay_on_limit_secs=1.0,
-        verbose=False,
-        conf=None,
-        rerun_failed_tasks=False,
-        run_backwards=False,
-        run_at_least_once=False,
-        continue_on_failures=False,
-        disable_retry=False,
-    ):
-        """
-        Run the DAG.
-
-        :param start_date: the start date of the range to run
-        :param end_date: the end date of the range to run
-        :param mark_success: True to mark jobs as succeeded without running them
-        :param local: True to run the tasks using the LocalExecutor
-        :param donot_pickle: True to avoid pickling DAG object and send to workers
-        :param ignore_task_deps: True to skip upstream tasks
-        :param ignore_first_depends_on_past: True to ignore depends_on_past
-            dependencies for the first set of tasks only
-        :param pool: Resource pool to use
-        :param delay_on_limit_secs: Time in seconds to wait before next attempt to run
-            dag run when max_active_runs limit has been reached
-        :param verbose: Make logging output more verbose
-        :param conf: user defined dictionary passed from CLI
-        :param rerun_failed_tasks:
-        :param run_backwards:
-        :param run_at_least_once: If true, always run the DAG at least once even
-            if no logical run exists within the time range.
-        """
-        warnings.warn(
-            "`DAG.run()` is deprecated and will be removed in Airflow 3.0. Consider "
-            "using `DAG.test()` instead, or trigger your dag via API.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-
-        from airflow.executors.executor_loader import ExecutorLoader
-        from airflow.jobs.backfill_job_runner import BackfillJobRunner
-
-        if local:
-            from airflow.executors.local_executor import LocalExecutor
-
-            ExecutorLoader.set_default_executor(LocalExecutor())
-
-        from airflow.jobs.job import Job
-
-        job = Job()
-        job_runner = BackfillJobRunner(
-            job=job,
-            dag=self,
-            start_date=start_date,
-            end_date=end_date,
-            mark_success=mark_success,
-            donot_pickle=donot_pickle,
-            ignore_task_deps=ignore_task_deps,
-            ignore_first_depends_on_past=ignore_first_depends_on_past,
-            pool=pool,
-            delay_on_limit_secs=delay_on_limit_secs,
-            verbose=verbose,
-            conf=conf,
-            rerun_failed_tasks=rerun_failed_tasks,
-            run_backwards=run_backwards,
-            run_at_least_once=run_at_least_once,
-            continue_on_failures=continue_on_failures,
-            disable_retry=disable_retry,
-        )
-        run_job(job=job, execute_callable=job_runner._execute)
+    def run(self, *args, **kwargs):
+        """Leaving this here to be removed in another PR, for simpler review."""
 
     def cli(self):
         """Exposes a CLI specific to this DAG."""
diff --git a/contributing-docs/quick-start-ide/contributors_quick_start_pycharm.rst b/contributing-docs/quick-start-ide/contributors_quick_start_pycharm.rst
index d830496b27..4a3319ae97 100644
--- a/contributing-docs/quick-start-ide/contributors_quick_start_pycharm.rst
+++ b/contributing-docs/quick-start-ide/contributors_quick_start_pycharm.rst
@@ -78,35 +78,14 @@ It requires "airflow-env" virtual environment configured locally.
 
 - Copy any example DAG present in the ``/airflow/example_dags`` directory to ``/files/dags/``.
 
-- Add a ``__main__`` block at the end of your DAG file to make it runnable. It will run a ``back_fill`` job:
+- Add a ``__main__`` block at the end of your DAG file to make it runnable:
 
   .. code-block:: python
 
     if __name__ == "__main__":
-        dag.clear()
-        dag.run()
+        dag.test()
 
-- Add ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` to Environment variable of Run Configuration.
-
-  - Click on Add configuration
-
-    .. raw:: html
-
-        <div align="center" style="padding-bottom:10px">
-          <img src="images/pycharm_add_configuration.png"
-               alt="Add Configuration pycharm">
-        </div>
-
-  - Add Script Path and Environment Variable to new Python configuration
-
-    .. raw:: html
-
-        <div align="center" style="padding-bottom:10px">
-          <img src="images/pycharm_add_env_variable.png"
-               alt="Add environment variable pycharm">
-        </div>
-
-- Now Debug an example dag and view the entries in tables such as ``dag_run, xcom`` etc in MySQL Workbench.
+- Run the file.
 
 Creating a branch
 #################
diff --git a/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst b/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst
index 88ff1fdd84..61fdf50106 100644
--- a/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst
+++ b/contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst
@@ -72,8 +72,7 @@ Setting up debugging
 
 
     if __name__ == "__main__":
-        dag.clear()
-        dag.run()
+        dag.test()
 
 - Add ``"AIRFLOW__CORE__EXECUTOR": "DebugExecutor"`` to the ``"env"`` field of Debug configuration.
 
diff --git a/contributing-docs/quick-start-ide/images/pycharm_add_configuration.png b/contributing-docs/quick-start-ide/images/pycharm_add_configuration.png
deleted file mode 100644
index 525b73e614..0000000000
Binary files a/contributing-docs/quick-start-ide/images/pycharm_add_configuration.png and /dev/null differ
diff --git a/contributing-docs/quick-start-ide/images/pycharm_add_env_variable.png b/contributing-docs/quick-start-ide/images/pycharm_add_env_variable.png
deleted file mode 100644
index f408372211..0000000000
Binary files a/contributing-docs/quick-start-ide/images/pycharm_add_env_variable.png and /dev/null differ
diff --git a/contributing-docs/testing/dag_testing.rst b/contributing-docs/testing/dag_testing.rst
index 7e311171ce..0bf506c2f3 100644
--- a/contributing-docs/testing/dag_testing.rst
+++ b/contributing-docs/testing/dag_testing.rst
@@ -20,31 +20,22 @@ DAG Testing
 ===========
 
 To ease and speed up the process of developing DAGs, you can use
-py:class:`~airflow.executors.debug_executor.DebugExecutor`, which is a single process executor
-for debugging purposes. Using this executor, you can run and debug DAGs from your IDE.
+py:meth:`~airflow.models.dag.DAG.test`, which will run a dag in a single process.
 
 To set up the IDE:
 
 1. Add ``main`` block at the end of your DAG file to make it runnable.
-It will run a backfill job:
 
 .. code-block:: python
 
   if __name__ == "__main__":
-      dag.clear()
-      dag.run()
+      dag.test()
 
 
-2. Set up ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` in the run configuration of your IDE.
-   Make sure to also set up all environment variables required by your DAG.
-
 3. Run and debug the DAG file.
 
-Additionally, ``DebugExecutor`` can be used in a fail-fast mode that will make
-all other running or scheduled tasks fail immediately. To enable this option, set
-``AIRFLOW__DEBUG__FAIL_FAST=True`` or adjust ``fail_fast`` option in your ``airflow.cfg``.
 
-Also, with the Airflow CLI command ``airflow dags test``, you can execute one complete run of a DAG:
+You can also run the dag in the same manner with the Airflow CLI command ``airflow dags test``:
 
 .. code-block:: bash
 
diff --git a/dev/tests_common/test_utils/system_tests.py b/dev/tests_common/test_utils/system_tests.py
index 578ee6cc04..6558ae2d1e 100644
--- a/dev/tests_common/test_utils/system_tests.py
+++ b/dev/tests_common/test_utils/system_tests.py
@@ -30,7 +30,7 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-def get_test_run(dag):
+def get_test_run(dag, **test_kwargs):
     def callback(context: Context):
         ti = context["dag_run"].get_task_instances()
         if not ti:
@@ -60,7 +60,10 @@ def get_test_run(dag):
         dag.on_success_callback = add_callback(dag.on_success_callback, callback)
         # If the env variable ``_AIRFLOW__SYSTEM_TEST_USE_EXECUTOR`` is set, then use an executor to run the
         # DAG
-        dag_run = dag.test(use_executor=os.environ.get("_AIRFLOW__SYSTEM_TEST_USE_EXECUTOR") == "1")
+        dag_run = dag.test(
+            use_executor=os.environ.get("_AIRFLOW__SYSTEM_TEST_USE_EXECUTOR") 
== "1",
+            **test_kwargs,
+        )
         assert (
             dag_run.state == DagRunState.SUCCESS
         ), "The system test failed, please look at the logs to find out the underlying failed task(s)"
diff --git a/dev/tests_common/test_utils/system_tests_class.py b/dev/tests_common/test_utils/system_tests_class.py
index 836782b858..5abdca96be 100644
--- a/dev/tests_common/test_utils/system_tests_class.py
+++ b/dev/tests_common/test_utils/system_tests_class.py
@@ -28,7 +28,6 @@ import pytest
 
 from airflow.configuration import AIRFLOW_HOME, AirflowConfigParser, get_airflow_config
 from airflow.exceptions import AirflowException
-from airflow.models.dagbag import DagBag
 
 from dev.tests_common.test_utils import AIRFLOW_MAIN_FOLDER
 from dev.tests_common.test_utils.logging_command_executor import get_executor
@@ -131,31 +130,6 @@ class SystemTest:
                     with open(filepath) as f:
                         print(f.read())
 
-    def run_dag(self, dag_id: str, dag_folder: str = DEFAULT_DAG_FOLDER) -> None:
-        """
-        Runs example dag by its ID.
-
-        :param dag_id: id of a DAG to be run
-        :param dag_folder: directory where to look for the specific DAG. Relative to AIRFLOW_HOME.
-        """
-        self.log.info("Looking for DAG: %s in %s", dag_id, dag_folder)
-        dag_bag = DagBag(dag_folder=dag_folder, include_examples=False)
-        dag = dag_bag.get_dag(dag_id)
-        if dag is None:
-            raise AirflowException(
-                f"The Dag {dag_id} could not be found. It's either an import 
problem, wrong dag_id or DAG is "
-                "not in provided dag_folder.The content of "
-                f"the {dag_folder} folder is {os.listdir(dag_folder)}"
-            )
-
-        self.log.info("Attempting to run DAG: %s", dag_id)
-        dag.clear()
-        try:
-            dag.run(ignore_first_depends_on_past=True, verbose=True)
-        except Exception:
-            self._print_all_log_files()
-            raise
-
     @staticmethod
     def create_dummy_file(filename, dir_path="/tmp"):
         os.makedirs(dir_path, exist_ok=True)
diff --git a/docs/apache-airflow/core-concepts/debug.rst b/docs/apache-airflow/core-concepts/debug.rst
index 9ab7819b8b..d58c490854 100644
--- a/docs/apache-airflow/core-concepts/debug.rst
+++ b/docs/apache-airflow/core-concepts/debug.rst
@@ -122,18 +122,9 @@ For more information on setting the configuration, see :doc:`../../howto/set-con
 
 1. Add ``main`` block at the end of your DAG file to make it runnable.
 
-It will run a backfill job:
-
 .. code-block:: python
 
   if __name__ == "__main__":
-      from airflow.utils.state import State
-
-      dag.clear()
-      dag.run()
-
-
-2. Setup ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` in run configuration of your IDE. In
-   this step you should also setup all environment variables required by your DAG.
+      dag.test()
 
-3. Run / debug the DAG file.
+2. Run / debug the DAG file.
diff --git a/providers/src/airflow/providers/google/marketing_platform/example_dags/example_display_video.py b/providers/src/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
index 33abc67b63..3c008ee5ca 100644
--- a/providers/src/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+++ b/providers/src/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
@@ -92,7 +92,7 @@ with DAG(
     "example_display_video_misc",
     start_date=START_DATE,
     catchup=False,
-) as dag2:
+) as dag_example_display_video_misc:
     # [START howto_google_display_video_upload_multiple_entity_read_files_to_big_query]
     upload_erf_to_bq = GCSToBigQueryOperator(
         task_id="upload_erf_to_bq",
@@ -125,7 +125,7 @@ with DAG(
     "example_display_video_sdf",
     start_date=START_DATE,
     catchup=False,
-) as dag3:
+) as dag_example_display_video_sdf:
     # [START howto_google_display_video_create_sdf_download_task_operator]
     create_sdf_download_task = GoogleDisplayVideo360CreateSDFDownloadTaskOperator(
         task_id="create_sdf_download_task", body_request=CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST
diff --git a/providers/tests/google/cloud/operators/test_dataprep_system.py b/providers/tests/google/cloud/operators/test_dataprep_system.py
index 96f47fa3e3..dad77ac4ff 100644
--- a/providers/tests/google/cloud/operators/test_dataprep_system.py
+++ b/providers/tests/google/cloud/operators/test_dataprep_system.py
@@ -26,7 +26,8 @@ from airflow.models import Connection
 from airflow.utils.session import create_session
 
 from dev.tests_common.test_utils.db import clear_db_connections
-from dev.tests_common.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
+from dev.tests_common.test_utils.gcp_system_helpers import GoogleSystemTest
+from dev.tests_common.test_utils.system_tests import get_test_run
 
 TOKEN = os.environ.get("DATAPREP_TOKEN")
 EXTRA = {"token": TOKEN}
@@ -52,4 +53,7 @@ class TestDataprepExampleDagsSystem(GoogleSystemTest):
         clear_db_connections()
 
     def test_run_example_dag(self):
-        self.run_dag(dag_id="example_dataprep", dag_folder=CLOUD_DAG_FOLDER)
+        from providers.tests.system.google.cloud.dataprep.example_dataprep import dag
+
+        run = get_test_run(dag)
+        run()
diff --git a/providers/tests/google/cloud/operators/test_datastore_system.py b/providers/tests/google/cloud/operators/test_datastore_system.py
index a98215a531..8807288358 100644
--- a/providers/tests/google/cloud/operators/test_datastore_system.py
+++ b/providers/tests/google/cloud/operators/test_datastore_system.py
@@ -44,8 +44,8 @@ class TestGcpDatastoreSystem(GoogleSystemTest):
 
     @provide_gcp_context(GCP_DATASTORE_KEY)
     def test_run_example_dag(self):
-        self.run_dag("example_gcp_datastore", CLOUD_DAG_FOLDER)
+        self.run_dag("example_gcp_datastore", CLOUD_DAG_FOLDER)  # this dag does not exist?
 
     @provide_gcp_context(GCP_DATASTORE_KEY)
     def test_run_example_dag_operations(self):
-        self.run_dag("example_gcp_datastore_operations", CLOUD_DAG_FOLDER)
+        self.run_dag("example_gcp_datastore_operations", CLOUD_DAG_FOLDER)  # this dag does not exist?
diff --git a/providers/tests/google/cloud/transfers/test_facebook_ads_to_gcs_system.py b/providers/tests/google/cloud/transfers/test_facebook_ads_to_gcs_system.py
index ba24a0c34d..9cb0a9be52 100644
--- a/providers/tests/google/cloud/transfers/test_facebook_ads_to_gcs_system.py
+++ b/providers/tests/google/cloud/transfers/test_facebook_ads_to_gcs_system.py
@@ -25,13 +25,14 @@ import pytest
 
 from airflow.exceptions import AirflowException
 from airflow.models import Connection
+from airflow.providers.google.cloud.example_dags import example_facebook_ads_to_gcs
 from airflow.utils.process_utils import patch_environ
 
 from dev.tests_common.test_utils.gcp_system_helpers import (
-    CLOUD_DAG_FOLDER,
     GoogleSystemTest,
     provide_gcp_context,
 )
+from dev.tests_common.test_utils.system_tests import get_test_run
 from providers.tests.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY
 
 CREDENTIALS_DIR = os.environ.get("CREDENTIALS_DIR", "/files/airflow-breeze-config/keys")
@@ -71,4 +72,5 @@ class TestFacebookAdsToGcsExampleDagsSystem(GoogleSystemTest):
     @provide_gcp_context(GCP_BIGQUERY_KEY)
     @provide_facebook_connection(FACEBOOK_CREDENTIALS_PATH)
     def test_dag_example(self):
-        self.run_dag("example_facebook_ads_to_gcs", CLOUD_DAG_FOLDER)
+        run = get_test_run(example_facebook_ads_to_gcs.dag)
+        run()
diff --git a/providers/tests/google/cloud/transfers/test_salesforce_to_gcs_system.py b/providers/tests/google/cloud/transfers/test_salesforce_to_gcs_system.py
index afd0856fad..d556f2d86e 100644
--- a/providers/tests/google/cloud/transfers/test_salesforce_to_gcs_system.py
+++ b/providers/tests/google/cloud/transfers/test_salesforce_to_gcs_system.py
@@ -20,12 +20,14 @@ import os
 
 import pytest
 
+from airflow.providers.google.cloud.example_dags import example_salesforce_to_gcs
+
 from dev.tests_common.test_utils.gcp_system_helpers import (
-    CLOUD_DAG_FOLDER,
     GoogleSystemTest,
     provide_gcp_context,
 )
 from dev.tests_common.test_utils.salesforce_system_helpers import provide_salesforce_connection
+from dev.tests_common.test_utils.system_tests import get_test_run
 from providers.tests.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY
 
 CREDENTIALS_DIR = os.environ.get("CREDENTIALS_DIR", "/files/airflow-breeze-config/keys")
@@ -42,4 +44,5 @@ class TestSalesforceIntoGCSExample(GoogleSystemTest):
     @provide_gcp_context(GCP_BIGQUERY_KEY)
     @provide_salesforce_connection(SALESFORCE_CREDENTIALS_PATH)
     def test_run_example_dag_salesforce_to_gcs_operator(self):
-        self.run_dag("example_salesforce_to_gcs", CLOUD_DAG_FOLDER)
+        run = get_test_run(example_salesforce_to_gcs.dag)
+        run()
diff --git a/providers/tests/google/marketing_platform/operators/test_display_video_system.py b/providers/tests/google/marketing_platform/operators/test_display_video_system.py
index 78f5d4ee02..49f44948ab 100644
--- a/providers/tests/google/marketing_platform/operators/test_display_video_system.py
+++ b/providers/tests/google/marketing_platform/operators/test_display_video_system.py
@@ -19,13 +19,18 @@ from __future__ import annotations
 import pytest
 
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
-from airflow.providers.google.marketing_platform.example_dags.example_display_video import BUCKET
+from airflow.providers.google.marketing_platform.example_dags.example_display_video import (
+    BUCKET,
+    dag_example_display_video_misc,
+    dag_example_display_video_sdf,
+)
 
 from dev.tests_common.test_utils.gcp_system_helpers import (
     MARKETING_DAG_FOLDER,
     GoogleSystemTest,
     provide_gcp_context,
 )
+from dev.tests_common.test_utils.system_tests import get_test_run
 from providers.tests.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY, GMP_KEY
 
 # Requires the following scope:
@@ -50,12 +55,14 @@ class TestDisplayVideoSystem(GoogleSystemTest):
 
     @provide_gcp_context(GMP_KEY, scopes=SCOPES)
     def test_run_example_dag(self):
-        self.run_dag("example_display_video", MARKETING_DAG_FOLDER)
+        self.run_dag("example_display_video", MARKETING_DAG_FOLDER)  # this dag does not exist?
 
     @provide_gcp_context(GMP_KEY, scopes=SCOPES)
     def test_run_example_dag_misc(self):
-        self.run_dag("example_display_video_misc", MARKETING_DAG_FOLDER)
+        run = get_test_run(dag_example_display_video_misc)
+        run()
 
     @provide_gcp_context(GMP_KEY, scopes=SCOPES)
     def test_run_example_dag_sdf(self):
-        self.run_dag("example_display_video_sdf", MARKETING_DAG_FOLDER)
+        run = get_test_run(dag_example_display_video_sdf)
+        run()
diff --git a/providers/tests/system/google/cloud/dataprep/example_dataprep.py b/providers/tests/system/google/cloud/dataprep/example_dataprep.py
index 9f603f43fb..cdc736a41c 100644
--- a/providers/tests/system/google/cloud/dataprep/example_dataprep.py
+++ b/providers/tests/system/google/cloud/dataprep/example_dataprep.py
@@ -313,6 +313,7 @@ with models.DAG(
     # when "tearDown" task with trigger rule is part of the DAG
     list(dag.tasks) >> watcher()
 
+
 from dev.tests_common.test_utils.system_tests import get_test_run  # noqa: E402
 
 # Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 00364794f1..f0c9a18c1a 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -322,7 +322,7 @@ class TestCliDags:
         We just check we call dag.run() right. The behaviour of that kwarg is
         tested in test_jobs
         """
-        dag_id = "test_dagrun_states_deadlock"
+        dag_id = "example_bash_operator"
         run_date = DEFAULT_DATE + timedelta(days=1)
         args = [
             "dags",
diff --git a/tests/core/test_example_dags_system.py b/tests/core/test_example_dags_system.py
index c60b7325b1..bd34d9bb15 100644
--- a/tests/core/test_example_dags_system.py
+++ b/tests/core/test_example_dags_system.py
@@ -17,16 +17,128 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import timedelta
+
+import pendulum
 import pytest
+from sqlalchemy import select
+
+from airflow.models import DagRun
+from airflow.operators.python import PythonOperator
+from airflow.utils.module_loading import import_string
+from airflow.utils.state import DagRunState
+from airflow.utils.trigger_rule import TriggerRule
 
+from dev.tests_common.test_utils.system_tests import get_test_run
 from dev.tests_common.test_utils.system_tests_class import SystemTest
 
 
+def fail():
+    raise ValueError
+
+
+def get_dag_success(dag_maker):
+    with dag_maker(
+        dag_id="test_dagrun_states_success",
+        schedule=timedelta(days=1),
+    ) as dag:
+        dag4_task1 = PythonOperator(
+            task_id="test_dagrun_fail",
+            python_callable=fail,
+        )
+        dag4_task2 = PythonOperator(
+            task_id="test_dagrun_succeed", trigger_rule=TriggerRule.ALL_FAILED, python_callable=print
+        )
+        dag4_task2.set_upstream(dag4_task1)
+    return dag
+
+
+def get_dag_fail(dag_maker):
+    with dag_maker(
+        dag_id="test_dagrun_states_fail",
+        schedule=timedelta(days=1),
+    ) as dag:
+        dag3_task1 = PythonOperator(task_id="to_fail", python_callable=fail)
+        dag3_task2 = PythonOperator(task_id="to_succeed", python_callable=print)
+        dag3_task2.set_upstream(dag3_task1)
+    return dag
+
+
+def get_dag_fail_root(dag_maker):
+    with dag_maker(
+        dag_id="test_dagrun_states_root_fail",
+        schedule=timedelta(days=1),
+    ) as dag:
+        PythonOperator(task_id="test_dagrun_succeed", python_callable=print)
+        PythonOperator(
+            task_id="test_dagrun_fail",
+            python_callable=fail,
+        )
+    return dag
+
+
 @pytest.mark.system("core")
 class TestExampleDagsSystem(SystemTest):
     @pytest.mark.parametrize(
-        "dag_id",
+        "module",
        ["example_bash_operator", "example_branch_operator", "tutorial_dag", "example_dag_decorator"],
     )
-    def test_dag_example(self, dag_id):
-        self.run_dag(dag_id=dag_id)
+    def test_dag_example(self, module):
+        test_run = import_string(f"airflow.example_dags.{module}.test_run")
+        test_run()
+
+    @pytest.mark.parametrize(
+        "factory, expected",
+        [
+            (get_dag_fail, "failed"),
+            (get_dag_fail_root, "failed"),
+            (get_dag_success, "success"),
+        ],
+    )
+    def test_dag_run_final_state(self, factory, expected, dag_maker, session):
+        """
+        These are migrated tests that were originally added in PR #1289,
+        which fixed issue #1225.
+
+        I would be very surprised if these things were not covered elsewhere already,
+        but just in case, I'm migrating them to system tests.
+        """
+        dag = factory(dag_maker)
+        run = get_test_run(dag)
+        if expected == "failed":
+            with pytest.raises(AssertionError, match="The system test failed"):
+                run()
+        else:
+            run()
+        dr = session.scalar(select(DagRun))
+        assert dr.state == expected
+
+    def test_dag_root_task_start_date_future(self, dag_maker, session):
+        """
+        These are migrated tests that were originally added in PR #1289,
+        which fixed issue #1225.
+
+        This one tests what happens when there's a dag with a root task with a future start date.
+
+        The dag should run, but no TI should be created for the task whose start date is in the future.
+        """
+        exec_date = pendulum.datetime(2021, 1, 1)
+        fut_start_date = pendulum.datetime(2021, 2, 1)
+        with dag_maker(
+            dag_id="dagrun_states_root_future",
+            schedule=timedelta(days=1),
+            catchup=False,
+        ) as dag:
+            PythonOperator(
+                task_id="current",
+                python_callable=lambda: print("hello"),
+            )
+            PythonOperator(
+                task_id="future",
+                python_callable=lambda: print("hello"),
+                start_date=fut_start_date,
+            )
+        run = get_test_run(dag, execution_date=exec_date)
+        run()
+        dr = session.scalar(select(DagRun))
+        tis = dr.task_instances
+        assert dr.state == DagRunState.SUCCESS
+        assert len(tis) == 1
+        assert tis[0].task_id == "current"
diff --git a/tests/core/test_example_dags_system.py b/tests/dags/test_future_start_date.py
similarity index 58%
copy from tests/core/test_example_dags_system.py
copy to tests/dags/test_future_start_date.py
index c60b7325b1..dadfbff600 100644
--- a/tests/core/test_example_dags_system.py
+++ b/tests/dags/test_future_start_date.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,18 +14,28 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 from __future__ import annotations
 
-import pytest
+from datetime import timedelta
 
-from dev.tests_common.test_utils.system_tests_class import SystemTest
+import pendulum
 
+from airflow.models.dag import DAG
+from airflow.operators.empty import EmptyOperator
+from airflow.operators.python import PythonOperator
 
-@pytest.mark.system("core")
-class TestExampleDagsSystem(SystemTest):
-    @pytest.mark.parametrize(
-        "dag_id",
-        ["example_bash_operator", "example_branch_operator", "tutorial_dag", "example_dag_decorator"],
+exec_date = pendulum.datetime(2021, 1, 1)
+fut_start_date = pendulum.datetime(2021, 2, 1)
+with DAG(
+    dag_id="test_dagrun_states_root_future",
+    schedule=timedelta(days=1),
+    catchup=True,
+    start_date=exec_date,
+) as dag:
+    EmptyOperator(task_id="current")
+    PythonOperator(
+        task_id="future",
+        python_callable=lambda: print("hello"),
+        start_date=fut_start_date,
     )
-    def test_dag_example(self, dag_id):
-        self.run_dag(dag_id=dag_id)
diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py
deleted file mode 100644
index 96a3ad1562..0000000000
--- a/tests/dags/test_issue_1225.py
+++ /dev/null
@@ -1,149 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-DAG designed to test what happens when a DAG with pooled tasks is run
-by a BackfillJob.
-Addresses issue #1225.
-"""
-
-from __future__ import annotations
-
-from datetime import datetime, timedelta
-
-from airflow.models.dag import DAG
-from airflow.operators.empty import EmptyOperator
-from airflow.operators.python import PythonOperator
-from airflow.utils.trigger_rule import TriggerRule
-
-DEFAULT_DATE = datetime(2016, 1, 1)
-default_args = dict(start_date=DEFAULT_DATE, owner="airflow")
-
-
-def fail():
-    raise ValueError("Expected failure.")
-
-
-# DAG tests backfill with pooled tasks
-# Previously backfill would queue the task but never run it
-dag1 = DAG(
-    dag_id="test_backfill_pooled_task_dag",
-    schedule=timedelta(days=1),
-    default_args=default_args,
-)
-dag1_task1 = EmptyOperator(
-    task_id="test_backfill_pooled_task",
-    dag=dag1,
-    pool="test_backfill_pooled_task_pool",
-)
-
-# dag2 has been moved to test_prev_dagrun_dep.py
-
-# DAG tests that a Dag run that doesn't complete is marked failed
-dag3 = DAG(
-    dag_id="test_dagrun_states_fail",
-    schedule=timedelta(days=1),
-    default_args=default_args,
-)
-dag3_task1 = PythonOperator(task_id="test_dagrun_fail", dag=dag3, python_callable=fail)
-dag3_task2 = EmptyOperator(
-    task_id="test_dagrun_succeed",
-    dag=dag3,
-)
-dag3_task2.set_upstream(dag3_task1)
-
-# DAG tests that a Dag run that completes but has a failure is marked success
-dag4 = DAG(
-    dag_id="test_dagrun_states_success",
-    schedule=timedelta(days=1),
-    default_args=default_args,
-)
-dag4_task1 = PythonOperator(
-    task_id="test_dagrun_fail",
-    dag=dag4,
-    python_callable=fail,
-)
-dag4_task2 = EmptyOperator(task_id="test_dagrun_succeed", dag=dag4, trigger_rule=TriggerRule.ALL_FAILED)
-dag4_task2.set_upstream(dag4_task1)
-
-# DAG tests that a Dag run that completes but has a root failure is marked fail
-dag5 = DAG(
-    dag_id="test_dagrun_states_root_fail",
-    schedule=timedelta(days=1),
-    default_args=default_args,
-)
-dag5_task1 = EmptyOperator(
-    task_id="test_dagrun_succeed",
-    dag=dag5,
-)
-dag5_task2 = PythonOperator(
-    task_id="test_dagrun_fail",
-    dag=dag5,
-    python_callable=fail,
-)
-
-# DAG tests that a Dag run that is deadlocked with no states is failed
-dag6 = DAG(
-    dag_id="test_dagrun_states_deadlock",
-    schedule=timedelta(days=1),
-    default_args=default_args,
-)
-dag6_task1 = EmptyOperator(
-    task_id="test_depends_on_past",
-    depends_on_past=True,
-    dag=dag6,
-)
-dag6_task2 = EmptyOperator(
-    task_id="test_depends_on_past_2",
-    depends_on_past=True,
-    dag=dag6,
-)
-dag6_task2.set_upstream(dag6_task1)
-
-
-# DAG tests that a Dag run that doesn't complete but has a root failure is marked running
-dag8 = DAG(
-    dag_id="test_dagrun_states_root_fail_unfinished",
-    schedule=timedelta(days=1),
-    default_args=default_args,
-)
-dag8_task1 = EmptyOperator(
-    task_id="test_dagrun_unfinished",  # The test will unset the task instance 
state after
-    # running this test
-    dag=dag8,
-)
-dag8_task2 = PythonOperator(
-    task_id="test_dagrun_fail",
-    dag=dag8,
-    python_callable=fail,
-)
-
-# DAG tests that a Dag run that completes but has a root in the future is marked as success
-dag9 = DAG(
-    dag_id="test_dagrun_states_root_future",
-    schedule=timedelta(days=1),
-    default_args=default_args,
-)
-dag9_task1 = EmptyOperator(
-    task_id="current",
-    dag=dag9,
-)
-dag9_task2 = EmptyOperator(
-    task_id="future",
-    dag=dag9,
-    start_date=DEFAULT_DATE + timedelta(days=1),
-)
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index f1bb6f17d0..dead9be862 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -32,7 +32,6 @@ from airflow import settings
 from airflow.cli import cli_parser
 from airflow.exceptions import (
     AirflowException,
-    AirflowTaskTimeout,
     BackfillUnfinished,
     DagConcurrencyLimitReached,
     NoAvailablePoolSlot,
@@ -54,7 +53,6 @@ from airflow.operators.empty import EmptyOperator
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import DagRunState, State, TaskInstanceState
-from airflow.utils.timeout import timeout
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.types import DagRunType
 from tests.listeners import dag_listener
@@ -1070,34 +1068,6 @@ class TestBackfillJob:
             ],
         ]
 
-    def test_backfill_pooled_tasks(self):
-        """
-        Test that queued tasks are executed by BackfillJobRunner
-        """
-        session = settings.Session()
-        pool = Pool(pool="test_backfill_pooled_task_pool", slots=1, include_deferred=False)
-        session.add(pool)
-        session.commit()
-        session.close()
-
-        dag = self.dagbag.get_dag("test_backfill_pooled_task_dag")
-        dag.clear()
-
-        job = Job()
-        job_runner = BackfillJobRunner(job=job, dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-
-        # run with timeout because this creates an infinite loop if not
-        # caught
-        try:
-            with timeout(seconds=20):
-                run_job(job=job, execute_callable=job_runner._execute)
-        except AirflowTaskTimeout:
-            logger.info("Timeout while waiting for task to complete")
-        run_id = f"backfill__{DEFAULT_DATE.isoformat()}"
-        ti = TI(task=dag.get_task("test_backfill_pooled_task"), run_id=run_id)
-        ti.refresh_from_db()
-        assert ti.state == State.SUCCESS
-
     @pytest.mark.parametrize("ignore_depends_on_past", [True, False])
     def test_backfill_depends_on_past_works_independently_on_ignore_depends_on_past(
         self, ignore_depends_on_past, mock_executor
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index a5748ebaee..639a3528fa 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -43,7 +43,8 @@ from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallback
 from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
 from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
 from airflow.dag_processing.manager import DagFileProcessorAgent
-from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
+from airflow.decorators import task
+from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import BaseExecutor
 from airflow.executors.executor_constants import MOCK_EXECUTOR
 from airflow.executors.executor_loader import ExecutorLoader
@@ -2828,128 +2829,33 @@ class TestSchedulerJob:
 
         assert [] == res
 
-    @provide_session
-    def evaluate_dagrun(
-        self,
-        dag_id,
-        expected_task_states,  # dict of task_id: state
-        dagrun_state,
-        run_kwargs=None,
-        advance_execution_date=False,
-        session=None,
-    ):
-        """
-        Helper for testing DagRun states with simple two-task DAGs.
-        This is hackish: a dag run is created but its tasks are
-        run by a backfill.
-        """
-
-        # todo: AIP-78 remove along with DAG.run()
-        #  this only tests the backfill job runner, not the scheduler
-
-        if run_kwargs is None:
-            run_kwargs = {}
-
-        dag = self.dagbag.get_dag(dag_id)
-        dagrun_info = dag.next_dagrun_info(None)
-        assert dagrun_info is not None
-        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
-        dr = dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            execution_date=dagrun_info.logical_date,
-            state=None,
-            session=session,
-            data_interval=data_interval,
-            **triggered_by_kwargs,
-        )
-
-        if advance_execution_date:
-            # run a second time to schedule a dagrun after the start_date
-            dr = dag.create_dagrun(
-                run_type=DagRunType.SCHEDULED,
-                execution_date=dr.data_interval_end,
-                state=None,
-                session=session,
-                data_interval=data_interval,
-                **triggered_by_kwargs,
-            )
-        ex_date = dr.execution_date
-
-        for tid, state in expected_task_states.items():
-            if state == State.FAILED:
-                self.null_exec.mock_task_fail(dag_id, tid, dr.run_id)
-
-        try:
-            dag = DagBag().get_dag(dag.dag_id)
-            # This needs a _REAL_ dag, not the serialized version
-            assert not isinstance(dag, SerializedDAG)
-            # TODO: Can this be replaced with `self.run_scheduler_until_dagrun_terminal. `dag.run` isn't
-            # great to use here as it uses BackfillJobRunner!
-            for _ in _mock_executor(self.null_exec):
-                dag.run(start_date=ex_date, end_date=ex_date, **run_kwargs)
-        except AirflowException:
-            pass
-
-        # load dagrun
-        dr = DagRun.find(dag_id=dag_id, execution_date=ex_date, session=session)
-        dr = dr[0]
-        dr.dag = dag
-
-        assert dr.state == dagrun_state
-
-        # test tasks
-        for task_id, expected_state in expected_task_states.items():
-            ti = dr.get_task_instance(task_id)
-            assert ti.state == expected_state
-
-    def test_dagrun_fail(self):
+    @pytest.mark.parametrize(
+        "ti_states, run_state",
+        [
+            (["failed", "success"], "failed"),
+            (["success", "success"], "success"),
+        ],
+    )
+    def test_dagrun_state_correct(self, ti_states, run_state, dag_maker, session):
         """
         DagRuns with one failed and one incomplete root task -> FAILED
         """
-        # todo: AIP-78 remove along with DAG.run()
-        #  this only tests the backfill job runner, not the scheduler
-        with pytest.warns(RemovedInAirflow3Warning):
-            self.evaluate_dagrun(
-                dag_id="test_dagrun_states_fail",
-                expected_task_states={
-                    "test_dagrun_fail": State.FAILED,
-                    "test_dagrun_succeed": State.UPSTREAM_FAILED,
-                },
-                dagrun_state=State.FAILED,
-            )
+        with dag_maker():
 
-    def test_dagrun_success(self):
-        """
-        DagRuns with one failed and one successful root task -> SUCCESS
-        """
-        # todo: AIP-78 remove along with DAG.run()
-        #  this only tests the backfill job runner, not the scheduler
-        with pytest.warns(RemovedInAirflow3Warning):
-            self.evaluate_dagrun(
-                dag_id="test_dagrun_states_success",
-                expected_task_states={
-                    "test_dagrun_fail": State.FAILED,
-                    "test_dagrun_succeed": State.SUCCESS,
-                },
-                dagrun_state=State.SUCCESS,
-            )
+            @task
+            def my_task(): ...
 
-    def test_dagrun_root_fail(self):
-        """
-        DagRuns with one successful and one failed root task -> FAILED
-        """
-        # todo: AIP-78 remove along with DAG.run()
-        #  this only tests the backfill job runner, not the scheduler
-        with pytest.warns(RemovedInAirflow3Warning):
-            self.evaluate_dagrun(
-                dag_id="test_dagrun_states_root_fail",
-                expected_task_states={
-                    "test_dagrun_succeed": State.SUCCESS,
-                    "test_dagrun_fail": State.FAILED,
-                },
-                dagrun_state=State.FAILED,
-            )
+            for _ in ti_states:
+                my_task()
+        dr = dag_maker.create_dagrun(state="running", triggered_by=DagRunTriggeredByType.TIMETABLE)
+        for idx, state in enumerate(ti_states):
+            dr.task_instances[idx].state = state
+        session.commit()
+        scheduler_job = Job(executor=self.null_exec)
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+        self.job_runner.processor_agent = mock.MagicMock()
+        self.job_runner._do_scheduling(session)
+        assert session.query(DagRun).one().state == run_state
 
     def test_dagrun_root_after_dagrun_unfinished(self, mock_executor):
         """
@@ -2963,57 +2869,15 @@ class TestSchedulerJob:
         dag.sync_to_db()
 
         scheduler_job = Job()
-        self.job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=1, subdir=dag.fileloc)
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=2, subdir=dag.fileloc)
         run_job(scheduler_job, execute_callable=self.job_runner._execute)
 
-        first_run = DagRun.find(dag_id=dag_id, execution_date=DEFAULT_DATE)[0]
+        first_run = DagRun.find(dag_id=dag_id)[0]
         ti_ids = [(ti.task_id, ti.state) for ti in first_run.get_task_instances()]
 
         assert ti_ids == [("current", State.SUCCESS)]
         assert first_run.state in [State.SUCCESS, State.RUNNING]
 
-    def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self):
-        """
-        DagRun is marked a success if ignore_first_depends_on_past=True
-
-        Test that an otherwise-deadlocked dagrun is marked as a success
-        if ignore_first_depends_on_past=True and the dagrun execution_date
-        is after the start_date.
-        """
-        # todo: AIP-78 remove along with DAG.run()
-        #  this only tests the backfill job runner, not the scheduler
-        with pytest.warns(RemovedInAirflow3Warning):
-            self.evaluate_dagrun(
-                dag_id="test_dagrun_states_deadlock",
-                expected_task_states={
-                    "test_depends_on_past": State.SUCCESS,
-                    "test_depends_on_past_2": State.SUCCESS,
-                },
-                dagrun_state=State.SUCCESS,
-                advance_execution_date=True,
-                run_kwargs=dict(ignore_first_depends_on_past=True),
-            )
-
-    def test_dagrun_deadlock_ignore_depends_on_past(self):
-        """
-        Test that ignore_first_depends_on_past doesn't affect results
-        (this is the same test as
-        test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date except
-        that start_date == execution_date so depends_on_past is irrelevant).
-        """
-        # todo: AIP-78 remove along with DAG.run()
-        #  this only tests the backfill job runner, not the scheduler
-        with pytest.warns(RemovedInAirflow3Warning):
-            self.evaluate_dagrun(
-                dag_id="test_dagrun_states_deadlock",
-                expected_task_states={
-                    "test_depends_on_past": State.SUCCESS,
-                    "test_depends_on_past_2": State.SUCCESS,
-                },
-                dagrun_state=State.SUCCESS,
-                run_kwargs=dict(ignore_first_depends_on_past=True),
-            )
-
     @pytest.mark.parametrize(
         "configs",
         [
@@ -3136,9 +3000,14 @@ class TestSchedulerJob:
         Test that the scheduler can successfully queue multiple dags in parallel
         """
         with conf_vars(configs):
-            dag_ids = ["test_start_date_scheduling", "test_dagrun_states_success"]
+            dag_ids = [
+                "test_start_date_scheduling",
+                "test_task_start_date_scheduling",
+            ]
             for dag_id in dag_ids:
                 dag = self.dagbag.get_dag(dag_id)
+                if not dag:
+                    raise ValueError(f"could not find dag {dag_id}")
                 dag.clear()
 
             scheduler_job = Job(
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 997ef06329..67dc699fc3 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -25,7 +25,6 @@ import pickle
 import re
 import weakref
 from datetime import timedelta
-from importlib import reload
 from pathlib import Path
 from typing import TYPE_CHECKING
 from unittest import mock
@@ -45,12 +44,8 @@ from airflow.exceptions import (
     AirflowException,
     DuplicateTaskIdFound,
     ParamValidationError,
-    RemovedInAirflow3Warning,
     UnknownExecutorException,
 )
-from airflow.executors import executor_loader
-from airflow.executors.local_executor import LocalExecutor
-from airflow.executors.sequential_executor import SequentialExecutor
 from airflow.models.asset import (
     AssetAliasModel,
     AssetDagRunQueue,
@@ -2740,20 +2735,6 @@ class TestDagModel:
             ]
         }
 
-    @mock.patch("airflow.models.dag.run_job")
-    def test_dag_executors(self, run_job_mock):
-        # todo: AIP-78 remove along with DAG.run()
-        #  this only tests the backfill job runner, not the scheduler
-        with pytest.warns(RemovedInAirflow3Warning):
-            dag = DAG(dag_id="test", schedule=None)
-            reload(executor_loader)
-            with conf_vars({("core", "executor"): "SequentialExecutor"}):
-                dag.run()
-                assert isinstance(run_job_mock.call_args_list[0].kwargs["job"].executor, SequentialExecutor)
-
-                dag.run(local=True)
-                assert isinstance(run_job_mock.call_args_list[1].kwargs["job"].executor, LocalExecutor)
-
 
 class TestQueries:
     def setup_method(self) -> None:
diff --git a/tests/models/test_xcom_arg.py b/tests/models/test_xcom_arg.py
index b161020d1f..fbdd500661 100644
--- a/tests/models/test_xcom_arg.py
+++ b/tests/models/test_xcom_arg.py
@@ -23,7 +23,6 @@ from airflow.operators.python import PythonOperator
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.utils.types import NOTSET
 
-from dev.tests_common.test_utils.config import conf_vars
 from dev.tests_common.test_utils.db import clear_db_dags, clear_db_runs
 
 pytestmark = pytest.mark.db_test
@@ -146,7 +145,6 @@ class TestXComArgBuild:
 
 @pytest.mark.system("core")
 class TestXComArgRuntime:
-    @conf_vars({("core", "executor"): "DebugExecutor"})
     def test_xcom_pass_to_op(self, dag_maker):
         with dag_maker(dag_id="test_xcom_pass_to_op") as dag:
             operator = PythonOperator(
@@ -161,9 +159,8 @@ class TestXComArgRuntime:
                 task_id="assert_is_value_1",
             )
             operator >> operator2
-        dag.run()
+        dag.test()
 
-    @conf_vars({("core", "executor"): "DebugExecutor"})
     def test_xcom_push_and_pass(self, dag_maker):
         def push_xcom_value(key, value, **context):
             ti = context["task_instance"]
@@ -182,7 +179,7 @@ class TestXComArgRuntime:
                 op_args=[xarg],
             )
             op1 >> op2
-        dag.run()
+        dag.test()
 
 
 @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode

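For reference, `get_test_run` (updated above to accept `**test_kwargs`) forwards
any extra keyword arguments to `DAG.test()`. A minimal usage sketch, assuming an
importable module exposing a `dag` object (the example dag chosen here is
arbitrary):

    import pendulum

    from airflow.example_dags.example_bash_operator import dag

    from dev.tests_common.test_utils.system_tests import get_test_run

    # Extra kwargs such as execution_date are passed straight through to
    # dag.test(), mirroring the usage in tests/core/test_example_dags_system.py.
    run = get_test_run(dag, execution_date=pendulum.datetime(2021, 1, 1))
    run()  # raises AssertionError unless the dag run finishes in SUCCESS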