kaxil commented on code in PR #62343:
URL: https://github.com/apache/airflow/pull/62343#discussion_r2963468979


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py:
##########
@@ -174,6 +189,70 @@ def bulk_connections(
     return BulkConnectionService(session=session, 
request=request).handle_request()
 
 
+@connections_router.patch(
+    "/{connection_id}/test",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_403_FORBIDDEN,
+            status.HTTP_404_NOT_FOUND,
+        ]
+    ),
+    dependencies=[Depends(requires_access_connection(method="PUT")), 
Depends(action_logging())],
+)
+def patch_connection_and_test(
+    connection_id: str,
+    patch_body: ConnectionBody,
+    session: SessionDep,
+    update_mask: list[str] | None = Query(None),
+    executor: str | None = Query(None, description="Executor to route the 
connection test to"),
+    queue: str | None = Query(None, description="Queue to route the connection 
test to"),
+) -> ConnectionSaveAndTestResponse:
+    """
+    Update a connection and queue an async test with revert-on-failure.
+
+    Atomically saves the edit and creates a ConnectionTest with snapshots of 
the
+    pre-edit and post-edit state. If the test fails, the connection is 
automatically
+    reverted to its pre-edit values.
+    """
+    _ensure_test_connection_enabled()
+
+    if patch_body.connection_id != connection_id:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            "The connection_id in the request body does not match the URL 
parameter",
+        )
+
+    connection = 
session.scalar(select(Connection).filter_by(conn_id=connection_id).limit(1))
+    if connection is None:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"The Connection with connection_id: `{connection_id}` was not 
found",
+        )
+
+    try:
+        ConnectionBody(**patch_body.model_dump())

Review Comment:
   `patch_body` is already a `ConnectionBody` instance — FastAPI validated it 
on parameter injection. Re-constructing 
`ConnectionBody(**patch_body.model_dump())` just validates the same data again. 
This block is redundant and can be removed.



##########
airflow-core/src/airflow/migrations/versions/0110_3_2_0_add_connection_test_table.py:
##########
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add connection_test table for async connection testing.
+
+Revision ID: a7e6d4c3b2f1
+Revises: 1d6611b6ab7c
+Create Date: 2026-02-22 00:00:00.000000
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.utils.sqlalchemy import UtcDateTime
+
+# revision identifiers, used by Alembic.
+revision = "a7e6d4c3b2f1"
+down_revision = "1d6611b6ab7c"
+branch_labels = None
+depends_on = None
+airflow_version = "3.2.0"
+
+
+def upgrade():
+    """Create connection_test table."""
+    op.create_table(
+        "connection_test",
+        sa.Column("id", sa.Uuid(), nullable=False),
+        sa.Column("token", sa.String(64), nullable=False),
+        sa.Column("connection_id", sa.String(250), nullable=False),
+        sa.Column("state", sa.String(10), nullable=False),
+        sa.Column("result_message", sa.Text(), nullable=True),
+        sa.Column("created_at", UtcDateTime(timezone=True), nullable=False),
+        sa.Column("updated_at", UtcDateTime(timezone=True), nullable=False),

Review Comment:
   Longest current value is `"pending"` (7 chars). `String(10)` leaves almost 
no room — `"timed_out"` (9) or `"cancelled"` (9) would only just fit, and 
anything longer would need a new migration. Use `String(20)` for breathing room.



##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -168,6 +177,84 @@ def _execute_callback(log: Logger, workload: 
workloads.ExecuteCallback, team_con
         raise RuntimeError(error_msg or "Callback execution failed")
 
 
+def _execute_connection_test(log: Logger, workload: workloads.TestConnection, 
team_conf) -> None:
+    """
+    Execute a connection test workload.
+
+    Constructs an SDK ``Client``, fetches the connection via the Execution API,
+    enforces a timeout via ``signal.alarm``, and reports all outcomes back
+    through the Execution API.
+
+    :param log: Logger instance
+    :param workload: The TestConnection workload to execute
+    :param team_conf: Team-specific executor configuration
+    """
+    # Lazy import: SDK modules must not be loaded at module level to avoid
+    # coupling core (scheduler-loaded) code to the SDK.
+    from airflow.sdk.api.client import Client
+    from airflow.sdk.execution_time.comms import ErrorResponse
+
+    setproctitle(
+        f"{_get_executor_process_title_prefix(team_conf.team_name)} 
connection-test {workload.connection_id}",
+        log,
+    )
+
+    base_url = team_conf.get("api", "base_url", fallback="/")
+    if base_url.startswith("/"):
+        base_url = f"http://localhost:8080{base_url}";
+    default_execution_api_server = f"{base_url.rstrip('/')}/execution/"
+    server = team_conf.get("core", "execution_api_server_url", 
fallback=default_execution_api_server)
+
+    client = Client(base_url=server, token=workload.token)
+
+    def _handle_timeout(signum, frame):
+        raise TimeoutError(f"Connection test timed out after 
{workload.timeout}s")
+
+    signal.signal(signal.SIGALRM, _handle_timeout)
+    signal.alarm(workload.timeout)
+    try:
+        client.connection_tests.update_state(workload.connection_test_id, 
ConnectionTestState.RUNNING)
+
+        conn_response = client.connections.get(workload.connection_id)
+        if isinstance(conn_response, ErrorResponse):
+            raise RuntimeError(f"Connection '{workload.connection_id}' not 
found via Execution API")
+
+        conn = Connection(
+            conn_id=conn_response.conn_id,
+            conn_type=conn_response.conn_type,
+            host=conn_response.host,
+            login=conn_response.login,
+            password=conn_response.password,
+            schema=conn_response.schema_,
+            port=conn_response.port,
+            extra=conn_response.extra,
+        )
+        success, message = run_connection_test(conn=conn)
+
+        state = ConnectionTestState.SUCCESS if success else 
ConnectionTestState.FAILED
+        client.connection_tests.update_state(workload.connection_test_id, 
state, message)
+    except TimeoutError:
+        log.error(
+            "Connection test timed out after %ds",
+            workload.timeout,
+            connection_id=workload.connection_id,
+        )
+        client.connection_tests.update_state(
+            workload.connection_test_id,
+            ConnectionTestState.FAILED,
+            f"Connection test timed out after {workload.timeout}s",
+        )
+    except Exception as e:
+        log.exception("Connection test failed unexpectedly", 
connection_id=workload.connection_id)
+        client.connection_tests.update_state(
+            workload.connection_test_id,
+            ConnectionTestState.FAILED,

Review Comment:
   `f"Connection test failed unexpectedly: {e}"[:500]` stores raw exception 
text. Some DB drivers include connection strings (with credentials) in error 
messages. This gets stored in `result_message` and exposed via the polling API.
   
   Consider sanitizing or using a generic message with the exception type only:
   ```python
   f"Connection test failed unexpectedly: {type(e).__name__}"
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to