This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push: new 13b25673ac4 [v3-0-test] Fix Certain DAG import errors ("role does not exist") don't persist in Airflow (#51511) (#54432) 13b25673ac4 is described below commit 13b25673ac46be712ac537c9106cd9849bc24e8d Author: Kevin Yang <85313829+sjyangke...@users.noreply.github.com> AuthorDate: Wed Aug 13 08:13:41 2025 -0400 [v3-0-test] Fix Certain DAG import errors ("role does not exist") don't persist in Airflow (#51511) (#54432) ### Motivation As described in #49651, when the access control for a DAG is set to a non-existent role, the DAG import error shows up in the Airflow UI for a while and then disappears. This update fixes that issue and lets the import error persist in the metadata DB until the DAG is updated with a correct access control setting. Close #49651 ### What is the issue https://github.com/apache/airflow/blob/56fbe90a8d0b56558c02d75f4ac5852e041cb058/airflow-core/src/airflow/dag_processing/collection.py#L177 When the DAG's access control is set to a non-existent role, the following process will raise an Exception "Failed to write serialized DAG dag_id=...". So, how is this exception triggered? 1. `dag_was_updated` will be `True` the first time `SerializedDagModel.write_dag` writes the serialized DAG to the database. 2. When `dag_was_updated` is `True`, `_sync_dag_perms` will be triggered to sync DAG-specific permissions. At that moment, it detects that the role doesn't exist and raises an error, resulting in the exception. 3. This exception will be captured and logged temporarily as an import error in the DB, and will show up in the UI. From my understanding, this sync process will run every `MIN_SERIALIZED_DAG_UPDATE_INTERVAL`. So, what happens in the second run? 1. `dag_was_updated` will be `False` since the DAG code is not updated. 2. In this case, `_sync_dag_perms` will **NOT BE TRIGGERED** even though the access control is set incorrectly in the DAG code. 3. 
Therefore, no exception will be raised, and no import error will be logged. Therefore, the import error is removed from the DB, as well as from the UI. ### What is the fix In the current state, `_sync_dag_perms` runs only when the DAG is updated (i.e., `dag_was_updated` is `True`). This can be more performant because it doesn't run for all the DAGs. However, it cannot properly handle the sync for permissions. Therefore, the current fix is to make `_sync_dag_perms` run for all the DAGs during the DAG sync process. I understand it might not be an ideal fix, but I wasn't able to find a better solution due to my limited understanding on the code. I would re [...] --- .../src/airflow/dag_processing/collection.py | 2 +- .../tests/unit/dag_processing/test_collection.py | 124 ++++++++++++++++++++- 2 files changed, 120 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index bfc2cb20662..e53889b945d 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -199,7 +199,7 @@ def _serialize_dag_capturing_errors( if not dag_was_updated: # Check and update DagCode DagCode.update_source_code(dag.dag_id, dag.fileloc) - elif "FabAuthManager" in conf.get("core", "auth_manager"): + if "FabAuthManager" in conf.get("core", "auth_manager"): _sync_dag_perms(dag, session=session) return [] diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py index bb843221a13..5e353f0dddc 100644 --- a/airflow-core/tests/unit/dag_processing/test_collection.py +++ b/airflow-core/tests/unit/dag_processing/test_collection.py @@ -68,6 +68,11 @@ from tests_common.test_utils.db import ( if TYPE_CHECKING: from kgb import SpyAgency +mark_fab_auth_manager_test = pytest.mark.skipif( + condition="FabAuthManager" not in conf.get("core", "auth_manager"), + reason="This is 
only for FabAuthManager. Please set the environment variable `AIRFLOW__CORE__AUTH_MANAGER` to `airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager` in `files/airflow-breeze-config/environment_variables.env` before running breeze shell. To run the test, add the flag `--keep-env-variables` to the pytest command.", +) + def test_statement_latest_runs_one_dag(): with warnings.catch_warnings(): @@ -338,10 +343,7 @@ class TestUpdateDagParsingResults: ser_dict = SerializedDAG.to_dict(dag) return LazyDeserializedDAG(data=ser_dict) - @pytest.mark.skipif( - condition="FabAuthManager" not in conf.get("core", "auth_manager"), - reason="This is only for FabAuthManager", - ) + @mark_fab_auth_manager_test @pytest.mark.usefixtures("clean_db") # sync_perms in fab has bad session commit hygiene def test_sync_perms_syncs_dag_specific_perms_on_update( self, monkeypatch, spy_agency: SpyAgency, session, time_machine, testing_dag_bundle @@ -376,7 +378,8 @@ class TestUpdateDagParsingResults: # DAG isn't updated _sync_to_db() - spy_agency.assert_spy_not_called(sync_perms_spy) + # `_sync_dag_perms` should be called even the DAG isn't updated. Otherwise, any import error will not show up until DAG is updated. + spy_agency.assert_spy_called_with(sync_perms_spy, dag, session=session) # DAG is updated dag.tags = {"new_tag"} @@ -488,6 +491,117 @@ class TestUpdateDagParsingResults: assert len(dag_import_error_listener.existing) == 0 assert dag_import_error_listener.new["abc.py"] == import_error.stacktrace + @patch.object(ParseImportError, "full_file_path") + @mark_fab_auth_manager_test + @pytest.mark.usefixtures("clean_db") + def test_import_error_persist_for_invalid_access_control_role( + self, + mock_full_path, + monkeypatch, + session, + time_machine, + dag_import_error_listener, + testing_dag_bundle, + ): + """ + Test that import errors related to invalid access control role are tracked in the DB until being fixed. 
+ """ + from airflow import settings + + serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar() + assert serialized_dags_count == 0 + + monkeypatch.setattr(settings, "MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5) + time_machine.move_to(tz.datetime(2020, 1, 5, 0, 0, 0), tick=False) + + # create a DAG and assign it a non-exist role. + dag = DAG( + dag_id="test_nonexist_access_control", + access_control={ + "non_existing_role": {"can_edit", "can_read", "can_delete"}, + }, + ) + dag.fileloc = "test_nonexist_access_control.py" + dag.relative_fileloc = "test_nonexist_access_control.py" + mock_full_path.return_value = "test_nonexist_access_control.py" + + # the DAG processor should raise an import error when processing the DAG above. + import_errors = {} + # run the DAG parsing. + update_dag_parsing_results_in_db("testing", None, [dag], import_errors, set(), session) + # expect to get an error with "role does not exist" message. + err = import_errors.get(("testing", dag.relative_fileloc)) + assert "AirflowException" in err + assert "role does not exist" in err + dag_model: DagModel = session.get(DagModel, (dag.dag_id,)) + # the DAG should contain an import error. + assert dag_model.has_import_errors is True + + prev_import_errors = session.query(ParseImportError).all() + # the import error message should match. + assert len(prev_import_errors) == 1 + prev_import_error = prev_import_errors[0] + assert prev_import_error.filename == dag.relative_fileloc + assert "AirflowException" in prev_import_error.stacktrace + assert "role does not exist" in prev_import_error.stacktrace + + # this is a new import error. + assert len(dag_import_error_listener.new) == 1 + assert len(dag_import_error_listener.existing) == 0 + assert ( + dag_import_error_listener.new["test_nonexist_access_control.py"] == prev_import_error.stacktrace + ) + + # the DAG is serialized into the DB. 
+ serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar() + assert serialized_dags_count == 1 + + # run the update again. Even though the DAG is not updated, the processor should raise import error since the access control is not fixed. + time_machine.move_to(tz.datetime(2020, 1, 5, 0, 0, 5), tick=False) + update_dag_parsing_results_in_db("testing", None, [dag], dict(), set(), session) + + dag_model: DagModel = session.get(DagModel, (dag.dag_id,)) + # the DAG should contain an import error. + assert dag_model.has_import_errors is True + + import_errors = session.query(ParseImportError).all() + # the import error should still in the DB. + assert len(import_errors) == 1 + import_error = import_errors[0] + assert import_error.filename == dag.relative_fileloc + assert "AirflowException" in import_error.stacktrace + assert "role does not exist" in import_error.stacktrace + + # the new import error should be the same as the previous one + assert len(import_errors) == len(prev_import_errors) + assert import_error.filename == prev_import_error.filename + assert import_error.filename == dag.relative_fileloc + assert import_error.stacktrace == prev_import_error.stacktrace + + # there is a new error and an existing error. + assert len(dag_import_error_listener.new) == 1 + assert len(dag_import_error_listener.existing) == 1 + assert ( + dag_import_error_listener.new["test_nonexist_access_control.py"] == prev_import_error.stacktrace + ) + + # run the update again, but the incorrect access control configuration is removed. + time_machine.move_to(tz.datetime(2020, 1, 5, 0, 0, 10), tick=False) + dag.access_control = None + update_dag_parsing_results_in_db("testing", None, [dag], dict(), set(), session) + + dag_model: DagModel = session.get(DagModel, (dag.dag_id,)) + # the import error should be cleared. + assert dag_model.has_import_errors is False + + import_errors = session.query(ParseImportError).all() + # the import error should be cleared. 
+ assert len(import_errors) == 0 + + # no import error should be introduced. + assert len(dag_import_error_listener.new) == 1 + assert len(dag_import_error_listener.existing) == 1 + def test_new_import_error_replaces_old(self, session, dag_import_error_listener, testing_dag_bundle): """ Test that existing import error is updated and new record not created