This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c75785fac Fix import sequencing of backfill and dagrun models (#42828)
6c75785fac is described below

commit 6c75785fac20a98651d92e8278b7052f5545ee60
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Oct 8 08:34:43 2024 -0700

    Fix import sequencing of backfill and dagrun models (#42828)
    
    Previously it worked in general, but did not in certain test scenarios where we did not initialize the database.  This is one solution.
---
 airflow/models/backfill.py | 6 ++++--
 airflow/models/dagrun.py   | 3 ++-
 tests/cli/conftest.py      | 4 ++--
 tests/conftest.py          | 6 ------
 4 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py
index e8c31015c2..37683ee6f1 100644
--- a/airflow/models/backfill.py
+++ b/airflow/models/backfill.py
@@ -32,9 +32,7 @@ from sqlalchemy_jsonfield import JSONField
 
 from airflow.api_connexion.exceptions import Conflict, NotFound
 from airflow.exceptions import AirflowException
-from airflow.models import DagRun
 from airflow.models.base import Base, StringID
-from airflow.models.serialized_dag import SerializedDagModel
 from airflow.settings import json
 from airflow.utils import timezone
 from airflow.utils.session import create_session
@@ -129,6 +127,8 @@ def _create_backfill(
     reverse: bool,
     dag_run_conf: dict | None,
 ) -> Backfill | None:
+    from airflow.models.serialized_dag import SerializedDagModel
+
     with create_session() as session:
         serdag = session.get(SerializedDagModel, dag_id)
         if not serdag:
@@ -215,6 +215,8 @@ def _cancel_backfill(backfill_id) -> Backfill:
 
         session.commit()
 
+        from airflow.models import DagRun
+
         # now, let's mark all queued dag runs as failed
         query = (
             update(DagRun)
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 3abf16b7f9..0373bc667b 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -57,6 +57,7 @@ from airflow.exceptions import AirflowException, TaskNotFound
 from airflow.listeners.listener import get_listener_manager
 from airflow.models import Log
 from airflow.models.abstractoperator import NotMapped
+from airflow.models.backfill import Backfill
 from airflow.models.base import Base, StringID
 from airflow.models.expandinput import NotFullyPopulated
 from airflow.models.taskinstance import TaskInstance as TI
@@ -207,7 +208,7 @@ class DagRun(Base, LoggingMixin):
         uselist=False,
         cascade="all, delete, delete-orphan",
     )
-    backfill = relationship("Backfill", uselist=False)
+    backfill = relationship(Backfill, uselist=False)
     backfill_max_active_runs = association_proxy("backfill", "max_active_runs")
     max_active_runs = association_proxy("dag_model", "max_active_runs")
 
diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py
index 9987afb683..9f0a63af06 100644
--- a/tests/cli/conftest.py
+++ b/tests/cli/conftest.py
@@ -21,9 +21,9 @@ import sys
 
 import pytest
 
-from airflow import models
 from airflow.cli import cli_parser
 from airflow.executors import local_executor
+from airflow.models.dagbag import DagBag
 from airflow.providers.celery.executors import celery_executor, celery_kubernetes_executor
 from airflow.providers.cncf.kubernetes.executors import kubernetes_executor, local_kubernetes_executor
 from tests.test_utils.config import conf_vars
@@ -56,7 +56,7 @@ def load_examples():
 
 @pytest.fixture(scope="session")
 def dagbag():
-    return models.DagBag(include_examples=True)
+    return DagBag(include_examples=True)
 
 
 @pytest.fixture(scope="session")
diff --git a/tests/conftest.py b/tests/conftest.py
index d0a34a6e00..60d009416f 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -412,12 +412,6 @@ def initialize_airflow_tests(request):
                 "Skipping initializing of the DB as it was initialized already.\n"
                 "You can re-initialize the database by adding --with-db-init flag when running tests."
             )
-    else:
-        # if we are not initializing the database (due to skip db tests)
-        # we need to ensure Backfill is defined before DagRun
-        # otherwise we get this error:
-        # "sqlalchemy.exc.InvalidRequestError: When initializing mapper mapped class..."
-        from airflow.models.backfill import Backfill  # noqa: F401
     integration_kerberos = os.environ.get("INTEGRATION_KERBEROS")
     if integration_kerberos == "true":
         # Initialize kerberos

Reply via email to