This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 796b46ee8ab Fix import errors not cleared for files without DAGs
(#58242)
796b46ee8ab is described below
commit 796b46ee8aba6ebaef767ec5592b826a636174e3
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Nov 25 13:13:46 2025 +0100
Fix import errors not cleared for files without DAGs (#58242)
* Fix import errors not cleared for files without DAGs
Previously, import errors persisted in the database for files that were
successfully parsed but no longer contained any DAGs. This happened
because we only tracked files that had DAGs, missing files that
were parsed successfully but had their DAGs removed.
Now, when files are parsed, all parsed files are tracked (not just those
with DAGs), ensuring import errors are properly cleared when a file is
successfully parsed without errors, even if it no longer contains DAGs.
closes: #57621
* Apply suggestions from code review
Co-authored-by: Tzu-ping Chung <[email protected]>
* fixup! Fix import errors not cleared for files without DAGs
* Remove fallback for files_parsed
* fixup! fixup! Fix import errors not cleared for files without DAGs
* fixup! Remove fallback for files_parsed
* fixup! fixup! fixup! Fix import errors not cleared for files without DAGs
---------
Co-authored-by: Tzu-ping Chung <[email protected]>
---
.../src/airflow/dag_processing/collection.py | 40 ++++++++--------
airflow-core/src/airflow/dag_processing/dagbag.py | 11 +++++
airflow-core/src/airflow/dag_processing/manager.py | 11 +++++
.../tests/unit/dag_processing/test_collection.py | 54 +++++++++++++++++++++-
4 files changed, 94 insertions(+), 22 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/collection.py
b/airflow-core/src/airflow/dag_processing/collection.py
index 91bff694a72..942ea15ab9e 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -285,21 +285,22 @@ def _update_import_errors(
):
from airflow.listeners.listener import get_listener_manager
- # We can remove anything from files parsed in this batch that doesn't have
an error. We need to remove old
- # errors (i.e. from files that are removed) separately
-
- session.execute(
- delete(ParseImportError).where(
- tuple_(ParseImportError.bundle_name,
ParseImportError.filename).in_(files_parsed)
- )
- )
-
- # the below query has to match (bundle_name, filename) tuple in that order
since the
- # import_errors list is a dict with keys as (bundle_name, relative_fileloc)
+ # Check existing import errors BEFORE deleting, so we can determine if we
should update or create
existing_import_error_files = set(
session.execute(select(ParseImportError.bundle_name,
ParseImportError.filename))
)
- # Add the errors of the processed files
+
+ # Delete errors for files that were parsed but don't have errors in
import_errors
+ # (i.e., files that were successfully parsed without errors)
+ files_to_clear = files_parsed.difference(import_errors)
+ if files_to_clear:
+ session.execute(
+ delete(ParseImportError).where(
+ tuple_(ParseImportError.bundle_name,
ParseImportError.filename).in_(files_to_clear)
+ )
+ )
+
+ # Add or update the errors of the processed files
for key, stacktrace in import_errors.items():
bundle_name_, relative_fileloc = key
@@ -371,6 +372,7 @@ def update_dag_parsing_results_in_db(
session: Session,
*,
warning_types: tuple[DagWarningType] = (DagWarningType.NONEXISTENT_POOL,),
+ files_parsed: set[tuple[str, str]] | None = None,
):
"""
Update everything to do with DAG parsing in the DB.
@@ -388,6 +390,10 @@ def update_dag_parsing_results_in_db(
then all warnings and errors related to this file will be removed.
``import_errors`` will be updated in place with any new errors
+
+ :param files_parsed: Set of (bundle_name, relative_fileloc) tuples for all
files that were parsed.
+ If None, will be inferred from dags and import_errors. Passing this
explicitly ensures that
+ import errors are cleared for files that were parsed but no longer
contain DAGs.
"""
# Retry 'DAG.bulk_write_to_db' & 'SerializedDagModel.bulk_sync_to_db' in
case
# of any Operational Errors
@@ -423,16 +429,8 @@ def update_dag_parsing_results_in_db(
import_errors.update(serialize_errors)
# Record import errors into the ORM - we don't retry on this one as it's
not as critical that it works
try:
- # TODO: This won't clear errors for files that exist that no longer
contain DAGs. Do we need to pass
- # in the list of file parsed?
-
- good_dag_filelocs = {
- (bundle_name, dag.relative_fileloc)
- for dag in dags
- if dag.relative_fileloc is not None and (bundle_name,
dag.relative_fileloc) not in import_errors
- }
_update_import_errors(
- files_parsed=good_dag_filelocs,
+ files_parsed=files_parsed if files_parsed is not None else set(),
bundle_name=bundle_name,
import_errors=import_errors,
session=session,
diff --git a/airflow-core/src/airflow/dag_processing/dagbag.py
b/airflow-core/src/airflow/dag_processing/dagbag.py
index 173f5b05b4e..0259992dc78 100644
--- a/airflow-core/src/airflow/dag_processing/dagbag.py
+++ b/airflow-core/src/airflow/dag_processing/dagbag.py
@@ -695,6 +695,16 @@ def sync_bag_to_db(
from airflow.dag_processing.collection import
update_dag_parsing_results_in_db
import_errors = {(bundle_name, rel_path): error for rel_path, error in
dagbag.import_errors.items()}
+
+ # Build the set of all files that were parsed and include files with
import errors
+ # in case they are not in file_last_changed
+ files_parsed = set(import_errors)
+ if dagbag.bundle_path:
+ files_parsed.update(
+ (bundle_name, dagbag._get_relative_fileloc(abs_filepath))
+ for abs_filepath in dagbag.file_last_changed
+ )
+
update_dag_parsing_results_in_db(
bundle_name,
bundle_version,
@@ -703,4 +713,5 @@ def sync_bag_to_db(
None, # file parsing duration is not well defined when parsing
multiple files / multiple DAGs.
dagbag.dag_warnings,
session=session,
+ files_parsed=files_parsed,
)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index 232c0d978ac..ae45a690074 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -858,6 +858,7 @@ class DagFileProcessorManager(LoggingMixin):
parsing_result=proc.parsing_result,
session=session,
is_callback_only=is_callback_only,
+ relative_fileloc=str(file.rel_path),
)
for file in finished:
@@ -1147,6 +1148,7 @@ def process_parse_results(
session: Session,
*,
is_callback_only: bool = False,
+ relative_fileloc: str | None = None,
) -> DagFileStat:
"""Take the parsing result and stats about the parser process and convert
it into a DagFileStat."""
if is_callback_only:
@@ -1181,6 +1183,14 @@ def process_parse_results(
import_errors = {
(bundle_name, rel_path): error for rel_path, error in
parsing_result.import_errors.items()
}
+
+ # Build the set of files that were parsed. This includes the file that
was parsed,
+ # even if it no longer contains DAGs, so we can clear old import
errors.
+ files_parsed: set[tuple[str, str]] | None = None
+ if relative_fileloc is not None:
+ files_parsed = {(bundle_name, relative_fileloc)}
+ files_parsed.update(import_errors.keys())
+
update_dag_parsing_results_in_db(
bundle_name=bundle_name,
bundle_version=bundle_version,
@@ -1189,6 +1199,7 @@ def process_parse_results(
parse_duration=run_duration,
warnings=set(parsing_result.warnings or []),
session=session,
+ files_parsed=files_parsed,
)
stat.num_dags = len(parsing_result.serialized_dags)
if parsing_result.import_errors:
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py
b/airflow-core/tests/unit/dag_processing/test_collection.py
index 64e15ede317..f98cf715b2f 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -492,7 +492,9 @@ class TestUpdateDagParsingResults:
mock_full_path.return_value = "abc.py"
import_errors = {}
- update_dag_parsing_results_in_db("testing", None, [dag],
import_errors, None, set(), session)
+ update_dag_parsing_results_in_db(
+ "testing", None, [dag], import_errors, None, set(), session,
files_parsed={("testing", "abc.py")}
+ )
assert "SerializationError" in caplog.text
# Should have been edited in place
@@ -656,6 +658,7 @@ class TestUpdateDagParsingResults:
parse_duration=None,
warnings=set(),
session=session,
+ files_parsed={("testing", "abc.py")},
)
import_error = (
@@ -714,6 +717,7 @@ class TestUpdateDagParsingResults:
parse_duration=None,
warnings=set(),
session=session,
+ files_parsed={(bundle_name, "abc.py")},
)
dag_model: DagModel = session.get(DagModel, (dag.dag_id,))
assert dag_model.has_import_errors is False
@@ -758,6 +762,7 @@ class TestUpdateDagParsingResults:
parse_duration=None,
warnings=set(),
session=session,
+ files_parsed={(bundle_name, "abc.py")},
)
dag_model = session.get(DagModel, (dag.dag_id,))
assert dag_model.has_import_errors is True
@@ -774,6 +779,53 @@ class TestUpdateDagParsingResults:
)
assert dag_model.has_import_errors is False
+ @pytest.mark.usefixtures("clean_db")
+ def test_clear_import_error_for_file_without_dags(self,
testing_dag_bundle, session):
+ """
+ Test that import errors are cleared for files that were parsed but no
longer contain DAGs.
+ """
+ bundle_name = "testing"
+ filename = "no_dags.py"
+
+ prev_error = ParseImportError(
+ filename=filename,
+ bundle_name=bundle_name,
+ timestamp=tz.utcnow(),
+ stacktrace="Previous import error",
+ )
+ session.add(prev_error)
+
+ # An import error for another file we haven't parsed (this shouldn't
be deleted)
+ other_file_error = ParseImportError(
+ filename="other.py",
+ bundle_name=bundle_name,
+ timestamp=tz.utcnow(),
+ stacktrace="Some error",
+ )
+ session.add(other_file_error)
+ session.flush()
+
+ import_errors = set(session.execute(select(ParseImportError.filename,
ParseImportError.bundle_name)))
+ assert import_errors == {("no_dags.py", bundle_name), ("other.py",
bundle_name)}
+
+ # Simulate parsing the file: it was parsed successfully (no import
errors),
+ # but it no longer contains any DAGs. By passing files_parsed, we
ensure
+ # the import error is cleared even though there are no DAGs.
+ files_parsed = {(bundle_name, filename)}
+ update_dag_parsing_results_in_db(
+ bundle_name=bundle_name,
+ bundle_version=None,
+ dags=[], # No DAGs in this file
+ import_errors={}, # No import errors
+ parse_duration=None,
+ warnings=set(),
+ session=session,
+ files_parsed=files_parsed,
+ )
+
+ import_errors = set(session.execute(select(ParseImportError.filename,
ParseImportError.bundle_name)))
+ assert import_errors == {("other.py", bundle_name)}, "Import error for
parsed file should be cleared"
+
@pytest.mark.need_serialized_dag(False)
@pytest.mark.parametrize(
("attrs", "expected"),