This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a6f612d899 Rename model `ImportError` to `ParseImportError` to avoid
shadowing the builtin exception (#39116)
a6f612d899 is described below
commit a6f612d89942f141eb8a7affbbea46d033923d1a
Author: Andrey Anshin <[email protected]>
AuthorDate: Fri Apr 19 12:51:21 2024 +0400
Rename model `ImportError` to `ParseImportError` to avoid shadowing the
builtin exception (#39116)
---
airflow/api/common/delete_dag.py | 5 ++--
.../endpoints/import_error_endpoint.py | 12 ++++----
airflow/api_connexion/schemas/error_schema.py | 6 ++--
airflow/dag_processing/manager.py | 8 +++---
airflow/dag_processing/processor.py | 13 +++++----
airflow/models/__init__.py | 30 ++++++++++++++------
airflow/models/errors.py | 19 ++++++++++++-
airflow/www/utils.py | 4 +--
airflow/www/views.py | 7 +++--
pyproject.toml | 2 ++
.../endpoints/test_import_error_endpoint.py | 30 ++++++++++----------
tests/api_connexion/schemas/test_error_schema.py | 8 +++---
tests/api_experimental/common/test_delete_dag.py | 2 +-
tests/dag_processing/test_job_runner.py | 11 ++++----
tests/dag_processing/test_processor.py | 33 +++++++++++-----------
tests/test_utils/db.py | 4 +--
16 files changed, 116 insertions(+), 78 deletions(-)
diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py
index 4452e2726f..1cf7ffec8b 100644
--- a/airflow/api/common/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -27,6 +27,7 @@ from sqlalchemy import and_, delete, or_, select
from airflow import models
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models import DagModel, TaskFail
+from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.db import get_sqla_model_classes
from airflow.utils.session import NEW_SESSION, provide_session
@@ -99,8 +100,8 @@ def delete_dag(dag_id: str, keep_records_in_log: bool =
True, session: Session =
# Delete entries in Import Errors table for a deleted DAG
# This handles the case when the dag_id is changed in the file
session.execute(
- delete(models.ImportError)
- .where(models.ImportError.filename == dag.fileloc)
+ delete(ParseImportError)
+ .where(ParseImportError.filename == dag.fileloc)
.execution_options(synchronize_session="fetch")
)
diff --git a/airflow/api_connexion/endpoints/import_error_endpoint.py
b/airflow/api_connexion/endpoints/import_error_endpoint.py
index 274d842d18..76b706eac1 100644
--- a/airflow/api_connexion/endpoints/import_error_endpoint.py
+++ b/airflow/api_connexion/endpoints/import_error_endpoint.py
@@ -30,7 +30,7 @@ from airflow.api_connexion.schemas.error_schema import (
)
from airflow.auth.managers.models.resource_details import AccessView,
DagDetails
from airflow.models.dag import DagModel
-from airflow.models.errors import ImportError as ImportErrorModel
+from airflow.models.errors import ParseImportError
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.www.extensions.init_auth_manager import get_auth_manager
@@ -45,7 +45,7 @@ if TYPE_CHECKING:
@provide_session
def get_import_error(*, import_error_id: int, session: Session = NEW_SESSION)
-> APIResponse:
"""Get an import error."""
- error = session.get(ImportErrorModel, import_error_id)
+ error = session.get(ParseImportError, import_error_id)
if error is None:
raise NotFound(
"Import error not found",
@@ -85,8 +85,8 @@ def get_import_errors(
"""Get all import errors."""
to_replace = {"import_error_id": "id"}
allowed_sort_attrs = ["import_error_id", "timestamp", "filename"]
- count_query = select(func.count(ImportErrorModel.id))
- query = select(ImportErrorModel)
+ count_query = select(func.count(ParseImportError.id))
+ query = select(ParseImportError)
query = apply_sorting(query, order_by, to_replace, allowed_sort_attrs)
can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET")
@@ -95,8 +95,8 @@ def get_import_errors(
# if the user doesn't have access to all DAGs, only display errors
from visible DAGs
readable_dag_ids = security.get_readable_dags()
dagfiles_stmt =
select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(readable_dag_ids))
- query = query.where(ImportErrorModel.filename.in_(dagfiles_stmt))
- count_query =
count_query.where(ImportErrorModel.filename.in_(dagfiles_stmt))
+ query = query.where(ParseImportError.filename.in_(dagfiles_stmt))
+ count_query =
count_query.where(ParseImportError.filename.in_(dagfiles_stmt))
total_entries = session.scalars(count_query).one()
import_errors = session.scalars(query.offset(offset).limit(limit)).all()
diff --git a/airflow/api_connexion/schemas/error_schema.py
b/airflow/api_connexion/schemas/error_schema.py
index 97e12c584e..8f117fb966 100644
--- a/airflow/api_connexion/schemas/error_schema.py
+++ b/airflow/api_connexion/schemas/error_schema.py
@@ -21,7 +21,7 @@ from typing import NamedTuple
from marshmallow import Schema, fields
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
-from airflow.models.errors import ImportError
+from airflow.models.errors import ParseImportError
class ImportErrorSchema(SQLAlchemySchema):
@@ -30,7 +30,7 @@ class ImportErrorSchema(SQLAlchemySchema):
class Meta:
"""Meta."""
- model = ImportError
+ model = ParseImportError
import_error_id = auto_field("id", dump_only=True)
timestamp = auto_field(format="iso", dump_only=True)
@@ -41,7 +41,7 @@ class ImportErrorSchema(SQLAlchemySchema):
class ImportErrorCollection(NamedTuple):
"""List of import errors with metadata."""
- import_errors: list[ImportError]
+ import_errors: list[ParseImportError]
total_entries: int
diff --git a/airflow/dag_processing/manager.py
b/airflow/dag_processing/manager.py
index ef9ac5c44a..b2931feb5d 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -45,10 +45,10 @@ from airflow.api_internal.internal_api_call import
internal_api_call
from airflow.callbacks.callback_requests import CallbackRequest,
SlaCallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
-from airflow.models import errors
from airflow.models.dag import DagModel
from airflow.models.dagwarning import DagWarning
from airflow.models.db_callback_request import DbCallbackRequest
+from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.secrets.cache import SecretCache
from airflow.stats import Stats
@@ -803,12 +803,12 @@ class DagFileProcessorManager(LoggingMixin):
:param file_paths: list of paths to DAG definition files
:param session: session for ORM operations
"""
- query = delete(errors.ImportError)
+ query = delete(ParseImportError)
if file_paths:
query = query.where(
- ~errors.ImportError.filename.in_(file_paths),
- errors.ImportError.processor_subdir == processor_subdir,
+ ~ParseImportError.filename.in_(file_paths),
+ ParseImportError.processor_subdir == processor_subdir,
)
session.execute(query.execution_options(synchronize_session="fetch"))
diff --git a/airflow/dag_processing/processor.py
b/airflow/dag_processing/processor.py
index 99c9656826..f813a1beb2 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -39,11 +39,12 @@ from airflow.callbacks.callback_requests import (
)
from airflow.configuration import conf
from airflow.exceptions import AirflowException, TaskNotFound
-from airflow.models import SlaMiss, errors
+from airflow.models import SlaMiss
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun as DR
from airflow.models.dagwarning import DagWarning, DagWarningType
+from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, TaskInstance as TI
from airflow.stats import Stats
@@ -613,24 +614,24 @@ class DagFileProcessor(LoggingMixin):
# that no longer have errors
for dagbag_file in files_without_error:
session.execute(
- delete(errors.ImportError)
- .where(errors.ImportError.filename.startswith(dagbag_file))
+ delete(ParseImportError)
+ .where(ParseImportError.filename.startswith(dagbag_file))
.execution_options(synchronize_session="fetch")
)
# files that still have errors
- existing_import_error_files = [x.filename for x in
session.query(errors.ImportError.filename).all()]
+ existing_import_error_files = [x.filename for x in
session.query(ParseImportError.filename).all()]
# Add the errors of the processed files
for filename, stacktrace in import_errors.items():
if filename in existing_import_error_files:
-
session.query(errors.ImportError).filter(errors.ImportError.filename ==
filename).update(
+
session.query(ParseImportError).filter(ParseImportError.filename ==
filename).update(
{"filename": filename, "timestamp": timezone.utcnow(),
"stacktrace": stacktrace},
synchronize_session="fetch",
)
else:
session.add(
- errors.ImportError(
+ ParseImportError(
filename=filename,
timestamp=timezone.utcnow(),
stacktrace=stacktrace,
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index c5462c57f8..68bc4d8c14 100644
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -34,7 +34,6 @@ __all__ = [
"DagRun",
"DagTag",
"DbCallbackRequest",
- "ImportError",
"Log",
"MappedOperator",
"Operator",
@@ -62,19 +61,36 @@ def import_all_models():
import airflow.models.dagwarning
import airflow.models.dataset
+ import airflow.models.errors
import airflow.models.serialized_dag
import airflow.models.tasklog
def __getattr__(name):
# PEP-562: Lazy loaded attributes on python modules
- path = __lazy_imports.get(name)
- if not path:
- raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
+ if name != "ImportError":
+ path = __lazy_imports.get(name)
+ if not path:
+ raise AttributeError(f"module {__name__!r} has no attribute
{name!r}")
- from airflow.utils.module_loading import import_string
+ from airflow.utils.module_loading import import_string
+
+ val = import_string(f"{path}.{name}")
+ else:
+ import warnings
+
+ from airflow.exceptions import RemovedInAirflow3Warning
+ from airflow.models.errors import ParseImportError
+
+ warnings.warn(
+ f"Import '{__name__}.ImportError' is deprecated due to shadowing
with builtin exception "
+ f"ImportError and will be removed in the future. "
+ f"Please consider to use
'{ParseImportError.__module__}.ParseImportError' instead.",
+ RemovedInAirflow3Warning,
+ stacklevel=2,
+ )
+ val = ParseImportError
- val = import_string(f"{path}.{name}")
# Store for next time
globals()[name] = val
return val
@@ -94,7 +110,6 @@ __lazy_imports = {
"DagTag": "airflow.models.dag",
"DagWarning": "airflow.models.dagwarning",
"DbCallbackRequest": "airflow.models.db_callback_request",
- "ImportError": "airflow.models.errors",
"Log": "airflow.models.log",
"MappedOperator": "airflow.models.mappedoperator",
"Operator": "airflow.models.operator",
@@ -125,7 +140,6 @@ if TYPE_CHECKING:
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning
from airflow.models.db_callback_request import DbCallbackRequest
- from airflow.models.errors import ImportError
from airflow.models.log import Log
from airflow.models.mappedoperator import MappedOperator
from airflow.models.operator import Operator
diff --git a/airflow/models/errors.py b/airflow/models/errors.py
index ed76c6c355..f891b03d67 100644
--- a/airflow/models/errors.py
+++ b/airflow/models/errors.py
@@ -17,13 +17,16 @@
# under the License.
from __future__ import annotations
+import warnings
+
from sqlalchemy import Column, Integer, String, Text
+from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models.base import Base
from airflow.utils.sqlalchemy import UtcDateTime
-class ImportError(Base):
+class ParseImportError(Base):
"""Stores all Import Errors which are recorded when parsing DAGs and
displayed on the Webserver."""
__tablename__ = "import_error"
@@ -32,3 +35,17 @@ class ImportError(Base):
filename = Column(String(1024))
stacktrace = Column(Text)
processor_subdir = Column(String(2000), nullable=True)
+
+
+def __getattr__(name: str):
+ # PEP-562: Lazy loaded attributes on python modules
+ if name == "ImportError":
+ warnings.warn(
+ f"Model class '{__name__}.ImportError' is deprecated due to
shadowing with builtin exception "
+ f"ImportError and will be removed in the future. "
+ f"Please consider to use '{__name__}.ParseImportError' instead.",
+ RemovedInAirflow3Warning,
+ stacklevel=2,
+ )
+ return ParseImportError
+ raise AttributeError(f"module {__name__} has no attribute {name}")
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 995833d6d6..513b453006 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -40,9 +40,9 @@ from sqlalchemy.ext.associationproxy import AssociationProxy
from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
-from airflow.models import errors
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning
+from airflow.models.errors import ParseImportError
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.code_utils import get_python_source
@@ -196,7 +196,7 @@ def encode_dag_run(
def check_import_errors(fileloc, session):
# Check dag import errors
import_errors = session.scalars(
- select(errors.ImportError).where(errors.ImportError.filename ==
fileloc)
+ select(ParseImportError).where(ParseImportError.filename == fileloc)
).all()
if import_errors:
for import_error in import_errors:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 5bdebe5d91..328312658b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -101,10 +101,11 @@ from airflow.hooks.base import BaseHook
from airflow.jobs.job import Job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
-from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss,
Trigger, XCom, errors
+from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss,
Trigger, XCom
from airflow.models.dag import get_dataset_triggered_next_run_info
from airflow.models.dagrun import RUN_ID_REGEX, DagRun, DagRunType
from airflow.models.dataset import DagScheduleDatasetReference,
DatasetDagRunQueue, DatasetEvent, DatasetModel
+from airflow.models.errors import ParseImportError
from airflow.models.operator import needs_expansion
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, TaskInstanceNote
@@ -945,13 +946,13 @@ class Airflow(AirflowBaseView):
owner_links_dict = DagOwnerAttributes.get_all(session)
if
get_auth_manager().is_authorized_view(access_view=AccessView.IMPORT_ERRORS):
- import_errors =
select(errors.ImportError).order_by(errors.ImportError.id)
+ import_errors =
select(ParseImportError).order_by(ParseImportError.id)
can_read_all_dags =
get_auth_manager().is_authorized_dag(method="GET")
if not can_read_all_dags:
# if the user doesn't have access to all DAGs, only
display errors from visible DAGs
import_errors = import_errors.where(
- errors.ImportError.filename.in_(
+ ParseImportError.filename.in_(
select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(filter_dag_ids))
)
)
diff --git a/pyproject.toml b/pyproject.toml
index bb4cf71d80..05971e7c4a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -434,6 +434,8 @@ banned-module-level-imports = ["numpy", "pandas"]
"airflow.PY312".msg = "Use sys.version_info >= (3, 12) instead."
# Deprecated imports
"airflow.models.baseoperator.BaseOperatorLink".msg = "Use
airflow.models.baseoperatorlink.BaseOperatorLink"
+"airflow.models.errors.ImportError".msg = "Use
airflow.models.errors.ParseImportError"
+"airflow.models.ImportError".msg = "Use airflow.models.errors.ParseImportError"
# Deprecated in Python 3.11, Pending Removal in Python 3.15:
https://github.com/python/cpython/issues/90817
# Deprecation warning in Python 3.11 also recommends using locale.getencoding
but it available in Python 3.11
"locale.getdefaultlocale".msg = "Use locale.setlocale() and locale.getlocale()
instead."
diff --git a/tests/api_connexion/endpoints/test_import_error_endpoint.py
b/tests/api_connexion/endpoints/test_import_error_endpoint.py
index fae1312a32..ce084165d2 100644
--- a/tests/api_connexion/endpoints/test_import_error_endpoint.py
+++ b/tests/api_connexion/endpoints/test_import_error_endpoint.py
@@ -22,7 +22,7 @@ import pytest
from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
from airflow.models.dag import DagModel
-from airflow.models.errors import ImportError
+from airflow.models.errors import ParseImportError
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import provide_session
@@ -95,7 +95,7 @@ class TestBaseImportError:
class TestGetImportErrorEndpoint(TestBaseImportError):
def test_response_200(self, session):
- import_error = ImportError(
+ import_error = ParseImportError(
filename="Lorem_ipsum.py",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -128,7 +128,7 @@ class TestGetImportErrorEndpoint(TestBaseImportError):
} == response.json
def test_should_raises_401_unauthenticated(self, session):
- import_error = ImportError(
+ import_error = ParseImportError(
filename="Lorem_ipsum.py",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -147,7 +147,7 @@ class TestGetImportErrorEndpoint(TestBaseImportError):
assert response.status_code == 403
def test_should_raise_403_forbidden_without_dag_read(self, session):
- import_error = ImportError(
+ import_error = ParseImportError(
filename="Lorem_ipsum.py",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -164,7 +164,7 @@ class TestGetImportErrorEndpoint(TestBaseImportError):
def test_should_return_200_with_single_dag_read(self, session):
dag_model = DagModel(dag_id=TEST_DAG_IDS[0], fileloc="Lorem_ipsum.py")
session.add(dag_model)
- import_error = ImportError(
+ import_error = ParseImportError(
filename="Lorem_ipsum.py",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -190,7 +190,7 @@ class TestGetImportErrorEndpoint(TestBaseImportError):
for dag_id in TEST_DAG_IDS:
dag_model = DagModel(dag_id=dag_id, fileloc="Lorem_ipsum.py")
session.add(dag_model)
- import_error = ImportError(
+ import_error = ParseImportError(
filename="Lorem_ipsum.py",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -216,7 +216,7 @@ class TestGetImportErrorEndpoint(TestBaseImportError):
class TestGetImportErrorsEndpoint(TestBaseImportError):
def test_get_import_errors(self, session):
import_error = [
- ImportError(
+ ParseImportError(
filename="Lorem_ipsum.py",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -251,7 +251,7 @@ class TestGetImportErrorsEndpoint(TestBaseImportError):
def test_get_import_errors_order_by(self, session):
import_error = [
- ImportError(
+ ParseImportError(
filename=f"Lorem_ipsum{i}.py",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC") +
timedelta(days=-i),
@@ -288,7 +288,7 @@ class TestGetImportErrorsEndpoint(TestBaseImportError):
def test_order_by_raises_400_for_invalid_attr(self, session):
import_error = [
- ImportError(
+ ParseImportError(
filename="Lorem_ipsum.py",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -308,7 +308,7 @@ class TestGetImportErrorsEndpoint(TestBaseImportError):
def test_should_raises_401_unauthenticated(self, session):
import_error = [
- ImportError(
+ ParseImportError(
filename="Lorem_ipsum.py",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -327,7 +327,7 @@ class TestGetImportErrorsEndpoint(TestBaseImportError):
fake_filename = f"/tmp/{dag_id}.py"
dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename)
session.add(dag_model)
- importerror = ImportError(
+ importerror = ParseImportError(
filename=fake_filename,
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -360,7 +360,7 @@ class TestGetImportErrorsEndpoint(TestBaseImportError):
dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename)
session.add(dag_model)
- importerror = ImportError(
+ importerror = ParseImportError(
filename="/tmp/all_in_one.py",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -405,7 +405,7 @@ class
TestGetImportErrorsEndpointPagination(TestBaseImportError):
@provide_session
def test_limit_and_offset(self, url, expected_import_error_ids, session):
import_errors = [
- ImportError(
+ ParseImportError(
filename=f"/tmp/file_{i}.py",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -423,7 +423,7 @@ class
TestGetImportErrorsEndpointPagination(TestBaseImportError):
def test_should_respect_page_size_limit_default(self, session):
import_errors = [
- ImportError(
+ ParseImportError(
filename=f"/tmp/file_{i}.py",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -439,7 +439,7 @@ class
TestGetImportErrorsEndpointPagination(TestBaseImportError):
@conf_vars({("api", "maximum_page_limit"): "150"})
def test_should_return_conf_max_if_req_max_above_conf(self, session):
import_errors = [
- ImportError(
+ ParseImportError(
filename=f"/tmp/file_{i}.py",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
diff --git a/tests/api_connexion/schemas/test_error_schema.py
b/tests/api_connexion/schemas/test_error_schema.py
index 5cb6873a9e..7056417e66 100644
--- a/tests/api_connexion/schemas/test_error_schema.py
+++ b/tests/api_connexion/schemas/test_error_schema.py
@@ -23,7 +23,7 @@ from airflow.api_connexion.schemas.error_schema import (
import_error_collection_schema,
import_error_schema,
)
-from airflow.models.errors import ImportError
+from airflow.models.errors import ParseImportError
from airflow.utils import timezone
from airflow.utils.session import provide_session
from tests.test_utils.db import clear_db_import_errors
@@ -43,7 +43,7 @@ class TestErrorSchemaBase:
class TestErrorSchema(TestErrorSchemaBase):
@provide_session
def test_serialize(self, session):
- import_error = ImportError(
+ import_error = ParseImportError(
filename="lorem.py",
stacktrace="Lorem Ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -64,7 +64,7 @@ class TestErrorCollectionSchema(TestErrorSchemaBase):
@provide_session
def test_serialize(self, session):
import_error = [
- ImportError(
+ ParseImportError(
filename="Lorem_ipsum.py",
stacktrace="Lorem ipsum",
timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -73,7 +73,7 @@ class TestErrorCollectionSchema(TestErrorSchemaBase):
]
session.add_all(import_error)
session.commit()
- query = session.query(ImportError)
+ query = session.query(ParseImportError)
query_list = query.all()
serialized_data = import_error_collection_schema.dump(
ImportErrorCollection(import_errors=query_list, total_entries=2)
diff --git a/tests/api_experimental/common/test_delete_dag.py
b/tests/api_experimental/common/test_delete_dag.py
index a97e6e2bca..9fa98a4ffa 100644
--- a/tests/api_experimental/common/test_delete_dag.py
+++ b/tests/api_experimental/common/test_delete_dag.py
@@ -23,7 +23,7 @@ from airflow.api.common.delete_dag import delete_dag
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models.dag import DAG, DagModel
from airflow.models.dagrun import DagRun as DR
-from airflow.models.errors import ImportError as IE
+from airflow.models.errors import ParseImportError as IE
from airflow.models.log import Log
from airflow.models.taskfail import TaskFail
from airflow.models.taskinstance import TaskInstance as TI
diff --git a/tests/dag_processing/test_job_runner.py
b/tests/dag_processing/test_job_runner.py
index 683994f410..e35e2fb97f 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -52,8 +52,9 @@ from airflow.dag_processing.manager import (
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
from airflow.jobs.job import Job
-from airflow.models import DagBag, DagModel, DbCallbackRequest, errors
+from airflow.models import DagBag, DagModel, DbCallbackRequest
from airflow.models.dagcode import DagCode
+from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import timezone
from airflow.utils.net import get_hostname
@@ -173,14 +174,14 @@ class TestDagProcessorJobRunner:
with create_session() as session:
self.run_processor_manager_one_loop(manager, parent_pipe)
- import_errors = session.query(errors.ImportError).all()
+ import_errors = session.query(ParseImportError).all()
assert len(import_errors) == 1
path_to_parse.unlink()
# Rerun the scheduler once the dag file has been removed
self.run_processor_manager_one_loop(manager, parent_pipe)
- import_errors = session.query(errors.ImportError).all()
+ import_errors = session.query(ParseImportError).all()
assert len(import_errors) == 0
session.rollback()
@@ -847,7 +848,7 @@ class TestDagProcessorJobRunner:
self.run_processor_manager_one_loop(manager, parent_pipe)
- import_errors =
session.query(errors.ImportError).order_by("id").all()
+ import_errors =
session.query(ParseImportError).order_by("id").all()
assert len(import_errors) == 1
assert import_errors[0].processor_subdir == str(processor_dir_1)
@@ -868,7 +869,7 @@ class TestDagProcessorJobRunner:
self.run_processor_manager_one_loop(manager, parent_pipe)
- import_errors =
session.query(errors.ImportError).order_by("id").all()
+ import_errors =
session.query(ParseImportError).order_by("id").all()
assert len(import_errors) == 2
assert import_errors[0].processor_subdir == str(processor_dir_1)
assert import_errors[1].processor_subdir == str(processor_dir_2)
diff --git a/tests/dag_processing/test_processor.py
b/tests/dag_processing/test_processor.py
index a2b1eb1604..09b639806b 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -31,7 +31,8 @@ from airflow.callbacks.callback_requests import
TaskCallbackRequest
from airflow.configuration import TEST_DAGS_FOLDER, conf
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.dag_processing.processor import DagFileProcessor,
DagFileProcessorProcess
-from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance, errors
+from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance
+from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance
from airflow.operators.empty import EmptyOperator
@@ -604,7 +605,7 @@ class TestDagFileProcessor:
with create_session() as session:
self._process_file(unparseable_filename, dag_directory=tmp_path,
session=session)
- import_errors = session.query(errors.ImportError).all()
+ import_errors = session.query(ParseImportError).all()
assert len(import_errors) == 1
import_error = import_errors[0]
@@ -621,7 +622,7 @@ class TestDagFileProcessor:
with create_session() as session:
self._process_file(zip_filename, dag_directory=tmp_path,
session=session)
- import_errors = session.query(errors.ImportError).all()
+ import_errors = session.query(ParseImportError).all()
assert len(import_errors) == 1
import_error = import_errors[0]
@@ -646,7 +647,7 @@ class TestDagFileProcessor:
file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
self._process_file(temp_dagfile, dag_directory=tmp_path,
session=session)
- import_errors = session.query(errors.ImportError).all()
+ import_errors = session.query(ParseImportError).all()
assert len(import_errors) == 1
import_error = import_errors[0]
@@ -661,7 +662,7 @@ class TestDagFileProcessor:
with create_session() as session:
self._process_file(parseable_filename.as_posix(),
dag_directory=tmp_path, session=session)
- import_errors = session.query(errors.ImportError).all()
+ import_errors = session.query(ParseImportError).all()
assert len(import_errors) == 0
@@ -674,7 +675,7 @@ class TestDagFileProcessor:
with create_session() as session:
self._process_file(zip_filename, dag_directory=tmp_path,
session=session)
- import_errors = session.query(errors.ImportError).all()
+ import_errors = session.query(ParseImportError).all()
assert len(import_errors) == 0
@@ -695,7 +696,7 @@ class TestDagFileProcessor:
)
self._process_file(unparseable_filename.as_posix(),
dag_directory=tmp_path, session=session)
- import_errors = session.query(errors.ImportError).all()
+ import_errors = session.query(ParseImportError).all()
assert len(import_errors) == 1
import_error = import_errors[0]
@@ -717,7 +718,7 @@ class TestDagFileProcessor:
self._process_file(filename_to_parse, dag_directory=tmp_path,
session=session)
import_error_1 = (
-
session.query(errors.ImportError).filter(errors.ImportError.filename ==
filename_to_parse).one()
+ session.query(ParseImportError).filter(ParseImportError.filename
== filename_to_parse).one()
)
# process the file multiple times
@@ -725,7 +726,7 @@ class TestDagFileProcessor:
self._process_file(filename_to_parse, dag_directory=tmp_path,
session=session)
import_error_2 = (
-
session.query(errors.ImportError).filter(errors.ImportError.filename ==
filename_to_parse).one()
+ session.query(ParseImportError).filter(ParseImportError.filename
== filename_to_parse).one()
)
# assert that the ID of the import error did not change
@@ -745,7 +746,7 @@ class TestDagFileProcessor:
file_to_parse.writelines(PARSEABLE_DAG_FILE_CONTENTS)
self._process_file(filename_to_parse, dag_directory=tmp_path,
session=session)
- import_errors = session.query(errors.ImportError).all()
+ import_errors = session.query(ParseImportError).all()
assert len(import_errors) == 0
@@ -760,7 +761,7 @@ class TestDagFileProcessor:
zip_file.writestr(TEMP_DAG_FILENAME, UNPARSEABLE_DAG_FILE_CONTENTS)
self._process_file(zip_filename, dag_directory=tmp_path,
session=session)
- import_errors = session.query(errors.ImportError).all()
+ import_errors = session.query(ParseImportError).all()
assert len(import_errors) == 1
# Remove the import error from the file
@@ -768,7 +769,7 @@ class TestDagFileProcessor:
zip_file.writestr(TEMP_DAG_FILENAME, "import os # airflow DAG")
self._process_file(zip_filename, dag_directory=tmp_path,
session=session)
- import_errors = session.query(errors.ImportError).all()
+ import_errors = session.query(ParseImportError).all()
assert len(import_errors) == 0
session.rollback()
@@ -780,7 +781,7 @@ class TestDagFileProcessor:
with create_session() as session:
self._process_file(unparseable_filename, dag_directory=tmp_path,
session=session)
- import_errors = session.query(errors.ImportError).all()
+ import_errors = session.query(ParseImportError).all()
assert len(import_errors) == 1
import_error = import_errors[0]
@@ -817,7 +818,7 @@ class TestDagFileProcessor:
with create_session() as session:
self._process_file(unparseable_filename, dag_directory=tmp_path,
session=session)
- import_errors = session.query(errors.ImportError).all()
+ import_errors = session.query(ParseImportError).all()
assert len(import_errors) == 1
import_error = import_errors[0]
@@ -849,7 +850,7 @@ class TestDagFileProcessor:
with create_session() as session:
self._process_file(invalid_zip_filename, dag_directory=tmp_path,
session=session)
- import_errors = session.query(errors.ImportError).all()
+ import_errors = session.query(ParseImportError).all()
assert len(import_errors) == 1
import_error = import_errors[0]
@@ -887,7 +888,7 @@ class TestDagFileProcessor:
with create_session() as session:
self._process_file(invalid_zip_filename, dag_directory=tmp_path,
session=session)
- import_errors = session.query(errors.ImportError).all()
+ import_errors = session.query(ParseImportError).all()
assert len(import_errors) == 1
import_error = import_errors[0]
diff --git a/tests/test_utils/db.py b/tests/test_utils/db.py
index 55583d0194..900598e20f 100644
--- a/tests/test_utils/db.py
+++ b/tests/test_utils/db.py
@@ -34,7 +34,6 @@ from airflow.models import (
Trigger,
Variable,
XCom,
- errors,
)
from airflow.models.dag import DagOwnerAttributes
from airflow.models.dagcode import DagCode
@@ -46,6 +45,7 @@ from airflow.models.dataset import (
DatasetModel,
TaskOutletDatasetReference,
)
+from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.providers.fab.auth_manager.models import Permission, Resource,
assoc_permission_role
from airflow.security.permissions import RESOURCE_DAG_PREFIX
@@ -136,7 +136,7 @@ def clear_rendered_ti_fields():
def clear_db_import_errors():
with create_session() as session:
- session.query(errors.ImportError).delete()
+ session.query(ParseImportError).delete()
def clear_db_dag_warnings():