GitHub user anishgirianish edited a discussion: Proposal: Move connection 
testing from API server to workers

# Design Proposal: Asynchronous Connection Testing on Workers

**Author:** @anishgirianish
**PR:** [apache/airflow#60618](https://github.com/apache/airflow/pull/60618) 
(proof of concept)

> **Note:** The linked PR is a working proof of concept to validate the 
> approach. If the community agrees with this design, the implementation will 
> be refined based on feedback.

---

## Motivation

This design follows the direction proposed by @potiuk in [PR 
#59643](https://github.com/apache/airflow/pull/59643), which identified 
fundamental security and isolation problems with running connection tests on 
the API server. The risks were serious enough that Airflow 2.7.0 disabled 
connection testing by default 
([CVE-2023-37379](https://nvd.nist.gov/vuln/detail/CVE-2023-37379)).

In Airflow's current architecture, connection testing (`POST 
/connections/test`) executes synchronously on the API server. This creates two 
problems:

1. **Security risk** -- API servers should not hold or process decrypted 
connection secrets. They are long-lived and often internet-facing processes, so 
running `test_connection()` on them expands the attack surface unnecessarily.

2. **Network isolation mismatch** -- Workers typically have network access to 
external systems (databases, cloud APIs, etc.) that API servers do not. A 
connection test that passes on a worker may fail on the API server simply 
because the API server cannot reach the target host, giving operators a 
misleading result.

Airflow 3.x already separates the API server from execution. Connection testing 
should follow this boundary.

---

## Proposed Design

Replace the synchronous test-and-respond flow with an asynchronous queue-poll 
model where the actual connection test runs on a worker.

### End-to-End Flow

```
  UI / Client
      |
      | 1. POST /connections/test  (queue request)
      v
  API Server
      |  - encrypt connection URI (Fernet)
      |  - persist ConnectionTestRequest (state=PENDING)
      |  - return request_id
      v
  UI polls GET /connections/test/{request_id}  (every 1 s, max 60 s)
      .
      .
  Scheduler (heartbeat loop)
      |  - query PENDING requests (skip_locked, limit 10)
      |  - create TestConnection workload + JWT token
      |  - mark request RUNNING
      |  - queue workload to executor
      v
  Worker
      |  - decrypt connection URI
      |  - create transient Connection object
      |  - call connection.test_connection() with timeout
      |  - report result via Execution API:
      |      PATCH /execution/connection-tests/{request_id}/state
      v
  API Server
      |  - validate transition (RUNNING -> SUCCESS | FAILED)
      |  - persist result
      v
  UI receives terminal state, displays result
```
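A minimal client-side sketch of the polling half of this flow follows. It assumes the poll endpoint returns a JSON object with a `state` field that uses the state names from the proposal. `poll_connection_test` and its parameters are hypothetical. The HTTP call is injected as `get_status` so the loop can be exercised without a server, and the defaults mirror the 1 s interval and 60 s cap shown in the diagram.

```python
import time
from typing import Callable

# Terminal states from the proposal's state machine.
TERMINAL_STATES = {"SUCCESS", "FAILED"}


def poll_connection_test(
    get_status: Callable[[str], dict],
    request_id: str,
    interval: float = 1.0,
    timeout: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    """Poll until the request reaches a terminal state or the timeout expires."""
    deadline = clock() + timeout
    while True:
        # get_status is assumed to wrap GET /api/v2/connections/test/{request_id}
        status = get_status(request_id)
        if status["state"] in TERMINAL_STATES:
            return status
        if clock() >= deadline:
            raise TimeoutError(
                f"connection test {request_id} not finished after {timeout} s"
            )
        sleep(interval)
```

Injecting `sleep` and `clock` is only there to make the loop testable; a browser UI would implement the same logic with a timer.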

### State Machine

```
  PENDING ──> RUNNING ──> SUCCESS
     |            |
     └──> FAILED <┘
```

- **PENDING**: request created, waiting for scheduler pickup.
- **RUNNING**: dispatched to a worker.
- **SUCCESS / FAILED**: terminal states.

Invalid transitions are rejected with HTTP 409.
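The transition rules fit in a small lookup table. This is an illustrative sketch, not code from the PR. `ConnectionTestState`, `ALLOWED_TRANSITIONS`, and `InvalidTransition` are hypothetical names, and mapping the exception to HTTP 409 would happen in the API layer.

```python
from enum import Enum


class ConnectionTestState(str, Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"


S = ConnectionTestState

# Allowed transitions, mirroring the state diagram above.
ALLOWED_TRANSITIONS = {
    S.PENDING: {S.RUNNING, S.FAILED},
    S.RUNNING: {S.SUCCESS, S.FAILED},
    S.SUCCESS: set(),  # terminal
    S.FAILED: set(),   # terminal
}


class InvalidTransition(Exception):
    """Raised for a disallowed transition; an API handler would return 409."""

    status_code = 409


def transition(current: ConnectionTestState, target: ConnectionTestState) -> ConnectionTestState:
    if target not in ALLOWED_TRANSITIONS[current]:
        raise InvalidTransition(f"cannot move from {current.value} to {target.value}")
    return target
```

Keeping the table as data makes it easy to share between the scheduler (PENDING to RUNNING) and the Execution API endpoint (RUNNING to a terminal state).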

---

## Key Design Decisions

### 1. Scheduler as the dispatcher

Connection test requests are picked up in the scheduler's heartbeat loop. The 
scheduler already owns the executor lifecycle, so it is the natural place to 
submit workloads. The dispatch call is wrapped in a try-except block so that an 
error while dispatching connection tests cannot disrupt DAG scheduling.
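A sketch of one heartbeat's dispatch step follows, with the database query, workload construction, and executor call passed in as callables (all names here are hypothetical). In a real implementation, `fetch_pending` would presumably run a `SELECT ... FOR UPDATE SKIP LOCKED` (SQLAlchemy's `with_for_update(skip_locked=True)`) inside a transaction, so that marking a row `RUNNING` and queueing its workload are committed or rolled back together.

```python
import logging
from dataclasses import dataclass
from typing import Callable, Iterable

log = logging.getLogger(__name__)


@dataclass
class PendingRequest:
    id: str
    state: str = "PENDING"


def dispatch_connection_tests(
    fetch_pending: Callable[[int], Iterable[PendingRequest]],
    build_workload: Callable[[PendingRequest], object],
    queue_workload: Callable[[object], None],
    limit: int = 10,
) -> int:
    """Dispatch up to `limit` pending requests; return how many were queued."""
    dispatched = 0
    try:
        for request in fetch_pending(limit):
            workload = build_workload(request)  # e.g. TestConnection workload + JWT
            request.state = "RUNNING"
            queue_workload(workload)
            dispatched += 1
    except Exception:
        # Never let connection-test dispatch break the scheduler loop.
        log.exception("Connection test dispatch failed; continuing")
    return dispatched
```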

### 2. Fernet-encrypted URI storage

The connection URI is encrypted before being persisted in the new 
`connection_test_request` table. The worker decrypts it at execution time. The 
plaintext URI is never stored in the new table.

### 3. Workload-based execution

A new `TestConnection` workload type is added alongside `ExecuteTask`, 
`ExecuteCallback`, and `RunTrigger`. This reuses the existing workload/executor 
infrastructure so any executor that supports the workload protocol can run 
connection tests.
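What the worker does with a `TestConnection` workload might look roughly like the sketch below. The dataclass stands in for the real workload model and its fields are assumptions. `decrypt` and `test_uri` are injected; in Airflow they would roughly correspond to Fernet decryption and `Connection(uri=uri).test_connection()`, which returns a `(bool, str)` tuple. The timeout uses a worker thread, which only stops *waiting* for a hung hook and cannot kill it, so a production version might prefer a subprocess.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from dataclasses import dataclass
from typing import Callable, Tuple


@dataclass(frozen=True)
class TestConnection:
    """Hypothetical workload shape; field names are illustrative."""

    request_id: str
    encrypted_connection_uri: str
    token: str  # JWT for the Execution API
    timeout: float = 30.0


def run_test_connection(
    workload: TestConnection,
    decrypt: Callable[[str], str],
    test_uri: Callable[[str], Tuple[bool, str]],
) -> dict:
    """Decrypt the URI, run the test with a timeout, and build the result payload."""
    uri = decrypt(workload.encrypted_connection_uri)  # plaintext only in memory
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        future = pool.submit(test_uri, uri)
        ok, message = future.result(timeout=workload.timeout)
    except FutureTimeout:
        ok, message = False, f"Connection test timed out after {workload.timeout}s"
    except Exception as exc:  # a failing hook is a FAILED result, not a crash
        ok, message = False, f"{type(exc).__name__}: {exc}"
    finally:
        pool.shutdown(wait=False)  # do not block on a hung test
    return {"state": "SUCCESS" if ok else "FAILED", "result_message": message}
```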

---

## API Surface

### Core API (client-facing)

| Method | Path | Purpose |
|---|---|---|
| `POST` | `/api/v2/connections/test` | Queue a connection test; returns `request_id` |
| `GET` | `/api/v2/connections/test/{request_id}` | Poll for result |

Both endpoints are gated by `requires_access_connection` RBAC.

### Execution API (worker-facing)

| Method | Path | Purpose |
|---|---|---|
| `PATCH` | `/execution/connection-tests/{request_id}/state` | Worker reports result |

Authenticated via JWT token embedded in the workload.
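For illustration, the worker's result report could be built with the standard library as below. The JSON body fields (`state`, `result_message`) and the function name are assumptions; only the path, the `PATCH` method, and bearer-token authentication come from the proposal. Sending it would be a matter of `urllib.request.urlopen(req)`.

```python
import json
import urllib.request


def build_state_update(
    base_url: str, request_id: str, token: str, state: str, message: str
) -> urllib.request.Request:
    """Build (but do not send) the worker's PATCH to the Execution API."""
    body = json.dumps({"state": state, "result_message": message}).encode()
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/execution/connection-tests/{request_id}/state",
        data=body,
        method="PATCH",
        headers={
            "Authorization": f"Bearer {token}",  # JWT carried in the workload
            "Content-Type": "application/json",
        },
    )
```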

---

## Database Changes

A new `connection_test_request` table tracks the lifecycle of each test 
request. Key columns: `id` (UUID7), `state`, `encrypted_connection_uri`, 
`conn_type`, `result_status`, `result_message`, and timestamps (`created_at`, 
`started_at`, `completed_at`). Indexed on `(state, created_at)` for efficient 
scheduler polling.
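The table and the scheduler's polling query can be sketched in SQLite for experimentation. Column types are guesses, `uuid4` stands in for UUID7 (`uuid.uuid7` only exists in recent Python releases), and SQLite has no `FOR UPDATE SKIP LOCKED`, so the production query on PostgreSQL or MySQL would add that clause. `enqueue` is a hypothetical helper.

```python
import sqlite3
import uuid
from datetime import datetime, timezone

DDL = """
CREATE TABLE connection_test_request (
    id TEXT PRIMARY KEY,                 -- UUID7 in the proposal
    state TEXT NOT NULL,
    encrypted_connection_uri TEXT NOT NULL,
    conn_type TEXT NOT NULL,
    result_status INTEGER,               -- assumed boolean-like
    result_message TEXT,
    created_at TEXT NOT NULL,
    started_at TEXT,
    completed_at TEXT
);
CREATE INDEX idx_connection_test_request_state_created
    ON connection_test_request (state, created_at);
"""

# The (state, created_at) index serves this query directly.
PENDING_QUERY = """
SELECT id FROM connection_test_request
WHERE state = 'PENDING'
ORDER BY created_at
LIMIT ?
"""


def enqueue(db: sqlite3.Connection, encrypted_uri: str, conn_type: str) -> str:
    """Insert a PENDING request and return its id."""
    request_id = str(uuid.uuid4())
    db.execute(
        "INSERT INTO connection_test_request "
        "(id, state, encrypted_connection_uri, conn_type, created_at) "
        "VALUES (?, 'PENDING', ?, ?, ?)",
        (request_id, encrypted_uri, conn_type, datetime.now(timezone.utc).isoformat()),
    )
    return request_id
```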

---

## Configuration

The feature respects the existing `[core] test_connection` setting. When set to 
`Disabled` (the default), both the API endpoint and the scheduler dispatch are 
no-ops. No new configuration keys are introduced.
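A sketch of the gate that both code paths would share, using `configparser` in place of Airflow's own config object (which would also honor the `AIRFLOW__CORE__TEST_CONNECTION` environment variable). In the existing setting, `Hidden` also disables testing and additionally hides the UI button, so the sketch treats only `Enabled` as on. The function name is hypothetical.

```python
import configparser


def connection_testing_enabled(cfg: configparser.ConfigParser) -> bool:
    """True only when [core] test_connection is set to Enabled."""
    # A missing section or option falls back to the default, "Disabled".
    value = cfg.get("core", "test_connection", fallback="Disabled")
    return value.strip().lower() == "enabled"
```

Both the `POST` handler and the scheduler dispatch step would return early when this is false.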

---

## Open Questions

### 1. Cleanup / retention of old requests

If a worker crashes mid-test, the request stays in `RUNNING` forever. Beyond 
that, completed rows accumulate over time. Options:

- **a)** Scheduler-side reaper that marks stale `RUNNING` requests as `FAILED` 
after N minutes (similar to zombie task detection), plus inclusion in `airflow 
db clean` for completed rows.
- **b)** Client-side timeout only (UI already times out at 60 s); stale rows 
are harmless and cleaned up by operators.

Leaning toward **(a)**. Looking for maintainer input.
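Option (a)'s reaper could be as simple as the sketch below, shown over in-memory rows rather than the real table. `RequestRow`, `reap_stale_requests`, and the `stale_after` threshold (the unspecified N minutes) are hypothetical; against the database this would be a single `UPDATE ... WHERE state = 'RUNNING' AND started_at < :cutoff`.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Iterable, List, Optional


@dataclass
class RequestRow:
    id: str
    state: str
    started_at: Optional[datetime]
    completed_at: Optional[datetime] = None
    result_message: Optional[str] = None


def reap_stale_requests(
    rows: Iterable[RequestRow], now: datetime, stale_after: timedelta
) -> List[str]:
    """Mark RUNNING rows older than `stale_after` as FAILED; return their ids."""
    reaped = []
    for row in rows:
        if (
            row.state == "RUNNING"
            and row.started_at is not None
            and now - row.started_at > stale_after
        ):
            row.state = "FAILED"
            row.completed_at = now
            row.result_message = f"No result from worker within {stale_after}"
            reaped.append(row.id)
    return reaped
```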

### 2. Executor support beyond LocalExecutor

The POC works with `LocalExecutor`. `CeleryExecutor` and `KubernetesExecutor` 
would need to handle the `TestConnection` workload type. The plan is to defer 
support for other executors to follow-up PRs unless maintainers prefer otherwise.

---

## Scope

### In scope (this PR)

- `ConnectionTestRequest` model and migration
- Core API endpoints (queue + poll)
- Execution API endpoint (worker result reporting)
- Scheduler dispatch loop
- `TestConnection` workload type
- `LocalExecutor` support
- UI polling hook
- Unit tests

### Out of scope (follow-up)

- `CeleryExecutor` / `KubernetesExecutor` support
- Stale request cleanup / reaper
- `airflow db clean` integration
- E2E / integration tests

---

## References

- [PR #59643](https://github.com/apache/airflow/pull/59643) -- @potiuk's review 
where worker-based connection testing was proposed
- [CVE-2023-37379](https://nvd.nist.gov/vuln/detail/CVE-2023-37379) -- security 
issue in the connection test feature, addressed in Airflow 2.7.0
- [Airflow AIP-72: Task Execution Interface (Task 
SDK)](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-72) -- defines the 
Execution API
- [PR #60618](https://github.com/apache/airflow/pull/60618) -- Implementation 
PR (POC)

---

cc @potiuk @ashb @pierrejeambrun @ephraimbuddy @kaxil @bbovenzi -- would 
appreciate your input on this design.



GitHub link: https://github.com/apache/airflow/discussions/61147
