This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 656a5beeed3 Fix connection test API to restore masked password/extra from existing connections (#59643) (#60873)
656a5beeed3 is described below

commit 656a5beeed3df02e46f8bb8b3322cccb86410804
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Wed Jan 21 16:48:22 2026 +0100

    Fix connection test API to restore masked password/extra from existing connections (#59643) (#60873)
    
    * Fix connection test API to restore masked password/extra from existing connections
    
    * Use get_connection_from_secrets in test endpoint
    
    * Remove unused session parameter
    
    * Add use_existing_credentials flag to test connections
    
    * fix api-gen description
    
    * Standardize docstring format
    
    * Add use_existing_credentials flag to test connections
    
    * Refactor connection test logic using update_orm_from_pydantic
    
    * Remove useExistingCredentials flag from TestConnectionButton mutation
    
    * Add proper assertions for test connection merge tests
    
    * Small test adjustment
    
    ---------
    
    
    (cherry picked from commit efb27dc9e2f8b128573d130185a8d334f307a02a)
    
    Co-authored-by: Yeonguk Choo <[email protected]>
---
 .../core_api/routes/public/connections.py          |  19 ++-
 .../core_api/routes/public/test_connections.py     | 136 +++++++++++++++++++++
 2 files changed, 149 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py
index 295c6f156f0..b7e79f86614 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py
@@ -50,6 +50,7 @@ from airflow.api_fastapi.core_api.services.public.connections import (
 )
 from airflow.api_fastapi.logging.decorators import action_logging
 from airflow.configuration import conf
+from airflow.exceptions import AirflowNotFoundException
 from airflow.models import Connection
 from airflow.secrets.environment_variables import CONN_ENV_PREFIX
 from airflow.utils.db import create_default_connections as db_create_default_connections
@@ -207,9 +208,7 @@ def patch_connection(
 
 
 @connections_router.post("/test", dependencies=[Depends(requires_access_connection(method="POST"))])
-def test_connection(
-    test_body: ConnectionBody,
-) -> ConnectionTestResponse:
+def test_connection(test_body: ConnectionBody) -> ConnectionTestResponse:
     """
     Test an API connection.
 
@@ -227,9 +226,17 @@ def test_connection(
     transient_conn_id = get_random_string()
     conn_env_var = f"{CONN_ENV_PREFIX}{transient_conn_id.upper()}"
     try:
-        data = test_body.model_dump(by_alias=True)
-        data["conn_id"] = transient_conn_id
-        conn = Connection(**data)
+        # Try to get existing connection and merge with provided values
+        try:
+            existing_conn = Connection.get_connection_from_secrets(test_body.connection_id)
+            existing_conn.conn_id = transient_conn_id
+            update_orm_from_pydantic(existing_conn, test_body)
+            conn = existing_conn
+        except AirflowNotFoundException:
+            data = test_body.model_dump(by_alias=True)
+            data["conn_id"] = transient_conn_id
+            conn = Connection(**data)
+
         os.environ[conn_env_var] = conn.get_uri()
         test_status, test_message = conn.test_connection()
         return ConnectionTestResponse.model_validate({"status": test_status, "message": test_message})
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
index b674b7a6c3a..1f1a51a0102 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
@@ -16,11 +16,13 @@
 # under the License.
 from __future__ import annotations
 
+import json
 import os
 from importlib.metadata import PackageNotFoundError, metadata
 from unittest import mock
 
 import pytest
+from sqlalchemy import func, select
 
 from airflow.models import Connection
 from airflow.secrets.environment_variables import CONN_ENV_PREFIX
@@ -920,6 +922,140 @@ class TestConnection(TestConnectionEndpoint):
             "Contact your deployment admin to enable it."
         }
 
+    @mock.patch.dict(os.environ, {"AIRFLOW__CORE__TEST_CONNECTION": "Enabled"})
+    def test_should_merge_password_with_existing_connection(self, test_client, session):
+        connection = Connection(
+            conn_id=TEST_CONN_ID,
+            conn_type="sqlite",
+            password="existing_password",
+        )
+        session.add(connection)
+        session.commit()
+        initial_count = session.scalar(select(func.count()).select_from(Connection))
+
+        captured_value = {}
+
+        def mock_test_connection(self):
+            captured_value["password"] = self.password
+            captured_value["conn_type"] = self.conn_type
+            return True, "mocked"
+
+        body = {
+            "connection_id": TEST_CONN_ID,
+            "conn_type": "new_sqlite",
+            "password": "***",
+        }
+
+        with mock.patch.object(Connection, "test_connection", mock_test_connection):
+            response = test_client.post("/connections/test", json=body)
+
+        assert response.status_code == 200
+        assert response.json()["status"] is True
+        # Verify that the existing password was used, not "***"
+        assert captured_value["password"] == "existing_password"
+        # Verify that payload info were used for other fields
+        assert captured_value["conn_type"] == "new_sqlite"
+
+        # Verify DB was not mutated
+        session.expire_all()
+        db_conn = session.scalar(select(Connection).filter_by(conn_id=TEST_CONN_ID))
+        assert db_conn.password == "existing_password"
+        assert session.scalar(select(func.count()).select_from(Connection)) == initial_count
+
+    @mock.patch.dict(os.environ, {"AIRFLOW__CORE__TEST_CONNECTION": "Enabled"})
+    def test_should_merge_extra_with_existing_connection(self, test_client, session):
+        connection = Connection(
+            conn_id=TEST_CONN_ID,
+            conn_type="fs",
+            extra='{"path": "/", "existing_key": "existing_value"}',
+        )
+        session.add(connection)
+        session.commit()
+        initial_count = session.scalar(select(func.count()).select_from(Connection))
+
+        captured_extra = {}
+
+        def mock_test_connection(self):
+            captured_extra["value"] = self.extra
+            return True, "mocked"
+
+        body = {
+            "connection_id": TEST_CONN_ID,
+            "conn_type": "fs",
+            "extra": '{"path": "/", "new_key": "new_value"}',
+        }
+
+        with mock.patch.object(Connection, "test_connection", mock_test_connection):
+            response = test_client.post("/connections/test", json=body)
+
+        assert response.status_code == 200
+        assert response.json()["status"] is True
+        # Verify that new_key is reflected in the merged extra
+        merged_extra = json.loads(captured_extra["value"])
+        assert merged_extra["new_key"] == "new_value"
+        assert merged_extra["path"] == "/"
+
+        # Verify DB was not mutated
+        session.expire_all()
+        db_conn = session.scalar(select(Connection).filter_by(conn_id=TEST_CONN_ID))
+        assert json.loads(db_conn.extra) == {"path": "/", "existing_key": "existing_value"}
+        assert session.scalar(select(func.count()).select_from(Connection)) == initial_count
+
+    @mock.patch.dict(os.environ, {"AIRFLOW__CORE__TEST_CONNECTION": "Enabled"})
+    def test_should_merge_both_password_and_extra(self, test_client, session):
+        connection = Connection(
+            conn_id=TEST_CONN_ID,
+            conn_type="fs",
+            password="existing_password",
+            extra='{"path": "/", "existing_key": "existing_value"}',
+        )
+        session.add(connection)
+        session.commit()
+        initial_count = session.scalar(select(func.count()).select_from(Connection))
+
+        captured_values = {}
+
+        def mock_test_connection(self):
+            captured_values["password"] = self.password
+            captured_values["extra"] = self.extra
+            return True, "mocked"
+
+        body = {
+            "connection_id": TEST_CONN_ID,
+            "conn_type": "fs",
+            "password": "***",
+            "extra": '{"path": "/", "new_key": "new_value"}',
+        }
+
+        with mock.patch.object(Connection, "test_connection", mock_test_connection):
+            response = test_client.post("/connections/test", json=body)
+
+        assert response.status_code == 200
+        assert response.json()["status"] is True
+        # Verify that the existing password was used, not "***"
+        assert captured_values["password"] == "existing_password"
+        # Verify that new_key is reflected in the merged extra
+        merged_extra = json.loads(captured_values["extra"])
+        assert merged_extra["new_key"] == "new_value"
+        assert merged_extra["path"] == "/"
+
+        # Verify DB was not mutated
+        session.expire_all()
+        db_conn = session.scalar(select(Connection).filter_by(conn_id=TEST_CONN_ID))
+        assert db_conn.password == "existing_password"
+        assert json.loads(db_conn.extra) == {"path": "/", "existing_key": "existing_value"}
+        assert session.scalar(select(func.count()).select_from(Connection)) == initial_count
+
+    @mock.patch.dict(os.environ, {"AIRFLOW__CORE__TEST_CONNECTION": "Enabled"})
+    def test_should_test_new_connection_without_existing(self, test_client):
+        body = {
+            "connection_id": "non_existent_conn",
+            "conn_type": "sqlite",
+        }
+        response = test_client.post("/connections/test", json=body)
+        assert response.status_code == 200
+        assert response.json()["status"] is True
+
 
 class TestCreateDefaultConnections(TestConnectionEndpoint):
     def test_should_respond_204(self, test_client, session):

Reply via email to