kaxil commented on code in PR #62343:
URL: https://github.com/apache/airflow/pull/62343#discussion_r3292563762


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py:
##########
@@ -259,6 +283,94 @@ def test_connection(test_body: ConnectionBody) -> ConnectionTestResponse:
         os.environ.pop(conn_env_var, None)
 
 
+@connections_router.post(
+    "/enqueue-test",
+    status_code=status.HTTP_202_ACCEPTED,
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_403_FORBIDDEN,
+            status.HTTP_409_CONFLICT,
+            status.HTTP_422_UNPROCESSABLE_ENTITY,
+        ]
+    ),
+    dependencies=[Depends(requires_access_connection(method="POST")), Depends(action_logging())],
+)
+def enqueue_connection_test(
+    test_body: ConnectionTestRequestBody,
+    session: SessionDep,
+) -> ConnectionTestQueuedResponse:
+    """Enqueue a connection test for deferred execution on a worker; returns a polling token."""
+    _ensure_test_connection_enabled()
+    _ensure_executor_is_configured(test_body.executor)
+
+    connection_test = ConnectionTestRequest(
+        connection_id=test_body.connection_id,
+        conn_type=test_body.conn_type,
+        host=test_body.host,
+        login=test_body.login,
+        password=test_body.password,
+        schema=test_body.schema_,
+        port=test_body.port,
+        extra=test_body.extra,
+        commit_on_success=test_body.commit_on_success,
+        executor=test_body.executor,
+        queue=test_body.queue,
+    )
+    session.add(connection_test)
+    try:
+        session.flush()
+    except IntegrityError:
+        raise HTTPException(
+            status.HTTP_409_CONFLICT,
+            f"An active connection test already exists for connection_id `{test_body.connection_id}`.",
+        )
+
+    return ConnectionTestQueuedResponse(
+        token=connection_test.token,
+        connection_id=connection_test.connection_id,
+        state=connection_test.state,
+    )
+
+
+@connections_router.get(
+    "/enqueue-test/{connection_test_token}",

Review Comment:
   **Bearer token in URL path leaks via access logs.**
   
   Putting the worker callback token in the URL path means it ends up in API 
server access logs, reverse-proxy logs, browser history, and any intermediate 
observability that captures request URIs. A token sitting in a URL is treated 
as low-sensitivity by every layer between the worker and the API, but it grants 
the same authority as a JWT in an `Authorization` header.
   
   Two options:
   
   1. Move the token to an `Authorization: Bearer ...` header (or a POST body 
field) and resolve the request by `request_id` alone.
   2. Keep the URL stable but store only a SHA-256 hash of the token, then 
accept the plaintext via header and compare hashes server-side.
   
   Option 2 is friendlier if the worker has to retry without re-issuing tokens, 
but option 1 is simpler.
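
To make option 2 concrete, here is a minimal sketch using only the Python standard library. The helper names (`issue_token`, `extract_bearer`, `token_matches`) are hypothetical and not part of this PR or of Airflow. It assumes the row would store only the hex digest and that the plaintext arrives in an `Authorization: Bearer ...` header value.

```python
from __future__ import annotations

import hashlib
import hmac
import secrets


def issue_token() -> tuple[str, str]:
    """Return (plaintext, digest). Only the digest would be persisted."""
    plaintext = secrets.token_urlsafe(32)
    digest = hashlib.sha256(plaintext.encode("utf-8")).hexdigest()
    return plaintext, digest


def extract_bearer(authorization: str | None) -> str | None:
    """Pull the token out of an ``Authorization: Bearer <token>`` header value."""
    if not authorization:
        return None
    scheme, _, value = authorization.partition(" ")
    value = value.strip()
    if scheme.lower() != "bearer" or not value:
        return None
    return value


def token_matches(presented: str | None, stored_digest: str) -> bool:
    """Hash the presented token and compare the digests in constant time."""
    if presented is None:
        return False
    presented_digest = hashlib.sha256(presented.encode("utf-8")).hexdigest()
    return hmac.compare_digest(presented_digest, stored_digest)
```

With this shape the plaintext token never has to appear in a request URI, and a leaked database row exposes only the digest. `hmac.compare_digest` is used so the comparison does not short-circuit on the first mismatching character.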



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3245,6 +3261,94 @@ def _cleanup_orphaned_asset_state(*, session: Session) -> None:
         )
         session.execute(delete(AssetStateModel).where(AssetStateModel.asset_id.not_in(active_asset_ids)))
 
+    def _enqueue_connection_tests(self, *, session: Session) -> None:
+        """
+        Enqueue pending connection tests to executors that support them.
+
+        ``max_concurrency`` is per-scheduler, not global: with N HA schedulers
+        the worst-case per-tick dispatch is ``N * max_concurrency``. Connection
+        tests are user-initiated and rare, so the overshoot self-corrects via
+        the reaper. For a true global cap, wrap the budget+claim below in a
+        sentinel-row ``SELECT ... FOR UPDATE``.
+        """
+        max_concurrency = conf.getint("connection_test", "max_concurrency", fallback=4)
+        timeout = conf.getint("connection_test", "timeout", fallback=60)
+
+        active_count = session.scalar(
+            select(func.count(ConnectionTestRequest.id)).where(
+                ConnectionTestRequest.state.in_(DISPATCHED_STATES)
+            )
+        )
+        budget = max_concurrency - (active_count or 0)
+        if budget <= 0:
+            return
+
+        pending_stmt = (
+            select(ConnectionTestRequest)
+            .where(ConnectionTestRequest.state == ConnectionTestState.PENDING)
+            .order_by(ConnectionTestRequest.created_at)
+            .limit(budget)
+        )
+        pending_stmt = with_row_locks(pending_stmt, session, of=ConnectionTestRequest, skip_locked=True)
+        pending_tests = session.scalars(pending_stmt).all()
+
+        if not pending_tests:
+            return
+
+        for ct in pending_tests:
+            executor = self._try_to_load_executor(ct, session)

Review Comment:
   **Multi-team executor routing is broken at dispatch.**
   
   `_get_workload_team_name` calls `ct.get_dag_id()`, which returns `None` for 
`ConnectionTestRequest` because there is no DAG behind a connection test. The 
`team_name` resolution then collapses to `None`, and every connection-test 
workload routes through the global executor regardless of which team the 
connection belongs to. The team-scoped executor pool the rest of the PR sets up 
is bypassed for this workload type.
   
   Resolve the team directly from the row:
   
   ```python
   team_name = Connection.get_team_name(ct.connection_id, session)
   executor = self._try_to_load_executor(ct, session, team_name=team_name)
   ```
   
   Ideally, add a regression test that spins up two team-scoped executors and 
asserts that a `ConnectionTest` workload for team B lands on team B's executor, 
not the global one.
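
To illustrate the failure mode outside of Airflow, here is a small self-contained sketch. Every name in it (`FakeConnectionTest`, `team_from_dag`, `team_from_connection`, `pick_executor`) is hypothetical and only models the behaviour described above. It is not the scheduler's actual code.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class FakeConnectionTest:
    """Stand-in for a connection-test workload: it has no DAG behind it."""

    connection_id: str

    def get_dag_id(self) -> str | None:
        return None


def team_from_dag(workload: FakeConnectionTest, dag_team: dict[str, str]) -> str | None:
    """DAG-based lookup: yields None whenever the workload has no dag_id."""
    dag_id = workload.get_dag_id()
    return dag_team.get(dag_id) if dag_id is not None else None


def team_from_connection(workload: FakeConnectionTest, connection_team: dict[str, str]) -> str | None:
    """Row-based lookup: resolve the team from the connection itself."""
    return connection_team.get(workload.connection_id)


def pick_executor(team_name: str | None, executors: dict[str | None, str]) -> str:
    """Use the team-scoped executor if one exists, else the global one (key None)."""
    return executors.get(team_name, executors[None])


executors = {None: "global-executor", "team_a": "team-a-executor", "team_b": "team-b-executor"}
connection_team = {"warehouse_b": "team_b"}
ct = FakeConnectionTest(connection_id="warehouse_b")

# DAG-based resolution collapses to None, so the workload goes to the global executor.
via_dag = pick_executor(team_from_dag(ct, dag_team={}), executors)
# Connection-based resolution reaches the team-scoped executor.
via_connection = pick_executor(team_from_connection(ct, connection_team), executors)
```

A regression test along the lines suggested above would assert the second outcome against the real scheduler and executor fixtures.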


