radhwene opened a new issue, #68040:
URL: https://github.com/apache/airflow/issues/68040
### Under which category would you file this issue?
Providers
### Apache Airflow version
2.10.3 and 3.2.0+
### What happened and how to reproduce it?
# Cloud SQL: `CloudSQLImportInstanceOperator` / `CloudSQLExportInstanceOperator` 409 `operationInProgress` on parallel tasks against the same instance
## Problem
Two `CloudSQLImportInstanceOperator` tasks that target the same instance and have no dependency between them fan out in parallel. One succeeds; the other fails in under 2 s with HTTP 409 `operationInProgress`. The same race affects `CloudSQLExportInstanceOperator` and mixed IMPORT + EXPORT pairs, because Cloud SQL serializes administrative operations per instance. The provider does not retry on this error, and no sensor in the provider waits for the instance to become idle.
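Any custom handling of this failure (for example, retrying only this specific rejection) first has to tell the 409 apart from other errors. The following is a minimal sketch, assuming the standard Google API JSON error envelope in which the rejection carries `"reason": "operationInProgress"` under `error.errors[]`. The helper name `is_operation_in_progress` is hypothetical and not part of the provider.

```python
import json


def is_operation_in_progress(status: int, body: str) -> bool:
    """Return True if an HTTP error looks like Cloud SQL's 409 `operationInProgress`.

    Assumes the standard Google API error envelope, e.g.
    {"error": {"code": 409, "errors": [{"reason": "operationInProgress"}]}}.
    """
    if status != 409:
        return False
    try:
        payload = json.loads(body)
    except (TypeError, ValueError):
        # Not a JSON body: the reason cannot be confirmed, so do not classify it.
        return False
    error = payload.get("error") if isinstance(payload, dict) else None
    if not isinstance(error, dict):
        return False
    # Any entry in error.errors[] carrying the operationInProgress reason counts.
    return any(
        isinstance(item, dict) and item.get("reason") == "operationInProgress"
        for item in error.get("errors") or []
    )
```

With `googleapiclient`, the two inputs would typically come from `HttpError.resp.status` and `HttpError.content.decode()`.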
## Versions
| Component | Version |
| --- | --- |
| Apache Airflow | 2.10.3 |
| apache-airflow-providers-google | 10.22.0 |
| apache-airflow-providers-postgres | 5.11.1 |
| Cloud SQL instance | `my-cloudsql-pg` (PostgreSQL 16, `us-central1`) |
| Executor | LocalExecutor |
| Python | 3.12.7 |
Also reproduces on Airflow 3.2.0 + providers-google 19.5.0.
## Reproduction DAG
```python
from datetime import datetime
from pathlib import Path
from airflow import DAG
from airflow.models.param import Param
from airflow.providers.google.cloud.operators.cloud_sql import (
    CloudSQLImportInstanceOperator,
)
from airflow.providers.postgres.operators.postgres import PostgresOperator

INIT_SQL_PATH = Path(__file__).resolve().parent / "init_tables.sql"

with DAG(
    dag_id="reproduce_409_cloudsql_import",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    params={
        "PROJECT_ID": Param(default="CHANGE_ME", type="string"),
        "INSTANCE": Param(default="CHANGE_ME", type="string"),
        "GCS_BUCKET": Param(default="CHANGE_ME", type="string"),
        "DB_NAME": Param(default="airflow_db", type="string"),
    },
    render_template_as_native_obj=True,
) as dag:
    # Creates the target tables before both imports fan out.
    create_tables = PostgresOperator(
        task_id="create_tables",
        postgres_conn_id="cloudsql_pg",
        sql=INIT_SQL_PATH.read_text(),
    )

    import_complaints = CloudSQLImportInstanceOperator(
        task_id="import_complaints",
        project_id="{{ params.PROJECT_ID }}",
        instance="{{ params.INSTANCE }}",
        body={
            "importContext": {
                "fileType": "CSV",
                "uri": "gs://{{ params.GCS_BUCKET }}/complaints.csv",
                "database": "{{ params.DB_NAME }}",
                "csvImportOptions": {"table": "complaints"},
            }
        },
        gcp_conn_id="google_cloud_default",
    )

    import_crime = CloudSQLImportInstanceOperator(
        task_id="import_crime",
        project_id="{{ params.PROJECT_ID }}",
        instance="{{ params.INSTANCE }}",
        body={
            "importContext": {
                "fileType": "CSV",
                "uri": "gs://{{ params.GCS_BUCKET }}/crime.csv",
                "database": "{{ params.DB_NAME }}",
                "csvImportOptions": {"table": "crime"},
            }
        },
        gcp_conn_id="google_cloud_default",
    )

    # No dependency between the two imports, so they run in parallel
    # against the same Cloud SQL instance.
    create_tables >> [import_complaints, import_crime]
```
Triggered with:
```
airflow dags trigger reproduce_409_cloudsql_import \
  --run-id repro_409_20260604_215213 \
  --conf '{"PROJECT_ID":"my-project","INSTANCE":"my-cloudsql-pg","GCS_BUCKET":"my-gcs-bucket","DB_NAME":"airflow_db"}'
```
## Result — `import_complaints` failed with HTTP 409 `operationInProgress`

## Result — `import_crime` succeeded (same DAG, same instance, parallel task)

## `gcloud sql operations list` — only one IMPORT op was created
```
gcloud sql operations list \
--instance=my-cloudsql-pg \
--project=my-project
```

Only one IMPORT operation is listed, at `2026-06-04T19:52:32` (the successful `import_crime`). The failed `import_complaints` produced no entry: the API rejected the request before allocating a long-running operation. This matches the documented behavior of [one administrative operation per instance at a time][cloudsql-ops].

[cloudsql-ops]: https://docs.cloud.google.com/sql/docs/postgres/admin-api#long-running
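The "is the instance idle?" check that this behavior implies can be sketched as a pure function over one page of an `operations.list` response. This sketch assumes the Admin API `Operation` fields `targetId` and `status`, and conservatively treats anything other than `DONE` (including a missing status) as still in flight. `in_flight_operations` is a hypothetical helper, and pagination via `nextPageToken` is ignored for brevity.

```python
from typing import Any

# Only DONE is treated as terminal; PENDING, RUNNING, an unspecified status,
# or a missing status all count as "still in flight".
TERMINAL_STATUSES = frozenset({"DONE"})


def in_flight_operations(response: dict[str, Any], instance: str) -> list[dict[str, Any]]:
    """Return operations from one `operations.list` page that target `instance` and are not DONE."""
    return [
        op
        for op in response.get("items", [])
        if op.get("targetId") == instance and op.get("status") not in TERMINAL_STATUSES
    ]
```

An empty result means a new IMPORT or EXPORT submitted right now would not collide with an operation visible on that page.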
## Current workarounds (all unsatisfactory)
| Workaround | Why it falls short |
| --- | --- |
| Serialize tasks (`a >> b` instead of `[a, b]`) | Kills parallelism on purpose. Doesn't scale to N imports/exports. Doesn't protect against a second DAG hitting the same instance. |
| Airflow pool with `slots=1` per Cloud SQL instance | Coarse-grained: serializes every task assigned to that pool, whatever its operator type, not just admin ops. Requires one pool per instance, declared out-of-band. Still races against console-triggered ops, backups, and replica events. |
| Task-level `retries=N` + `retry_delay` | Blunt: doesn't know whether the op actually started server-side. On import, a blind retry can either replay rows that already landed or hit the same 409 indefinitely. Burns a retry attempt and adds `retry_delay` latency on every collision. |
| Custom `PythonOperator` pre-check polling `sqladmin.operations.list` | Works, but duplicates logic that arguably belongs in the provider, and every team writes it slightly differently. No async support, so it blocks a worker slot for the whole wait. |
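For comparison, the custom pre-check in the last row usually boils down to a blocking loop like the sketch below. `wait_until_idle` and `list_operations` are hypothetical: `list_operations` stands for any callable that returns one `operations.list` page, and `sleep` / `monotonic` are injectable only so the loop can be exercised without real waiting. Because it calls `time.sleep`, it holds a worker slot for the entire wait, which is the drawback noted in the table.

```python
import time
from typing import Any, Callable


def wait_until_idle(
    list_operations: Callable[[], dict[str, Any]],
    instance: str,
    poke_interval: float = 60.0,
    timeout: float = 2 * 60 * 60,
    sleep: Callable[[float], None] = time.sleep,
    monotonic: Callable[[], float] = time.monotonic,
) -> int:
    """Block until no non-DONE operation targets `instance`; return the number of polls."""
    deadline = monotonic() + timeout
    polls = 0
    while True:
        polls += 1
        busy = [
            op
            for op in list_operations().get("items", [])
            if op.get("targetId") == instance and op.get("status") != "DONE"
        ]
        if not busy:
            return polls
        # Give up if the next poll would land past the deadline.
        if monotonic() + poke_interval > deadline:
            names = [op.get("name") for op in busy]
            raise TimeoutError(f"{instance} still busy after {timeout}s: {names}")
        # Blocking sleep: the worker slot stays occupied for the whole wait.
        sleep(poke_interval)
```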
## Proposal — deferrable instance-level sensor
```python
wait_for_slot = CloudSQLNoOperationInProgressSensor(
task_id="wait_for_slot",
project_id="my-project",
instance="my-cloudsql-pg",
poke_interval=60,
timeout=60 * 60 * 2,
deferrable=True,
)
wait_for_slot >> CloudSQLImportInstanceOperator(...)
```
- Polls `sqladmin.operations.list` and waits while any op with `targetId == instance` is in a non-terminal state (`PENDING` / `RUNNING`).
- Async trigger reuses `CloudSQLHook`. No new auth surface.
- Fails fast on 404 (instance missing) / 403 (IAM denied).
- Operation-agnostic → covers IMPORT, EXPORT, and any future
`sqladmin.instances.*` operator.
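The deferrable variant moves the same polling into a coroutine, which is roughly the shape an async trigger's polling loop takes. This is a sketch, not the provider's API: `wait_for_slot` is a hypothetical name, `fetch` is a hypothetical coroutine returning `(http_status, operations.list body)`, `InstanceCheckFailed` is a hypothetical exception, and the returned dict only loosely mimics a trigger event payload. It fails fast on 403/404 as proposed above and keeps polling on anything else.

```python
import asyncio
from typing import Any, Awaitable, Callable


class InstanceCheckFailed(Exception):
    """Hypothetical error for conditions that waiting will not fix."""


async def wait_for_slot(
    fetch: Callable[[], Awaitable[tuple[int, dict[str, Any]]]],
    instance: str,
    poke_interval: float = 60.0,
) -> dict[str, Any]:
    """Poll until no non-DONE operation targets `instance`, yielding between polls."""
    while True:
        status, body = await fetch()
        if status in (403, 404):
            # 403 (IAM denied) / 404 (instance missing): fail fast instead of waiting.
            raise InstanceCheckFailed(f"operations.list for {instance!r} returned HTTP {status}")
        if status == 200:
            busy = [
                op
                for op in body.get("items", [])
                if op.get("targetId") == instance and op.get("status") != "DONE"
            ]
            if not busy:
                return {"status": "success", "instance": instance}
        # Instance busy, or another status such as 429/5xx: wait without blocking, then poll again.
        await asyncio.sleep(poke_interval)
```

An overall limit (the sensor's `timeout`) can be layered on with `asyncio.wait_for(...)`. Like any check-then-act gate, this does not make submission atomic: two downstream tasks released by the same idle poll can still collide with a 409.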
## Contribution plan
One PR — sensor + async trigger + unit tests + short docs note. Will open as
soon as direction is confirmed.
### What you think should happen instead?
The Google provider should ship a deferrable instance-level sensor (e.g. `CloudSQLNoOperationInProgressSensor`) that waits until the target Cloud SQL instance has no non-terminal administrative operation in flight, then succeeds.

With this sensor wired upstream of `CloudSQLImportInstanceOperator` / `CloudSQLExportInstanceOperator`, parallel admin ops against the same instance wait for a free slot instead of failing with HTTP 409 `operationInProgress`. The operators' submit semantics stay unchanged; the sensor is the explicit "wait for slot" primitive that every team currently re-implements with a custom `PythonOperator` polling `sqladmin.operations.list`.

The full design and workaround comparison are in the issue body.
### Operating System
_No response_
### Deployment
None
### Apache Airflow Provider(s)
google
### Versions of Apache Airflow Providers
_No response_
### Official Helm Chart version
Not Applicable
### Kubernetes Version
_No response_
### Helm Chart configuration
_No response_
### Docker Image customizations
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)