This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6c75785fac Fix import sequencing of backfill and dagrun models (#42828)
6c75785fac is described below
commit 6c75785fac20a98651d92e8278b7052f5545ee60
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Oct 8 08:34:43 2024 -0700
Fix import sequencing of backfill and dagrun models (#42828)
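Previously the import order of these models worked in general, but failed in certain test scenarios
where we did not initialize the database. This is one solution: dagrun.py now imports Backfill directly, and backfill.py defers its DagRun and SerializedDagModel imports to function scope.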
---
airflow/models/backfill.py | 6 ++++--
airflow/models/dagrun.py | 3 ++-
tests/cli/conftest.py | 4 ++--
tests/conftest.py | 6 ------
4 files changed, 8 insertions(+), 11 deletions(-)
diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py
index e8c31015c2..37683ee6f1 100644
--- a/airflow/models/backfill.py
+++ b/airflow/models/backfill.py
@@ -32,9 +32,7 @@ from sqlalchemy_jsonfield import JSONField
from airflow.api_connexion.exceptions import Conflict, NotFound
from airflow.exceptions import AirflowException
-from airflow.models import DagRun
from airflow.models.base import Base, StringID
-from airflow.models.serialized_dag import SerializedDagModel
from airflow.settings import json
from airflow.utils import timezone
from airflow.utils.session import create_session
@@ -129,6 +127,8 @@ def _create_backfill(
reverse: bool,
dag_run_conf: dict | None,
) -> Backfill | None:
+ from airflow.models.serialized_dag import SerializedDagModel
+
with create_session() as session:
serdag = session.get(SerializedDagModel, dag_id)
if not serdag:
@@ -215,6 +215,8 @@ def _cancel_backfill(backfill_id) -> Backfill:
session.commit()
+ from airflow.models import DagRun
+
# now, let's mark all queued dag runs as failed
query = (
update(DagRun)
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 3abf16b7f9..0373bc667b 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -57,6 +57,7 @@ from airflow.exceptions import AirflowException, TaskNotFound
from airflow.listeners.listener import get_listener_manager
from airflow.models import Log
from airflow.models.abstractoperator import NotMapped
+from airflow.models.backfill import Backfill
from airflow.models.base import Base, StringID
from airflow.models.expandinput import NotFullyPopulated
from airflow.models.taskinstance import TaskInstance as TI
@@ -207,7 +208,7 @@ class DagRun(Base, LoggingMixin):
uselist=False,
cascade="all, delete, delete-orphan",
)
- backfill = relationship("Backfill", uselist=False)
+ backfill = relationship(Backfill, uselist=False)
backfill_max_active_runs = association_proxy("backfill", "max_active_runs")
max_active_runs = association_proxy("dag_model", "max_active_runs")
diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py
index 9987afb683..9f0a63af06 100644
--- a/tests/cli/conftest.py
+++ b/tests/cli/conftest.py
@@ -21,9 +21,9 @@ import sys
import pytest
-from airflow import models
from airflow.cli import cli_parser
from airflow.executors import local_executor
+from airflow.models.dagbag import DagBag
from airflow.providers.celery.executors import celery_executor, celery_kubernetes_executor
from airflow.providers.cncf.kubernetes.executors import kubernetes_executor, local_kubernetes_executor
from tests.test_utils.config import conf_vars
@@ -56,7 +56,7 @@ def load_examples():
@pytest.fixture(scope="session")
def dagbag():
- return models.DagBag(include_examples=True)
+ return DagBag(include_examples=True)
@pytest.fixture(scope="session")
diff --git a/tests/conftest.py b/tests/conftest.py
index d0a34a6e00..60d009416f 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -412,12 +412,6 @@ def initialize_airflow_tests(request):
"Skipping initializing of the DB as it was initialized
already.\n"
"You can re-initialize the database by adding --with-db-init
flag when running tests."
)
- else:
- # if we are not initializing the database (due to skip db tests)
- # we need to ensure Backfill is defined before DagRun
- # otherwise we get this error:
- # "sqlalchemy.exc.InvalidRequestError: When initializing mapper mapped class..."
- from airflow.models.backfill import Backfill # noqa: F401
integration_kerberos = os.environ.get("INTEGRATION_KERBEROS")
if integration_kerberos == "true":
# Initialize kerberos