GitHub user anishgirianish created a discussion: Proposal: Move connection 
testing from API server to workers

# Design Proposal: Asynchronous Connection Testing on Workers

**Author:** @anishgirianish
**PR:** [apache/airflow#60618](https://github.com/apache/airflow/pull/60618) 
(proof of concept)
**Related Issue:** [#58941](https://github.com/apache/airflow/issues/58941)
**Target Version:** Airflow 3.2.0

> **Note:** The linked PR is a working proof of concept to validate the 
> approach. If the community agrees with this design, the implementation will 
> be refined based on feedback.

---

## Motivation

This design follows the direction proposed by @potiuk in [PR 
#59643](https://github.com/apache/airflow/pull/59643), which identified 
fundamental security and isolation problems with running connection tests on 
the API server. The risks were serious enough that Airflow 2.7.0 disabled 
connection testing by default 
([CVE-2023-37379](https://nvd.nist.gov/vuln/detail/CVE-2023-37379)).

In Airflow's current architecture, connection testing (`POST 
/connections/test`) executes synchronously on the API server. This creates two 
problems:

1. **Security risk** -- API servers should not hold or process decrypted 
connection secrets. They are long-lived, internet-facing processes. Running 
`test_connection()` on them expands the attack surface unnecessarily.

2. **Network isolation mismatch** -- Workers typically have network access to 
external systems (databases, cloud APIs, etc.) that API servers do not. A 
connection test that passes on a worker may fail on the API server simply 
because the API server cannot reach the target host, giving operators a 
misleading result.

Airflow 3.x already separates the API server from execution. Connection testing 
should follow this boundary.

---

## Proposed Design

Replace the synchronous test-and-respond flow with an asynchronous queue-poll 
model where the actual connection test runs on a worker.

### End-to-End Flow

```
  UI / Client
      |
      | 1. POST /connections/test  (queue request)
      v
  API Server
      |  - encrypt connection URI (Fernet)
      |  - persist ConnectionTestRequest (state=PENDING)
      |  - return request_id
      v
  UI polls GET /connections/test/{request_id}  (every 1 s, max 60 s)
      .
      .
  Scheduler (heartbeat loop)
      |  - query PENDING requests (skip_locked, limit 10)
      |  - create TestConnection workload + JWT token
      |  - mark request RUNNING
      |  - queue workload to executor
      v
  Worker
      |  - decrypt connection URI
      |  - create transient Connection object
      |  - call connection.test_connection() with timeout
      |  - report result via Execution API:
      |      PATCH /execution/connection-tests/{request_id}/state
      v
  API Server
      |  - validate transition (RUNNING -> SUCCESS | FAILED)
      |  - persist result
      v
  UI receives terminal state, displays result
```

### State Machine

```
  PENDING ──> RUNNING ──> SUCCESS
     |            |
     └──> FAILED <┘
```

- **PENDING**: request created, waiting for scheduler pickup.
- **RUNNING**: dispatched to a worker.
- **SUCCESS / FAILED**: terminal states.

Invalid transitions are rejected with HTTP 409.
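The transition rule can be sketched as a simple table-driven check (names here are illustrative, not the actual implementation in the PR; the API layer would map the raised error to HTTP 409):

```python
# Allowed state transitions for a connection test request.
ALLOWED_TRANSITIONS = {
    "PENDING": {"RUNNING", "FAILED"},
    "RUNNING": {"SUCCESS", "FAILED"},
    "SUCCESS": set(),  # terminal
    "FAILED": set(),   # terminal
}


def validate_transition(current: str, new: str) -> None:
    """Raise on an invalid move; the API layer translates this to HTTP 409."""
    if new not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"Invalid transition {current} -> {new}")
```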

---

## Key Design Decisions

### 1. Scheduler as the dispatcher

Connection test requests are picked up in the scheduler's heartbeat loop 
(`_dispatch_connection_tests`). The scheduler already owns the executor 
lifecycle, so it is the natural place to submit workloads. The dispatch call is 
wrapped in a try-except to prevent connection test failures from affecting DAG 
scheduling.

**Alternative considered:** A dedicated micro-service or a separate background 
thread in the API server. Rejected because it would add operational complexity 
and duplicate executor management.
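A minimal sketch of the heartbeat-loop dispatch with failure isolation, using fake in-memory stand-ins for the DB query and executor (all names besides the states are hypothetical):

```python
import logging
from dataclasses import dataclass, field

log = logging.getLogger(__name__)


@dataclass
class FakeRequest:
    id: str
    state: str = "PENDING"


@dataclass
class FakeExecutor:
    queued: list = field(default_factory=list)

    def queue_workload(self, workload):
        self.queued.append(workload)


def dispatch_connection_tests(pending, executor, limit=10):
    """Mark up to `limit` PENDING requests RUNNING and queue them.

    The try-except ensures a connection-test failure never breaks the
    rest of the scheduler heartbeat.
    """
    try:
        for request in pending[:limit]:
            request.state = "RUNNING"
            executor.queue_workload({"request_id": request.id})
    except Exception:
        log.exception("Failed to dispatch connection tests")
```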

### 2. Fernet-encrypted URI storage

The connection URI is encrypted with the configured Fernet key before being 
persisted in `connection_test_request`. The worker decrypts it at execution 
time. This means the plaintext URI is never stored in the new table, even 
transiently.
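The encrypt/decrypt round-trip looks roughly like this, assuming the same configured Fernet key is available to the API server (encrypt side) and the worker (decrypt side):

```python
from cryptography.fernet import Fernet

# In Airflow this would be the configured [core] fernet_key, not a fresh one.
fernet_key = Fernet.generate_key()
f = Fernet(fernet_key)

plaintext_uri = "postgresql://user:secret@db.internal:5432/app"
encrypted = f.encrypt(plaintext_uri.encode())  # what the new table stores
decrypted = f.decrypt(encrypted).decode()      # what the worker recovers
```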

### 3. `skip_locked` for concurrent schedulers

`get_pending_requests()` uses `with_for_update(skip_locked=True)` so that in HA 
deployments with multiple schedulers, each scheduler picks up a disjoint set of 
requests without blocking.
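In SQLAlchemy terms the polling query might look like the following sketch (a minimal subset of the proposed table, shown here only to illustrate how `skip_locked` renders on Postgres):

```python
from sqlalchemy import Column, String, TIMESTAMP, select
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class ConnectionTestRequest(Base):
    # Illustrative subset of the proposed connection_test_request table.
    __tablename__ = "connection_test_request"
    id = Column(String(36), primary_key=True)
    state = Column(String(10))
    created_at = Column(TIMESTAMP)


stmt = (
    select(ConnectionTestRequest)
    .where(ConnectionTestRequest.state == "pending")
    .order_by(ConnectionTestRequest.created_at)
    .limit(10)
    .with_for_update(skip_locked=True)
)

# On Postgres this compiles to "... FOR UPDATE SKIP LOCKED", so competing
# schedulers each lock a disjoint set of rows instead of blocking.
sql = str(stmt.compile(dialect=postgresql.dialect()))
```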

### 4. Workload-based execution

Connection testing reuses the existing workload / executor infrastructure. A 
new `TestConnection` workload type is added alongside `ExecuteTask`, 
`ExecuteCallback`, and `RunTrigger`. This means any executor that supports the 
workload protocol (currently LocalExecutor) can run connection tests without 
special-casing.
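The new workload type could take roughly this shape (field names are hypothetical, mirroring what the table stores plus the JWT the worker needs to call back into the Execution API):

```python
from dataclasses import asdict, dataclass


@dataclass
class TestConnection:
    """Hypothetical sketch of the new workload, alongside ExecuteTask etc."""

    request_id: str
    encrypted_connection_uri: str
    conn_type: str
    timeout: int = 60
    token: str = ""  # JWT for the worker's Execution API callback
    kind: str = "TestConnection"  # discriminator executors switch on


payload = asdict(
    TestConnection(
        request_id="a-uuid7-value",
        encrypted_connection_uri="gAAAA-ciphertext",
        conn_type="postgres",
    )
)
```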

### 5. Transient connection object on the worker

The worker creates a short-lived `Connection` object with a random `conn_id`, 
exports it as an environment variable (`AIRFLOW_CONN_<id>`), and cleans it up 
in a `finally` block. This avoids leaking secrets into the worker's environment 
beyond the scope of the test.
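The worker-side pattern can be sketched like this; the actual `test_connection()` call is stubbed out, and the point is the `finally` cleanup of the `AIRFLOW_CONN_*` variable:

```python
import os
import uuid


def run_connection_test(connection_uri: str) -> bool:
    """Illustrative sketch: expose the decrypted URI under a random conn_id,
    run the test, and always remove the secret from the environment."""
    conn_id = f"conn_test_{uuid.uuid4().hex}"
    env_var = f"AIRFLOW_CONN_{conn_id.upper()}"
    os.environ[env_var] = connection_uri
    try:
        # In the real worker this would resolve the Connection and call
        # connection.test_connection() with the configured timeout.
        return True
    finally:
        # Cleanup happens regardless of success, failure, or exception.
        os.environ.pop(env_var, None)
```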

### 6. UI polling with timeout

The React UI polls every 1 second for up to 60 seconds. If the result hasn't 
arrived by then, the UI reports a timeout. This keeps the UX responsive without 
requiring WebSocket infrastructure.
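The poll loop, shown in Python for illustration (the real client is the React UI); `fetch` stands in for the `GET /connections/test/{request_id}` call:

```python
import time


def poll_result(fetch, request_id, interval=1.0, max_wait=60.0, sleep=time.sleep):
    """Poll until a terminal state arrives or the client-side budget runs out."""
    waited = 0.0
    while waited < max_wait:
        result = fetch(request_id)
        if result["state"] in ("SUCCESS", "FAILED"):
            return result
        sleep(interval)
        waited += interval
    return {"state": "TIMEOUT"}
```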

---

## Database Changes

### New table: `connection_test_request`

| Column | Type | Notes |
|---|---|---|
| `id` | `VARCHAR(36)` PK | UUID7 |
| `state` | `VARCHAR(10)` | pending / running / success / failed |
| `encrypted_connection_uri` | `TEXT` | Fernet-encrypted |
| `conn_type` | `VARCHAR(500)` | e.g. `postgres`, `mysql` |
| `result_status` | `BOOLEAN` (nullable) | true = passed |
| `result_message` | `TEXT` (nullable) | human-readable result |
| `created_at` | `TIMESTAMP (UTC)` | |
| `started_at` | `TIMESTAMP (UTC)` (nullable) | set on RUNNING |
| `completed_at` | `TIMESTAMP (UTC)` (nullable) | set on terminal |
| `timeout` | `INTEGER` | default 60 s |
| `worker_hostname` | `VARCHAR(500)` (nullable) | for debugging |

**Index:** `(state, created_at)` -- supports the scheduler's polling query 
efficiently.

**Migration:** `0099_3_2_0_add_connection_test_request_table` (revision 
`9882c124ea54`).

---

## API Surface

### Core API (client-facing)

| Method | Path | Purpose |
|---|---|---|
| `POST` | `/api/v2/connections/test` | Queue a connection test; returns 
`request_id` |
| `GET` | `/api/v2/connections/test/{request_id}` | Poll for result |

Both endpoints are gated by `requires_access_connection` RBAC.

### Execution API (worker-facing)

| Method | Path | Purpose |
|---|---|---|
| `PATCH` | `/execution/connection-tests/{request_id}/state` | Worker reports 
result |

Authenticated via JWT token embedded in the workload.

---

## Configuration

The feature respects the existing `[core] test_connection` setting. When set to 
`Disabled` (the default), both the API endpoint and the scheduler dispatch are 
no-ops.

```ini
[core]
test_connection = Enabled
```

No new configuration keys are introduced.

---

## Open Questions for Discussion

### 1. Cleanup of stale requests

If a worker crashes mid-test, the request stays in `RUNNING` forever. Options:

- **a)** Scheduler-side reaper: periodically mark `RUNNING` requests older than 
`N` minutes as `FAILED`. Similar to how orphaned task instances are handled.
- **b)** Client-side timeout only: the UI already times out after 60 s. The 
stale row is harmless and can be cleaned up by a periodic table purge.
- **c)** Configurable TTL with a new `[core] connection_test_timeout` setting.

I'm leaning toward **(a)** with a sensible default (e.g. 5 minutes), matching 
the pattern used for zombie task detection. Looking for maintainer input on the 
preferred approach.
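For concreteness, option (a) could be as small as this sketch, where `requests` stands in for the DB query and all names are illustrative:

```python
from datetime import datetime, timedelta, timezone


def reap_stale_requests(requests, ttl_minutes=5, now=None):
    """Mark RUNNING requests older than the TTL as FAILED (option (a))."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(minutes=ttl_minutes)
    for r in requests:
        if r["state"] == "RUNNING" and r["started_at"] < cutoff:
            r["state"] = "FAILED"
            r["result_message"] = "Timed out: worker did not report a result"
    return requests
```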

### 2. Executor support beyond LocalExecutor

The current implementation works with `LocalExecutor`. `CeleryExecutor` and 
`KubernetesExecutor` would need to handle the `TestConnection` workload type. 
Should this be:

- **a)** Gated so only supported executors attempt dispatch (fail gracefully 
otherwise).
- **b)** Required for all executors as part of the workload protocol.
- **c)** Deferred to follow-up PRs, with `LocalExecutor` as the initial scope.

Currently going with **(c)**, but open to feedback.

### 3. Table retention / purge strategy

`connection_test_request` rows are write-once and never updated after reaching 
a terminal state. Over time the table will grow. Options:

- **a)** Automatic purge of rows older than `N` days, driven by the scheduler.
- **b)** Include in the existing `airflow db clean` command.
- **c)** Leave to operators.

**(b)** seems most consistent with existing patterns.

### 4. Should the scheduler dispatch or should the API server dispatch directly?

The current design uses the scheduler because it owns the executor. An 
alternative is to have the API server submit workloads directly to a message 
broker (e.g. Celery queue), bypassing the scheduler entirely. This would reduce 
latency but couples the API server to broker details. Looking for guidance on 
which trade-off is preferred.

---

## Scope

### In scope (this PR)

- `ConnectionTestRequest` model and migration
- Core API endpoints (queue + poll)
- Execution API endpoint (worker result reporting)
- Scheduler dispatch loop
- `TestConnection` workload type
- `LocalExecutor` support
- UI polling hook
- Unit tests for model, core API, and execution API

### Out of scope (follow-up)

- `CeleryExecutor` / `KubernetesExecutor` support
- Stale request cleanup / reaper
- `airflow db clean` integration
- E2E / integration tests across executor types
- Observability metrics (request latency, success rate)

---

## References

- [PR #59643](https://github.com/apache/airflow/pull/59643) -- @potiuk's review 
where worker-based connection testing was proposed
- [CVE-2023-37379](https://nvd.nist.gov/vuln/detail/CVE-2023-37379) -- RCE via 
connection testing on the API server
- [Airflow AIP-72: Execution 
API](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-72)
- [PR #60618](https://github.com/apache/airflow/pull/60618) -- Implementation PR
- [Issue #58941](https://github.com/apache/airflow/issues/58941) -- dbt-cloud 
test_connection failure

---

cc @potiuk @ashb @pierrejeambrun @ephraimbuddy @kaxil @bbovenzi -- would 
appreciate your input on this design.



GitHub link: https://github.com/apache/airflow/discussions/61147
