radhwene opened a new issue, #68040:
URL: https://github.com/apache/airflow/issues/68040

   ### Under which category would you file this issue?
   
   Providers
   
   ### Apache Airflow version
   
   2.10.3 or 3.2.0 +
   
   ### What happened and how to reproduce it?
   
   # Cloud SQL: `CloudSQLImportInstanceOperator` / `CloudSQLExportInstanceOperator` fail with 409 `operationInProgress` on parallel tasks against the same instance
   
   
   ## Problem
   
   Two `CloudSQLImportInstanceOperator` tasks target the same instance with no dependency between them, so they fan out in parallel. One succeeds; the other fails in under 2 s with HTTP 409 / `operationInProgress`. The same race occurs with `CloudSQLExportInstanceOperator` and with mixed IMPORT + EXPORT tasks, because Cloud SQL serializes admin operations per instance. The provider does not retry on this error, and it ships no sensor that waits for the instance to be idle.
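
To tell this specific conflict apart from other HTTP errors, a caller could inspect the error payload. The following is a minimal sketch, assuming the response uses the standard Google API JSON error envelope with a `reason` field; the helper name `is_operation_in_progress` is hypothetical and not part of the provider.

```python
import json


def is_operation_in_progress(status_code: int, body: bytes) -> bool:
    """Return True if an HTTP error looks like the per-instance conflict.

    Assumed payload shape (standard Google API error envelope):
    {"error": {"code": 409, "errors": [{"reason": "operationInProgress"}]}}
    """
    if status_code != 409:
        return False
    try:
        payload = json.loads(body)
    except (ValueError, TypeError):
        return False  # body is not JSON, so the reason cannot be confirmed
    if not isinstance(payload, dict):
        return False
    error = payload.get("error")
    if not isinstance(error, dict):
        return False
    errors = error.get("errors") or []
    return any(
        isinstance(e, dict) and e.get("reason") == "operationInProgress"
        for e in errors
    )
```

With `googleapiclient.errors.HttpError`, the inputs would presumably come from `exc.resp.status` and `exc.content`.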
   
   ## Versions
   
   | Component | Version |
   | --- | --- |
   | Apache Airflow | 2.10.3 |
   | apache-airflow-providers-google | 10.22.0 |
   | apache-airflow-providers-postgres | 5.11.1 |
   | Cloud SQL instance | `my-cloudsql-pg` (PostgreSQL 16, `us-central1`) |
   | Executor | LocalExecutor |
   | Python | 3.12.7 |
   
   Also reproduces on Airflow 3.2.0 + providers-google 19.5.0.
   
   ## Reproduction DAG
   
   ```python
   from datetime import datetime
   from pathlib import Path
   
   from airflow import DAG
   from airflow.models.param import Param
   from airflow.providers.google.cloud.operators.cloud_sql import (
       CloudSQLImportInstanceOperator,
   )
   from airflow.providers.postgres.operators.postgres import PostgresOperator
   
   INIT_SQL_PATH = Path(__file__).resolve().parent / "init_tables.sql"
   
   with DAG(
       dag_id="reproduce_409_cloudsql_import",
       start_date=datetime(2024, 1, 1),
       schedule=None,
       catchup=False,
       params={
           "PROJECT_ID": Param(default="CHANGE_ME", type="string"),
           "INSTANCE":   Param(default="CHANGE_ME", type="string"),
           "GCS_BUCKET": Param(default="CHANGE_ME", type="string"),
           "DB_NAME":    Param(default="airflow_db", type="string"),
       },
       render_template_as_native_obj=True,
   ) as dag:
   
       create_tables = PostgresOperator(
           task_id="create_tables",
           postgres_conn_id="cloudsql_pg",
           sql=INIT_SQL_PATH.read_text(),
       )
   
       import_complaints = CloudSQLImportInstanceOperator(
           task_id="import_complaints",
           project_id="{{ params.PROJECT_ID }}",
           instance="{{ params.INSTANCE }}",
           body={"importContext": {
               "fileType": "CSV",
               "uri": "gs://{{ params.GCS_BUCKET }}/complaints.csv",
               "database": "{{ params.DB_NAME }}",
               "csvImportOptions": {"table": "complaints"},
           }},
           gcp_conn_id="google_cloud_default",
       )
   
       import_crime = CloudSQLImportInstanceOperator(
           task_id="import_crime",
           project_id="{{ params.PROJECT_ID }}",
           instance="{{ params.INSTANCE }}",
           body={"importContext": {
               "fileType": "CSV",
               "uri": "gs://{{ params.GCS_BUCKET }}/crime.csv",
               "database": "{{ params.DB_NAME }}",
               "csvImportOptions": {"table": "crime"},
           }},
           gcp_conn_id="google_cloud_default",
       )
   
       create_tables >> [import_complaints, import_crime]
   ```
   
   Triggered with:
   
   ```
   airflow dags trigger reproduce_409_cloudsql_import \
     --run-id repro_409_20260604_215213 \
     --conf '{"PROJECT_ID":"my-project","INSTANCE":"my-cloudsql-pg","GCS_BUCKET":"my-gcs-bucket","DB_NAME":"airflow_db"}'
   ```
   
   ## Result — `import_complaints` failed with HTTP 409 `operationInProgress`
   
   ![Airflow log — import_complaints failed with HttpError 409 
operationInProgress](https://raw.githubusercontent.com/radhwene/airflow-cloudsql-409-artifacts/main/import_complaints_failed_409_ui.png)
   
   ## Result — `import_crime` succeeded (same DAG, same instance, parallel task)
   
   ![Airflow log — import_crime 
SUCCESS](https://raw.githubusercontent.com/radhwene/airflow-cloudsql-409-artifacts/main/import_crime_success_ui.png)
   
   ## `gcloud sql operations list` — only one IMPORT op was created
   
   ```
   gcloud sql operations list \
     --instance=my-cloudsql-pg \
     --project=my-project
   ```
   
   ![gcloud sql operations list — single IMPORT op for the 
run](https://raw.githubusercontent.com/radhwene/airflow-cloudsql-409-artifacts/main/gcloud_ops_list.png)
   
   The list shows a single IMPORT op at `2026-06-04T19:52:32`, which belongs to the successful `import_crime` task. The failed `import_complaints` task produced no entry: the API rejected the request before allocating a long-running operation. This matches the documented behavior of [one administrative operation per instance at a time][cloudsql-ops].
   
   [cloudsql-ops]: https://docs.cloud.google.com/sql/docs/postgres/admin-api#long-running
   
   ## Current workarounds (all unsatisfactory)
   
   | Workaround | Why it falls short |
   | --- | --- |
   | Serialize tasks (`a >> b` instead of `[a, b]`) | Kills parallelism on purpose. Doesn't scale to N imports/exports. Doesn't protect against a second DAG hitting the same instance. |
   | Airflow `Pool(size=1)` per Cloud SQL instance | Coarse-grained: blocks all tasks of any operator type in that pool, not just admin ops. Requires one pool per instance, declared out-of-band. Still races against console-triggered ops, backups, replica events. |
   | Task-level `retries=N` + `retry_delay` | Blunt: doesn't know if the op actually started server-side. On import, a blind retry can either replay rows already landed or hit the same 409 indefinitely. Wastes worker slots holding tasks queued for retry. |
   | Custom `PythonOperator` pre-check polling `sqladmin.operations.list` | Works but duplicates logic that arguably belongs in the provider. Every team writes it slightly differently. No async support → blocks a worker slot for the whole wait. |
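
For reference, the last workaround in the table usually reduces to a blocking loop like the one below. This is a sketch only: `wait_until_idle` and its `is_idle` callback are hypothetical names, and the callback stands in for whatever call a team makes against `sqladmin.operations.list`. The `sleep` and `clock` parameters are injectable purely so the loop can be exercised without real waiting.

```python
import time
from typing import Callable


def wait_until_idle(
    is_idle: Callable[[], bool],
    poke_interval: float = 60.0,
    timeout: float = 7200.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Block until is_idle() returns True; return the number of checks made.

    Raises TimeoutError if the next check would fall past the deadline.
    """
    deadline = clock() + timeout
    checks = 0
    while True:
        checks += 1
        if is_idle():
            return checks
        if clock() + poke_interval > deadline:
            raise TimeoutError(f"instance still busy after {checks} checks")
        # The calling task keeps its worker slot for this whole sleep.
        sleep(poke_interval)
```

Called from a `PythonOperator`, the function holds its worker slot for the full wait, which is the cost the table points out.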
   
   
   
   ## Proposal — deferrable instance-level sensor
   
   ```python
   wait_for_slot = CloudSQLNoOperationInProgressSensor(
       task_id="wait_for_slot",
       project_id="my-project",
       instance="my-cloudsql-pg",
       poke_interval=60,
       timeout=60 * 60 * 2,
       deferrable=True,
   )
   
   wait_for_slot >> CloudSQLImportInstanceOperator(...)
   ```
   
   - Polls `sqladmin.operations.list`, filters on `targetId == instance` and on 
any op in a non-terminal state.
   - Async trigger reuses `CloudSQLHook`. No new auth surface.
   - Fails fast on 404 (instance missing) / 403 (IAM denied).
   - Operation-agnostic → covers IMPORT, EXPORT, and any future 
`sqladmin.instances.*` operator.
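
The filtering step in the first bullet could look roughly like this. It is a sketch under stated assumptions, not the final implementation: it takes the `items` list of an `operations.list` response as plain dicts, assumes each carries `targetId` and `status` fields, and assumes `DONE` is the only terminal status. The names `pending_operations` and `instance_is_idle` are hypothetical.

```python
from typing import Any, Iterable

# Assumption: DONE is the only terminal status on an Operation resource.
TERMINAL_STATUSES = frozenset({"DONE"})


def pending_operations(
    operations: Iterable[dict[str, Any]], instance: str
) -> list[dict[str, Any]]:
    """Return operations that target `instance` and have not finished."""
    return [
        op
        for op in operations
        if op.get("targetId") == instance
        and op.get("status") not in TERMINAL_STATUSES
    ]


def instance_is_idle(operations: Iterable[dict[str, Any]], instance: str) -> bool:
    """True when no non-terminal operation targets `instance`."""
    return not pending_operations(operations, instance)
```

Keeping the predicate free of I/O would let the sensor's `poke` and the async trigger share it, with each side only responsible for fetching the operation list.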
   
   
   ## Contribution plan
   
   One PR — sensor + async trigger + unit tests + short docs note. Will open as 
soon as direction is confirmed.
   
   
   ### What you think should happen instead?
   
   The Google provider should ship a deferrable instance-level sensor (e.g. `CloudSQLNoOperationInProgressSensor`) that waits until the target Cloud SQL instance has no non-terminal administrative operation in flight, then releases.

   With this sensor wired upstream of `CloudSQLImportInstanceOperator` / `CloudSQLExportInstanceOperator`, parallel admin ops against the same instance no longer fail with HTTP 409 `operationInProgress`. The operator's submit semantics stay unchanged; the sensor is the explicit "wait for slot" primitive that every team currently re-implements with a custom `PythonOperator` polling `sqladmin.operations.list`.

   Full design + workaround comparison in the issue body.
   
   
   ### Operating System
   
   _No response_
   
   ### Deployment
   
   None
   
   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Official Helm Chart version
   
   Not Applicable
   
   ### Kubernetes Version
   
   _No response_
   
   ### Helm Chart configuration
   
   _No response_
   
   ### Docker Image customizations
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

