GitHub user anishgirianish created a discussion: Proposal: Move connection testing from API server to workers
# Design Proposal: Asynchronous Connection Testing on Workers

**Author:** @anishgirianish
**PR:** [apache/airflow#60618](https://github.com/apache/airflow/pull/60618) (proof of concept)
**Related Issue:** [#58941](https://github.com/apache/airflow/issues/58941)
**Target Version:** Airflow 3.2.0

> **Note:** The linked PR is a working proof of concept to validate the
> approach. If the community agrees with this design, the implementation will
> be refined based on feedback.

---

## Motivation

This design follows the direction proposed by @potiuk in [PR #59643](https://github.com/apache/airflow/pull/59643), which identified fundamental security and isolation problems with running connection tests on the API server. The risks were serious enough that Airflow 2.7.0 disabled connection testing by default ([CVE-2023-37379](https://nvd.nist.gov/vuln/detail/CVE-2023-37379)).

In Airflow's current architecture, connection testing (`POST /connections/test`) executes synchronously on the API server. This creates two problems:

1. **Security risk** -- API servers should not hold or process decrypted connection secrets. They are long-lived, internet-facing processes. Running `test_connection()` on them expands the attack surface unnecessarily.
2. **Network isolation mismatch** -- Workers typically have network access to external systems (databases, cloud APIs, etc.) that API servers do not. A connection test that passes on a worker may fail on the API server simply because the API server cannot reach the target host, giving operators a misleading result.

Airflow 3.x already separates the API server from execution. Connection testing should follow this boundary.

---

## Proposed Design

Replace the synchronous test-and-respond flow with an asynchronous queue-poll model where the actual connection test runs on a worker.

### End-to-End Flow

```
UI / Client
   |
   |  1. POST /connections/test  (queue request)
   v
API Server
   |  - encrypt connection URI (Fernet)
   |  - persist ConnectionTestRequest (state=PENDING)
   |  - return request_id
   v
UI polls GET /connections/test/{request_id}  (every 1 s, max 60 s)
   .
   .
Scheduler (heartbeat loop)
   |  - query PENDING requests (skip_locked, limit 10)
   |  - create TestConnection workload + JWT token
   |  - mark request RUNNING
   |  - queue workload to executor
   v
Worker
   |  - decrypt connection URI
   |  - create transient Connection object
   |  - call connection.test_connection() with timeout
   |  - report result via Execution API:
   |      PATCH /execution/connection-tests/{request_id}/state
   v
API Server
   |  - validate transition (RUNNING -> SUCCESS | FAILED)
   |  - persist result
   v
UI receives terminal state, displays result
```

### State Machine

```
PENDING ──> RUNNING ──> SUCCESS
   |           |
   └──> FAILED <┘
```

- **PENDING**: request created, waiting for scheduler pickup.
- **RUNNING**: dispatched to a worker.
- **SUCCESS / FAILED**: terminal states. Invalid transitions are rejected with HTTP 409.

---

## Key Design Decisions

### 1. Scheduler as the dispatcher

Connection test requests are picked up in the scheduler's heartbeat loop (`_dispatch_connection_tests`). The scheduler already owns the executor lifecycle, so it is the natural place to submit workloads. The dispatch call is wrapped in a try-except to prevent connection test failures from affecting DAG scheduling.

**Alternative considered:** A dedicated micro-service or a separate background thread in the API server. Rejected because it would add operational complexity and duplicate executor management.
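To make the dispatch step concrete, here is a rough sketch of what the heartbeat hook could look like. The names used (`ConnectionTestRequest`, `TestConnection`, `generate_execution_api_token`, `queue_workload`) are illustrative stand-ins, not the final API of the PoC:

```python
# Illustrative sketch only -- the ORM model, workload class, token helper, and
# executor method are placeholders; PR #60618 may name or structure them differently.
from airflow.utils import timezone
from airflow.utils.session import create_session


def _dispatch_connection_tests(self) -> None:
    """Run once per scheduler heartbeat; isolated so failures never disturb DAG scheduling."""
    try:
        with create_session() as session:
            # Grab a small batch; skip_locked keeps batches disjoint across HA schedulers.
            pending = (
                session.query(ConnectionTestRequest)
                .filter(ConnectionTestRequest.state == "pending")
                .order_by(ConnectionTestRequest.created_at)
                .limit(10)
                .with_for_update(skip_locked=True)
                .all()
            )
            for request in pending:
                workload = TestConnection(
                    request_id=request.id,
                    token=generate_execution_api_token(request.id),
                )
                request.state = "running"
                request.started_at = timezone.utcnow()
                # Hand the workload to whichever executor the scheduler manages.
                self.executor.queue_workload(workload, session=session)
    except Exception:
        # A failing connection test must never take down the scheduling loop.
        self.log.exception("Connection test dispatch failed; continuing heartbeat")
```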
### 2. Fernet-encrypted URI storage

The connection URI is encrypted with the configured Fernet key before being persisted in `connection_test_request`. The worker decrypts it at execution time. This means the plaintext URI is never stored in the new table, even transiently.

### 3. `skip_locked` for concurrent schedulers

`get_pending_requests()` uses `with_for_update(skip_locked=True)` so that in HA deployments with multiple schedulers, each scheduler picks up a disjoint set of requests without blocking.

### 4. Workload-based execution

Connection testing reuses the existing workload / executor infrastructure. A new `TestConnection` workload type is added alongside `ExecuteTask`, `ExecuteCallback`, and `RunTrigger`. This means any executor that supports the workload protocol (currently LocalExecutor) can run connection tests without special-casing.

### 5. Transient connection object on the worker

The worker creates a short-lived `Connection` object with a random `conn_id`, exports it as an environment variable (`AIRFLOW_CONN_<id>`), and cleans it up in a `finally` block. This avoids leaking secrets into the worker's environment beyond the scope of the test.

### 6. UI polling with timeout

The React UI polls every 1 second for up to 60 seconds. If the result hasn't arrived by then, the UI reports a timeout. This keeps the UX responsive without requiring WebSocket infrastructure.

---

## Database Changes

### New table: `connection_test_request`

| Column | Type | Notes |
|---|---|---|
| `id` | `VARCHAR(36)` PK | UUID7 |
| `state` | `VARCHAR(10)` | pending / running / success / failed |
| `encrypted_connection_uri` | `TEXT` | Fernet-encrypted |
| `conn_type` | `VARCHAR(500)` | e.g. `postgres`, `mysql` |
| `result_status` | `BOOLEAN` (nullable) | true = passed |
| `result_message` | `TEXT` (nullable) | human-readable result |
| `created_at` | `TIMESTAMP (UTC)` | |
| `started_at` | `TIMESTAMP (UTC)` (nullable) | set on RUNNING |
| `completed_at` | `TIMESTAMP (UTC)` (nullable) | set on terminal |
| `timeout` | `INTEGER` | default 60 s |
| `worker_hostname` | `VARCHAR(500)` (nullable) | for debugging |

**Index:** `(state, created_at)` -- supports the scheduler's polling query efficiently.

**Migration:** `0099_3_2_0_add_connection_test_request_table` (revision `9882c124ea54`).

---

## API Surface

### Core API (client-facing)

| Method | Path | Purpose |
|---|---|---|
| `POST` | `/api/v2/connections/test` | Queue a connection test; returns `request_id` |
| `GET` | `/api/v2/connections/test/{request_id}` | Poll for result |

Both endpoints are gated by `requires_access_connection` RBAC.

### Execution API (worker-facing)

| Method | Path | Purpose |
|---|---|---|
| `PATCH` | `/execution/connection-tests/{request_id}/state` | Worker reports result |

Authenticated via JWT token embedded in the workload.

---

## Configuration

The feature respects the existing `[core] test_connection` setting. When set to `Disabled` (the default), both the API endpoint and the scheduler dispatch are no-ops.

```ini
[core]
test_connection = Enabled
```

No new configuration keys are introduced.
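To show how the pieces above fit together from a client's point of view, a minimal queue-then-poll interaction against the Core API could look like the following. The payload and response field names (`connection_uri`, `conn_type`, `request_id`, `state`, `result_message`) are assumptions derived from the tables above, not a finalized schema:

```python
# Hypothetical client-side sketch of the queue-then-poll flow; field names are
# assumptions, only the endpoint paths come from the design above.
import time

import requests

BASE_URL = "http://localhost:8080/api/v2"
HEADERS = {"Authorization": "Bearer <access-token>"}

# 1. Queue the test. The API server only encrypts and persists the request.
response = requests.post(
    f"{BASE_URL}/connections/test",
    json={"conn_type": "postgres", "connection_uri": "postgresql://user:***@db:5432/airflow"},
    headers=HEADERS,
)
request_id = response.json()["request_id"]

# 2. Poll once per second for up to 60 seconds, mirroring the UI behaviour.
for _ in range(60):
    result = requests.get(f"{BASE_URL}/connections/test/{request_id}", headers=HEADERS).json()
    if result["state"] in ("success", "failed"):
        print(result["state"], result.get("result_message"))
        break
    time.sleep(1)
else:
    print("Timed out waiting for the connection test result")
```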
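On the worker side, design decision 5 plus the Execution API endpoint reduce to roughly the sketch below. `decrypt_connection_uri` and the result-reporting step are placeholders for whatever the PoC actually implements:

```python
# Simplified, illustrative worker-side flow. The Fernet decryption helper and the
# Execution API reporting step are placeholders, not the PoC's real function names.
import os
import uuid

from airflow.models.connection import Connection


def run_connection_test(encrypted_uri: str) -> tuple[bool, str]:
    conn_uri = decrypt_connection_uri(encrypted_uri)  # hypothetical Fernet helper
    conn_id = f"conn_test_{uuid.uuid4().hex}"
    env_key = f"AIRFLOW_CONN_{conn_id.upper()}"
    # The env var lets the provider hook resolve the transient connection by conn_id.
    os.environ[env_key] = conn_uri
    try:
        connection = Connection(conn_id=conn_id, uri=conn_uri)
        # Delegates to the provider hook's test_connection(); returns (status, message).
        # Timeout enforcement around this call is omitted here for brevity.
        return connection.test_connection()
    finally:
        # Never leave the secret in the worker environment beyond the test.
        os.environ.pop(env_key, None)


# The worker then reports the outcome back to the API server, e.g.
#   PATCH /execution/connection-tests/{request_id}/state
# authenticated with the JWT token that was embedded in the TestConnection workload.
```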
---

## Open Questions for Discussion

### 1. Cleanup of stale requests

If a worker crashes mid-test, the request stays in `RUNNING` forever. Options:

- **a)** Scheduler-side reaper: periodically mark `RUNNING` requests older than `N` minutes as `FAILED`. Similar to how orphaned task instances are handled.
- **b)** Client-side timeout only: the UI already times out after 60 s. The stale row is harmless and can be cleaned up by a periodic table purge.
- **c)** Configurable TTL with a new `[core] connection_test_timeout` setting.

I'm leaning toward **(a)** with a sensible default (e.g. 5 minutes), matching the pattern used for zombie task detection. Looking for maintainer input on the preferred approach.

### 2. Executor support beyond LocalExecutor

The current implementation works with `LocalExecutor`. `CeleryExecutor` and `KubernetesExecutor` would need to handle the `TestConnection` workload type. Should this be:

- **a)** Gated so only supported executors attempt dispatch (fail gracefully otherwise).
- **b)** Required for all executors as part of the workload protocol.
- **c)** Deferred to follow-up PRs, with `LocalExecutor` as the initial scope.

Currently going with **(c)**, but open to feedback.

### 3. Table retention / purge strategy

`connection_test_request` rows are never updated once they reach a terminal state, so over time the table will only grow. Options:

- **a)** Automatic purge of rows older than `N` days, driven by the scheduler.
- **b)** Include in the existing `airflow db clean` command.
- **c)** Leave to operators.

**(b)** seems most consistent with existing patterns.

### 4. Should the scheduler dispatch or should the API server dispatch directly?

The current design uses the scheduler because it owns the executor. An alternative is to have the API server submit workloads directly to a message broker (e.g. Celery queue), bypassing the scheduler entirely. This would reduce latency but couples the API server to broker details. Looking for guidance on which trade-off is preferred.

---

## Scope

### In scope (this PR)

- `ConnectionTestRequest` model and migration
- Core API endpoints (queue + poll)
- Execution API endpoint (worker result reporting)
- Scheduler dispatch loop
- `TestConnection` workload type
- `LocalExecutor` support
- UI polling hook
- Unit tests for model, core API, and execution API

### Out of scope (follow-up)

- `CeleryExecutor` / `KubernetesExecutor` support
- Stale request cleanup / reaper
- `airflow db clean` integration
- E2E / integration tests across executor types
- Observability metrics (request latency, success rate)

---

## References

- [PR #59643](https://github.com/apache/airflow/pull/59643) -- @potiuk's review where worker-based connection testing was proposed
- [CVE-2023-37379](https://nvd.nist.gov/vuln/detail/CVE-2023-37379) -- RCE via connection testing on the API server
- [Airflow AIP-72: Execution API](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-72)
- [PR #60618](https://github.com/apache/airflow/pull/60618) -- Implementation PR
- [Issue #58941](https://github.com/apache/airflow/issues/58941) -- dbt-cloud test_connection failure

---

cc @potiuk @ashb @pierrejeambrun @ephraimbuddy @kaxil @bbovenzi -- would appreciate your input on this design.

GitHub link: https://github.com/apache/airflow/discussions/61147
