This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 656a5beeed3 Fix connection test API to restore masked password/extra
from existing connections (#59643) (#60873)
656a5beeed3 is described below
commit 656a5beeed3df02e46f8bb8b3322cccb86410804
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Wed Jan 21 16:48:22 2026 +0100
Fix connection test API to restore masked password/extra from existing
connections (#59643) (#60873)
* Fix connection test API to restore masked password/extra from existing
connections
* Use get_connection_from_secrets in test endpoint
* Remove unused session parameter
* Add use_existing_credentials flag to test connections
* fix api-gen description
* Standardize docstring format
* Add use_existing_credentials flag to test connections
* Refactor connection test logic using update_orm_from_pydantic
* Remove useExistingCredentials flag from TestConnectionButton mutation
* Add proper assertions for test connection merge tests
* Small test adjustment
---------
(cherry picked from commit efb27dc9e2f8b128573d130185a8d334f307a02a)
Co-authored-by: Yeonguk Choo <[email protected]>
---
.../core_api/routes/public/connections.py | 19 ++-
.../core_api/routes/public/test_connections.py | 136 +++++++++++++++++++++
2 files changed, 149 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py
index 295c6f156f0..b7e79f86614 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py
@@ -50,6 +50,7 @@ from airflow.api_fastapi.core_api.services.public.connections import (
)
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.configuration import conf
+from airflow.exceptions import AirflowNotFoundException
from airflow.models import Connection
from airflow.secrets.environment_variables import CONN_ENV_PREFIX
from airflow.utils.db import create_default_connections as db_create_default_connections
@@ -207,9 +208,7 @@ def patch_connection(
@connections_router.post("/test", dependencies=[Depends(requires_access_connection(method="POST"))])
-def test_connection(
- test_body: ConnectionBody,
-) -> ConnectionTestResponse:
+def test_connection(test_body: ConnectionBody) -> ConnectionTestResponse:
"""
Test an API connection.
@@ -227,9 +226,17 @@ def test_connection(
transient_conn_id = get_random_string()
conn_env_var = f"{CONN_ENV_PREFIX}{transient_conn_id.upper()}"
try:
- data = test_body.model_dump(by_alias=True)
- data["conn_id"] = transient_conn_id
- conn = Connection(**data)
+ # Try to get existing connection and merge with provided values
+ try:
+ existing_conn = Connection.get_connection_from_secrets(test_body.connection_id)
+ existing_conn.conn_id = transient_conn_id
+ update_orm_from_pydantic(existing_conn, test_body)
+ conn = existing_conn
+ except AirflowNotFoundException:
+ data = test_body.model_dump(by_alias=True)
+ data["conn_id"] = transient_conn_id
+ conn = Connection(**data)
+
os.environ[conn_env_var] = conn.get_uri()
test_status, test_message = conn.test_connection()
return ConnectionTestResponse.model_validate({"status": test_status, "message": test_message})
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
index b674b7a6c3a..1f1a51a0102 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
@@ -16,11 +16,13 @@
# under the License.
from __future__ import annotations
+import json
import os
from importlib.metadata import PackageNotFoundError, metadata
from unittest import mock
import pytest
+from sqlalchemy import func, select
from airflow.models import Connection
from airflow.secrets.environment_variables import CONN_ENV_PREFIX
@@ -920,6 +922,140 @@ class TestConnection(TestConnectionEndpoint):
"Contact your deployment admin to enable it."
}
+ @mock.patch.dict(os.environ, {"AIRFLOW__CORE__TEST_CONNECTION": "Enabled"})
+ def test_should_merge_password_with_existing_connection(self, test_client, session):
+ connection = Connection(
+ conn_id=TEST_CONN_ID,
+ conn_type="sqlite",
+ password="existing_password",
+ )
+ session.add(connection)
+ session.commit()
+ initial_count = session.scalar(select(func.count()).select_from(Connection))
+
+ captured_value = {}
+
+ def mock_test_connection(self):
+ captured_value["password"] = self.password
+ captured_value["conn_type"] = self.conn_type
+ return True, "mocked"
+
+ body = {
+ "connection_id": TEST_CONN_ID,
+ "conn_type": "new_sqlite",
+ "password": "***",
+ }
+
+ with mock.patch.object(Connection, "test_connection", mock_test_connection):
+ response = test_client.post("/connections/test", json=body)
+
+ assert response.status_code == 200
+ assert response.json()["status"] is True
+ # Verify that the existing password was used, not "***"
+ assert captured_value["password"] == "existing_password"
+ # Verify that the payload values were used for the other fields
+ assert captured_value["conn_type"] == "new_sqlite"
+
+ # Verify DB was not mutated
+ session.expire_all()
+ db_conn = session.scalar(select(Connection).filter_by(conn_id=TEST_CONN_ID))
+ assert db_conn.password == "existing_password"
+ assert session.scalar(select(func.count()).select_from(Connection)) == initial_count
+
+ @mock.patch.dict(os.environ, {"AIRFLOW__CORE__TEST_CONNECTION": "Enabled"})
+ def test_should_merge_extra_with_existing_connection(self, test_client, session):
+ connection = Connection(
+ conn_id=TEST_CONN_ID,
+ conn_type="fs",
+ extra='{"path": "/", "existing_key": "existing_value"}',
+ )
+ session.add(connection)
+ session.commit()
+ initial_count = session.scalar(select(func.count()).select_from(Connection))
+
+ captured_extra = {}
+
+ def mock_test_connection(self):
+ captured_extra["value"] = self.extra
+ return True, "mocked"
+
+ body = {
+ "connection_id": TEST_CONN_ID,
+ "conn_type": "fs",
+ "extra": '{"path": "/", "new_key": "new_value"}',
+ }
+
+ with mock.patch.object(Connection, "test_connection", mock_test_connection):
+ response = test_client.post("/connections/test", json=body)
+
+ assert response.status_code == 200
+ assert response.json()["status"] is True
+ # Verify that new_key is reflected in the merged extra
+ merged_extra = json.loads(captured_extra["value"])
+ assert merged_extra["new_key"] == "new_value"
+ assert merged_extra["path"] == "/"
+
+ # Verify DB was not mutated
+ session.expire_all()
+ db_conn = session.scalar(select(Connection).filter_by(conn_id=TEST_CONN_ID))
+ assert json.loads(db_conn.extra) == {"path": "/", "existing_key": "existing_value"}
+ assert session.scalar(select(func.count()).select_from(Connection)) == initial_count
+
+ @mock.patch.dict(os.environ, {"AIRFLOW__CORE__TEST_CONNECTION": "Enabled"})
+ def test_should_merge_both_password_and_extra(self, test_client, session):
+ connection = Connection(
+ conn_id=TEST_CONN_ID,
+ conn_type="fs",
+ password="existing_password",
+ extra='{"path": "/", "existing_key": "existing_value"}',
+ )
+ session.add(connection)
+ session.commit()
+ initial_count = session.scalar(select(func.count()).select_from(Connection))
+
+ captured_values = {}
+
+ def mock_test_connection(self):
+ captured_values["password"] = self.password
+ captured_values["extra"] = self.extra
+ return True, "mocked"
+
+ body = {
+ "connection_id": TEST_CONN_ID,
+ "conn_type": "fs",
+ "password": "***",
+ "extra": '{"path": "/", "new_key": "new_value"}',
+ }
+
+ with mock.patch.object(Connection, "test_connection", mock_test_connection):
+ response = test_client.post("/connections/test", json=body)
+
+ assert response.status_code == 200
+ assert response.json()["status"] is True
+ # Verify that the existing password was used, not "***"
+ assert captured_values["password"] == "existing_password"
+ # Verify that new_key is reflected in the merged extra
+ merged_extra = json.loads(captured_values["extra"])
+ assert merged_extra["new_key"] == "new_value"
+ assert merged_extra["path"] == "/"
+
+ # Verify DB was not mutated
+ session.expire_all()
+ db_conn = session.scalar(select(Connection).filter_by(conn_id=TEST_CONN_ID))
+ assert db_conn.password == "existing_password"
+ assert json.loads(db_conn.extra) == {"path": "/", "existing_key": "existing_value"}
+ assert session.scalar(select(func.count()).select_from(Connection)) == initial_count
+
+ @mock.patch.dict(os.environ, {"AIRFLOW__CORE__TEST_CONNECTION": "Enabled"})
+ def test_should_test_new_connection_without_existing(self, test_client):
+ body = {
+ "connection_id": "non_existent_conn",
+ "conn_type": "sqlite",
+ }
+ response = test_client.post("/connections/test", json=body)
+ assert response.status_code == 200
+ assert response.json()["status"] is True
+
class TestCreateDefaultConnections(TestConnectionEndpoint):
def test_should_respond_204(self, test_client, session):