This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 33cf4223a09b2b92f416ce7bdfe9eb0a94a88537 Author: Amogh Desai <[email protected]> AuthorDate: Fri Jul 11 11:47:07 2025 +0530 [v3-0-test] Unify connection not found exceptions between AF2 and AF3 (#52968) (#53093) --- airflow-core/src/airflow/models/connection.py | 3 +-- airflow-core/tests/unit/dag_processing/test_processor.py | 2 +- task-sdk/src/airflow/sdk/definitions/connection.py | 10 ++++++++-- task-sdk/src/airflow/sdk/execution_time/context.py | 4 +++- task-sdk/tests/task_sdk/definitions/test_connections.py | 12 ++++++++++-- 5 files changed, 23 insertions(+), 8 deletions(-) diff --git a/airflow-core/src/airflow/models/connection.py b/airflow-core/src/airflow/models/connection.py index 2727c926e0f..9205083c9d3 100644 --- a/airflow-core/src/airflow/models/connection.py +++ b/airflow-core/src/airflow/models/connection.py @@ -478,8 +478,7 @@ class Connection(Base, LoggingMixin): return conn except AirflowRuntimeError as e: if e.error.error == ErrorType.CONNECTION_NOT_FOUND: - log.debug("Unable to retrieve connection from MetastoreBackend using Task SDK") - raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined") + raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined") from None raise # check cache first diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py index 71b601dfdd8..1649cb63896 100644 --- a/airflow-core/tests/unit/dag_processing/test_processor.py +++ b/airflow-core/tests/unit/dag_processing/test_processor.py @@ -324,7 +324,7 @@ class TestDagFileProcessor: assert result is not None assert result.import_errors != {} if result.import_errors: - assert "CONNECTION_NOT_FOUND" in next(iter(result.import_errors.values())) + assert "The conn_id `my_conn` isn't defined" in next(iter(result.import_errors.values())) def test_import_module_in_bundle_root(self, tmp_path: pathlib.Path, inprocess_client): tmp_path.joinpath("util.py").write_text("NAME = 'dag_name'") diff --git a/task-sdk/src/airflow/sdk/definitions/connection.py b/task-sdk/src/airflow/sdk/definitions/connection.py index c66b264dce0..837a1d4e25d 100644 --- a/task-sdk/src/airflow/sdk/definitions/connection.py +++ b/task-sdk/src/airflow/sdk/definitions/connection.py @@ -24,7 +24,8 @@ from typing import Any import attrs -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowNotFoundException +from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType log = logging.getLogger(__name__) @@ -139,7 +140,12 @@ class Connection: def get(cls, conn_id: str) -> Any: from airflow.sdk.execution_time.context import _get_connection - return _get_connection(conn_id) + try: + return _get_connection(conn_id) + except AirflowRuntimeError as e: + if e.error.error == ErrorType.CONNECTION_NOT_FOUND: + raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined") from None + raise @property def extra_dejson(self) -> dict: diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index 744b3b08886..9ee2e03c9b8 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -272,7 +272,9 @@ class ConnectionAccessor: """Wrapper to access Connection entries in template.""" def __getattr__(self, conn_id: str) -> Any: - return _get_connection(conn_id) + from airflow.sdk.definitions.connection import Connection + + return Connection.get(conn_id) def __repr__(self) -> str: return "<ConnectionAccessor (dynamic access)>" diff --git a/task-sdk/tests/task_sdk/definitions/test_connections.py b/task-sdk/tests/task_sdk/definitions/test_connections.py index 3bbb63a7697..6e4d977c659 100644 --- a/task-sdk/tests/task_sdk/definitions/test_connections.py +++ b/task-sdk/tests/task_sdk/definitions/test_connections.py @@ -23,9 +23,10 @@ from urllib.parse import urlparse import pytest from airflow.configuration import initialize_secrets_backends -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowNotFoundException from airflow.sdk import Connection -from airflow.sdk.execution_time.comms import ConnectionResult +from airflow.sdk.exceptions import ErrorType +from airflow.sdk.execution_time.comms import ConnectionResult, ErrorResponse from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS from tests_common.test_utils.config import conf_vars @@ -121,6 +122,13 @@ class TestConnections: extra=None, ) + def test_conn_get_not_found(self, mock_supervisor_comms): + error_response = ErrorResponse(error=ErrorType.CONNECTION_NOT_FOUND) + mock_supervisor_comms.send.return_value = error_response + + with pytest.raises(AirflowNotFoundException, match="The conn_id `mysql_conn` isn't defined"): + _ = Connection.get(conn_id="mysql_conn") + class TestConnectionsFromSecrets: def test_get_connection_secrets_backend(self, mock_supervisor_comms, tmp_path):
