This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 3b8777660b9ec81c4a891febcb84eb3f93d10e27 Author: Amogh Desai <[email protected]> AuthorDate: Tue Dec 9 17:35:11 2025 +0530 [v3 branch] Fix misleading error message when GitHook creation fails (#59236) --- .../src/airflow/dag_processing/bundles/manager.py | 19 +++++++-- .../bundles/test_dag_bundle_manager.py | 47 ++++++++++++++++++++++ .../git/src/airflow/providers/git/bundles/git.py | 5 ++- providers/git/tests/unit/git/bundles/test_git.py | 35 +++++++++------- 4 files changed, 87 insertions(+), 19 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/bundles/manager.py b/airflow-core/src/airflow/dag_processing/bundles/manager.py index cf3b7c51048..9b3cf4189c5 100644 --- a/airflow-core/src/airflow/dag_processing/bundles/manager.py +++ b/airflow-core/src/airflow/dag_processing/bundles/manager.py @@ -205,7 +205,11 @@ class DagBundlesManager(LoggingMixin): for name in self._bundle_config.keys(): if bundle := stored.pop(name, None): bundle.active = True - new_template, new_params = _extract_and_sign_template(name) + try: + new_template, new_params = _extract_and_sign_template(name) + except Exception as e: + self.log.exception("Error creating bundle '%s': %s", name, e) + continue if new_template != bundle.signed_url_template: bundle.signed_url_template = new_template self.log.debug("Updated URL template for bundle %s", name) @@ -213,7 +217,11 @@ class DagBundlesManager(LoggingMixin): bundle.template_params = new_params self.log.debug("Updated template parameters for bundle %s", name) else: - new_template, new_params = _extract_and_sign_template(name) + try: + new_template, new_params = _extract_and_sign_template(name) + except Exception as e: + self.log.exception("Error creating bundle '%s': %s", name, e) + continue new_bundle = DagBundleModel(name=name) new_bundle.signed_url_template = new_template new_bundle.template_params = new_params @@ -280,7 +288,12 @@ class DagBundlesManager(LoggingMixin): :return: list of DAG bundles. 
""" for name, (class_, kwargs) in self._bundle_config.items(): - yield class_(name=name, version=None, **kwargs) + try: + yield class_(name=name, version=None, **kwargs) + except Exception as e: + self.log.exception("Error creating bundle '%s': %s", name, e) + # Skip this bundle and continue with others + continue def view_url(self, name: str, version: str | None = None) -> str | None: warnings.warn( diff --git a/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py b/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py index b1e8b4f8b65..e532b53d969 100644 --- a/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py +++ b/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py @@ -379,3 +379,50 @@ def test_example_dags_name_is_reserved(): with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(reserved_name_config)}): with pytest.raises(AirflowConfigException, match="Bundle name 'example_dags' is a reserved name."): DagBundlesManager().parse_config() + + +class FailingBundle(BaseDagBundle): + """Test bundle that raises an exception during initialization.""" + + def __init__(self, *, should_fail: bool = True, **kwargs): + super().__init__(**kwargs) + if should_fail: + raise ValueError("Bundle creation failed for testing") + + def refresh(self): + pass + + def get_current_version(self): + return None + + @property + def path(self): + return "/tmp/failing" + + +FAILING_BUNDLE_CONFIG = [ + { + "name": "failing-bundle", + "classpath": "unit.dag_processing.bundles.test_dag_bundle_manager.FailingBundle", + "kwargs": {"should_fail": True, "refresh_interval": 1}, + } +] + + +@conf_vars({("core", "LOAD_EXAMPLES"): "False"}) +@pytest.mark.db_test +def test_multiple_bundles_one_fails(clear_db, session): + """Test that when one bundle fails to create, other bundles still load successfully.""" + mix_config = BASIC_BUNDLE_CONFIG + FAILING_BUNDLE_CONFIG + + with patch.dict(os.environ, 
{"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(mix_config)}): + manager = DagBundlesManager() + + bundles = list(manager.get_all_dag_bundles()) + assert len(bundles) == 1 + assert bundles[0].name == "my-test-bundle" + assert isinstance(bundles[0], BasicBundle) + + manager.sync_bundles_to_db() + bundle_names = {b.name for b in session.query(DagBundleModel).all()} + assert bundle_names == {"my-test-bundle"} diff --git a/providers/git/src/airflow/providers/git/bundles/git.py b/providers/git/src/airflow/providers/git/bundles/git.py index bd8dba47310..3ef88b82dd7 100644 --- a/providers/git/src/airflow/providers/git/bundles/git.py +++ b/providers/git/src/airflow/providers/git/bundles/git.py @@ -80,8 +80,9 @@ class GitDagBundle(BaseDagBundle): self.hook: GitHook | None = None try: self.hook = GitHook(git_conn_id=git_conn_id or "git_default", repo_url=self.repo_url) - except Exception as e: - self._log.warning("Could not create GitHook", conn_id=git_conn_id, exc=e) + except Exception: + # re raise so exception propagates immediately with clear error message + raise if self.hook and self.hook.repo_url: self.repo_url = self.hook.repo_url diff --git a/providers/git/tests/unit/git/bundles/test_git.py b/providers/git/tests/unit/git/bundles/test_git.py index 950388f6d6c..b59bd8605c5 100644 --- a/providers/git/tests/unit/git/bundles/test_git.py +++ b/providers/git/tests/unit/git/bundles/test_git.py @@ -33,8 +33,6 @@ from airflow.exceptions import AirflowException from airflow.models import Connection from airflow.providers.git.bundles.git import GitDagBundle from airflow.providers.git.hooks.git import GitHook -from airflow.sdk.exceptions import ErrorType -from airflow.sdk.execution_time.comms import ErrorResponse from tests_common.test_utils.config import conf_vars from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS @@ -673,20 +671,29 @@ class TestGitDagBundle: @patch.dict(os.environ, {"AIRFLOW_CONN_MY_TEST_GIT": '{"host": "something", 
"conn_type": "git"}'}) @pytest.mark.parametrize( - "conn_id, expected_hook_type", - [("my_test_git", GitHook), ("something-else", type(None))], + ("conn_id", "expected_hook_type", "exception_expected"), + [ + ("my_test_git", GitHook, False), + ("something-else", None, True), + ], ) - def test_repo_url_access_missing_connection_doesnt_error( - self, conn_id, expected_hook_type, mock_supervisor_comms + def test_repo_url_access_missing_connection_raises_exception( + self, conn_id, expected_hook_type, exception_expected ): - if expected_hook_type is type(None): - mock_supervisor_comms.send.return_value = ErrorResponse(error=ErrorType.CONNECTION_NOT_FOUND) - bundle = GitDagBundle( - name="testa", - tracking_ref="main", - git_conn_id=conn_id, - ) - assert isinstance(bundle.hook, expected_hook_type) + if exception_expected: + with pytest.raises(Exception, match="The conn_id `something-else` isn't defined"): + GitDagBundle( + name="testa", + tracking_ref="main", + git_conn_id=conn_id, + ) + else: + bundle = GitDagBundle( + name="testa", + tracking_ref="main", + git_conn_id=conn_id, + ) + assert isinstance(bundle.hook, expected_hook_type) @mock.patch("airflow.providers.git.bundles.git.GitHook") def test_lock_used(self, mock_githook, git_repo):
