This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new 13b25673ac4 [v3-0-test] Fix Certain DAG import errors ("role does not 
exist") don't persist in Airflow (#51511) (#54432)
13b25673ac4 is described below

commit 13b25673ac46be712ac537c9106cd9849bc24e8d
Author: Kevin Yang <85313829+sjyangke...@users.noreply.github.com>
AuthorDate: Wed Aug 13 08:13:41 2025 -0400

    [v3-0-test] Fix Certain DAG import errors ("role does not exist") don't 
persist in Airflow (#51511) (#54432)
    
    ### Motivation
    As described in #49651, when the access control for a DAG is set to a
non-existent role, the DAG import error shows up in the Airflow UI for a while
and then disappears. This update fixes the issue so that the import error
persists in the metadata DB until the DAG is updated with a correct access
control setting.
    
    Close #49651
    
    ### What is the issue
    
    
https://github.com/apache/airflow/blob/56fbe90a8d0b56558c02d75f4ac5852e041cb058/airflow-core/src/airflow/dag_processing/collection.py#L177
    
    When the DAG's access control is set to a non-existent role, the following
process will raise an Exception "Failed to write serialized DAG dag_id=...".
So, how is this exception triggered?
    1. `dag_was_updated` will be `True` the first time
`SerializedDagModel.write_dag` writes the serialized DAG to the database.
    2. When `dag_was_updated` is `True`, `_sync_dag_perms` will be triggered to
sync DAG-specific permissions. At that moment, it detects that the role doesn't
exist and raises an error, resulting in the exception.
    3. This exception will be captured and temporarily logged as an import
error in the DB, and will show up in the UI.
    
    From my understanding, this sync process will run every
`MIN_SERIALIZED_DAG_UPDATE_INTERVAL`. So, what happens in the second run?
    1. `dag_was_updated` will be `False` since the DAG code is not updated.
    2. In this case, `_sync_dag_perms` will **NOT BE TRIGGERED** even though
the access control is set incorrectly in the DAG code.
    3. Therefore, no exception will be raised and no import error will be
logged. As a result, the import error is removed from the DB, as well as from
the UI.
    
    ### What is the fix
    
    In the current state, `_sync_dag_perms` runs only when the DAG is updated
(i.e., `dag_was_updated` is `True`). This can be more performant because it
doesn't run for all the DAGs. However, it cannot properly handle the sync for
permissions. Therefore, the current fix is to make `_sync_dag_perms` run for
all the DAGs during the DAG sync process. I understand it might not be an ideal
fix, but I wasn't able to find a better solution due to my limited
understanding of the code. I would re [...]
---
 .../src/airflow/dag_processing/collection.py       |   2 +-
 .../tests/unit/dag_processing/test_collection.py   | 124 ++++++++++++++++++++-
 2 files changed, 120 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/collection.py 
b/airflow-core/src/airflow/dag_processing/collection.py
index bfc2cb20662..e53889b945d 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -199,7 +199,7 @@ def _serialize_dag_capturing_errors(
         if not dag_was_updated:
             # Check and update DagCode
             DagCode.update_source_code(dag.dag_id, dag.fileloc)
-        elif "FabAuthManager" in conf.get("core", "auth_manager"):
+        if "FabAuthManager" in conf.get("core", "auth_manager"):
             _sync_dag_perms(dag, session=session)
 
         return []
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py 
b/airflow-core/tests/unit/dag_processing/test_collection.py
index bb843221a13..5e353f0dddc 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -68,6 +68,11 @@ from tests_common.test_utils.db import (
 if TYPE_CHECKING:
     from kgb import SpyAgency
 
+mark_fab_auth_manager_test = pytest.mark.skipif(
+    condition="FabAuthManager" not in conf.get("core", "auth_manager"),
+    reason="This is only for FabAuthManager. Please set the environment 
variable `AIRFLOW__CORE__AUTH_MANAGER` to 
`airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager` in 
`files/airflow-breeze-config/environment_variables.env` before running breeze 
shell. To run the test, add the flag `--keep-env-variables` to the pytest 
command.",
+)
+
 
 def test_statement_latest_runs_one_dag():
     with warnings.catch_warnings():
@@ -338,10 +343,7 @@ class TestUpdateDagParsingResults:
         ser_dict = SerializedDAG.to_dict(dag)
         return LazyDeserializedDAG(data=ser_dict)
 
-    @pytest.mark.skipif(
-        condition="FabAuthManager" not in conf.get("core", "auth_manager"),
-        reason="This is only for FabAuthManager",
-    )
+    @mark_fab_auth_manager_test
     @pytest.mark.usefixtures("clean_db")  # sync_perms in fab has bad session 
commit hygiene
     def test_sync_perms_syncs_dag_specific_perms_on_update(
         self, monkeypatch, spy_agency: SpyAgency, session, time_machine, 
testing_dag_bundle
@@ -376,7 +378,8 @@ class TestUpdateDagParsingResults:
 
         # DAG isn't updated
         _sync_to_db()
-        spy_agency.assert_spy_not_called(sync_perms_spy)
+        # `_sync_dag_perms` should be called even the DAG isn't updated. 
Otherwise, any import error will not show up until DAG is updated.
+        spy_agency.assert_spy_called_with(sync_perms_spy, dag, session=session)
 
         # DAG is updated
         dag.tags = {"new_tag"}
@@ -488,6 +491,117 @@ class TestUpdateDagParsingResults:
         assert len(dag_import_error_listener.existing) == 0
         assert dag_import_error_listener.new["abc.py"] == 
import_error.stacktrace
 
+    @patch.object(ParseImportError, "full_file_path")
+    @mark_fab_auth_manager_test
+    @pytest.mark.usefixtures("clean_db")
+    def test_import_error_persist_for_invalid_access_control_role(
+        self,
+        mock_full_path,
+        monkeypatch,
+        session,
+        time_machine,
+        dag_import_error_listener,
+        testing_dag_bundle,
+    ):
+        """
+        Test that import errors related to invalid access control role are 
tracked in the DB until being fixed.
+        """
+        from airflow import settings
+
+        serialized_dags_count = 
session.query(func.count(SerializedDagModel.dag_id)).scalar()
+        assert serialized_dags_count == 0
+
+        monkeypatch.setattr(settings, "MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5)
+        time_machine.move_to(tz.datetime(2020, 1, 5, 0, 0, 0), tick=False)
+
+        # create a DAG and assign it a non-exist role.
+        dag = DAG(
+            dag_id="test_nonexist_access_control",
+            access_control={
+                "non_existing_role": {"can_edit", "can_read", "can_delete"},
+            },
+        )
+        dag.fileloc = "test_nonexist_access_control.py"
+        dag.relative_fileloc = "test_nonexist_access_control.py"
+        mock_full_path.return_value = "test_nonexist_access_control.py"
+
+        # the DAG processor should raise an import error when processing the 
DAG above.
+        import_errors = {}
+        # run the DAG parsing.
+        update_dag_parsing_results_in_db("testing", None, [dag], 
import_errors, set(), session)
+        # expect to get an error with "role does not exist" message.
+        err = import_errors.get(("testing", dag.relative_fileloc))
+        assert "AirflowException" in err
+        assert "role does not exist" in err
+        dag_model: DagModel = session.get(DagModel, (dag.dag_id,))
+        # the DAG should contain an import error.
+        assert dag_model.has_import_errors is True
+
+        prev_import_errors = session.query(ParseImportError).all()
+        # the import error message should match.
+        assert len(prev_import_errors) == 1
+        prev_import_error = prev_import_errors[0]
+        assert prev_import_error.filename == dag.relative_fileloc
+        assert "AirflowException" in prev_import_error.stacktrace
+        assert "role does not exist" in prev_import_error.stacktrace
+
+        # this is a new import error.
+        assert len(dag_import_error_listener.new) == 1
+        assert len(dag_import_error_listener.existing) == 0
+        assert (
+            dag_import_error_listener.new["test_nonexist_access_control.py"] 
== prev_import_error.stacktrace
+        )
+
+        # the DAG is serialized into the DB.
+        serialized_dags_count = 
session.query(func.count(SerializedDagModel.dag_id)).scalar()
+        assert serialized_dags_count == 1
+
+        # run the update again. Even though the DAG is not updated, the 
processor should raise import error since the access control is not fixed.
+        time_machine.move_to(tz.datetime(2020, 1, 5, 0, 0, 5), tick=False)
+        update_dag_parsing_results_in_db("testing", None, [dag], dict(), 
set(), session)
+
+        dag_model: DagModel = session.get(DagModel, (dag.dag_id,))
+        # the DAG should contain an import error.
+        assert dag_model.has_import_errors is True
+
+        import_errors = session.query(ParseImportError).all()
+        # the import error should still in the DB.
+        assert len(import_errors) == 1
+        import_error = import_errors[0]
+        assert import_error.filename == dag.relative_fileloc
+        assert "AirflowException" in import_error.stacktrace
+        assert "role does not exist" in import_error.stacktrace
+
+        # the new import error should be the same as the previous one
+        assert len(import_errors) == len(prev_import_errors)
+        assert import_error.filename == prev_import_error.filename
+        assert import_error.filename == dag.relative_fileloc
+        assert import_error.stacktrace == prev_import_error.stacktrace
+
+        # there is a new error and an existing error.
+        assert len(dag_import_error_listener.new) == 1
+        assert len(dag_import_error_listener.existing) == 1
+        assert (
+            dag_import_error_listener.new["test_nonexist_access_control.py"] 
== prev_import_error.stacktrace
+        )
+
+        # run the update again, but the incorrect access control configuration 
is removed.
+        time_machine.move_to(tz.datetime(2020, 1, 5, 0, 0, 10), tick=False)
+        dag.access_control = None
+        update_dag_parsing_results_in_db("testing", None, [dag], dict(), 
set(), session)
+
+        dag_model: DagModel = session.get(DagModel, (dag.dag_id,))
+        # the import error should be cleared.
+        assert dag_model.has_import_errors is False
+
+        import_errors = session.query(ParseImportError).all()
+        # the import error should be cleared.
+        assert len(import_errors) == 0
+
+        # no import error should be introduced.
+        assert len(dag_import_error_listener.new) == 1
+        assert len(dag_import_error_listener.existing) == 1
+
     def test_new_import_error_replaces_old(self, session, 
dag_import_error_listener, testing_dag_bundle):
         """
         Test that existing import error is updated and new record not created

Reply via email to