This is an automated email from the ASF dual-hosted git repository. amoghdesai pushed a commit to branch backport-a6e6c28-v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit e98ce91f6f0033c226d023c3f1ac02911cbd5c8f Author: Amogh Desai <[email protected]> AuthorDate: Wed Feb 11 22:25:50 2026 +0530 [v3-1-test] Make conn_type optional in task SDK Connection datamodel (#61728) (cherry picked from commit a6e6c28) Co-authored-by: Amogh Desai <[email protected]> --- task-sdk/src/airflow/sdk/definitions/connection.py | 2 +- task-sdk/tests/task_sdk/bases/test_hook.py | 47 ++++++++++++++++++++++ .../tests/task_sdk/definitions/test_connection.py | 19 +++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/definitions/connection.py b/task-sdk/src/airflow/sdk/definitions/connection.py index 89727b11c12..0ef0214a0ac 100644 --- a/task-sdk/src/airflow/sdk/definitions/connection.py +++ b/task-sdk/src/airflow/sdk/definitions/connection.py @@ -111,7 +111,7 @@ class Connection: """ conn_id: str - conn_type: str + conn_type: str | None = None description: str | None = None host: str | None = None schema: str | None = None diff --git a/task-sdk/tests/task_sdk/bases/test_hook.py b/task-sdk/tests/task_sdk/bases/test_hook.py index df1c28593e9..00e6200dc00 100644 --- a/task-sdk/tests/task_sdk/bases/test_hook.py +++ b/task-sdk/tests/task_sdk/bases/test_hook.py @@ -17,6 +17,9 @@ # under the License. from __future__ import annotations +import json +from unittest.mock import MagicMock, patch + import pytest from airflow.exceptions import AirflowNotFoundException @@ -115,3 +118,47 @@ class TestBaseHook: assert retrieved_conn.conn_id == "CONN_A" mock_supervisor_comms.send.assert_not_called() + + def test_get_connection_aws_auth_manager(self, mock_supervisor_comms): + """ + Test that hooks can retrieve connections without conn_type from backends + like AWS Secrets Manager (AF2 compatibility). + """ + secret_value = json.dumps( + { + "host": "mydb.us-east-1.rds.amazonaws.com", + "port": 5432, + "login": "admin", + "password": "secret123", + "schema": "production", + } + ) + + mock_client = MagicMock() + mock_client.get_secret_value.return_value = {"SecretString": secret_value} + + with patch( + "airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend.client", + new_callable=lambda: mock_client, + ): + with conf_vars( + { + ( + "secrets", + "backend", + ): "airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend", + ("secrets", "backend_kwargs"): '{"connections_prefix": "airflow/connections"}', + } + ): + hook = BaseHook(logger_name="") + retrieved_conn = hook.get_connection(conn_id="my_db_conn") + + assert retrieved_conn.conn_id == "my_db_conn" + assert retrieved_conn.conn_type is None + assert retrieved_conn.host == "mydb.us-east-1.rds.amazonaws.com" + assert retrieved_conn.port == 5432 + assert retrieved_conn.login == "admin" + assert retrieved_conn.password == "secret123" + assert retrieved_conn.schema == "production" + + mock_client.get_secret_value.assert_called_once() diff --git a/task-sdk/tests/task_sdk/definitions/test_connection.py b/task-sdk/tests/task_sdk/definitions/test_connection.py index 25fab314e2a..419dafb065c 100644 --- a/task-sdk/tests/task_sdk/definitions/test_connection.py +++ b/task-sdk/tests/task_sdk/definitions/test_connection.py @@ -190,6 +190,25 @@ class TestConnections: assert connection.host == "localhost" assert connection.port == 5432 + def test_from_json_without_conn_type(self): + """Test that from_json works without conn_type (backward compatibility with AF 2).""" + json_data = { + "host": "mydb.example.com", + "port": "5432", + "login": "admin", + "password": "secret", + "schema": "production", + } + connection = Connection.from_json(json.dumps(json_data), conn_id="test_conn") + + assert connection.conn_id == "test_conn" + assert connection.conn_type is None + assert connection.host == "mydb.example.com" + assert connection.port == 5432 + assert connection.login == "admin" + assert connection.password == "secret" + assert connection.schema == "production" + def test_extra_dejson_property(self): """Test that extra_dejson property correctly deserializes JSON extra field.""" connection = Connection(
