radhwene commented on PR #68361:
URL: https://github.com/apache/airflow/pull/68361#issuecomment-4674860196

   ### E2E evidence: real Cloud SQL PostgreSQL instance, Airflow v2
   
   Adding the reproduction and validation runs behind the “Why hook-level retry, not only a sensor” and “E2E validation” sections.
   
   **Sensor-only approach does not close the race.**
   
   I validated the standalone sensor approach by placing a `CloudSQLNoOperationInProgressSensor` in `reschedule` mode before each import task.
   
   In that run, the sensor task `wait_a` reports **success**, but the import task immediately downstream of it, `import_complaints` (`CloudSQLImportInstanceOperator`), still fails with `409 operationInProgress`.
   
   This demonstrates the TOCTOU (time-of-check to time-of-use) window that remains between:
   
   1. the sensor observing the instance as idle;
   2. the downstream operator being scheduled;
   3. the operator submitting the actual Cloud SQL Admin API request.
   
   ![Sensor passes, import still fails with 409](https://raw.githubusercontent.com/radhwene/airflow-cloudsql-409-artifacts/main/sensoror_solution.png)
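
To make the window concrete, here is a toy, deterministic model of an instance that runs one admin operation at a time. All names (`FakeInstance`, `OperationInProgress`, `sensor_then_submit`) are hypothetical and are not Airflow or Cloud SQL APIs. The point it illustrates is that the sensor's check and the operator's submit are separate steps, so anything submitted between them makes the sensor's "idle" observation stale.

```python
from __future__ import annotations

from collections.abc import Callable


class OperationInProgress(Exception):
    """Hypothetical stand-in for the Admin API's HTTP 409 ``operationInProgress``."""


class FakeInstance:
    """Toy model of an instance that runs at most one admin operation at a time."""

    def __init__(self) -> None:
        self.running: str | None = None

    def is_idle(self) -> bool:
        return self.running is None

    def submit(self, op: str) -> None:
        # Reject the submit while another operation holds the instance.
        if self.running is not None:
            raise OperationInProgress(f"{op} rejected: {self.running} is running")
        self.running = op


def sensor_then_submit(
    instance: FakeInstance, in_between: Callable[[FakeInstance], None]
) -> str:
    # 1. Time of check: the sensor sees an idle instance and succeeds.
    if not instance.is_idle():
        return "sensor still waiting"
    # 2. Scheduling gap: another task can submit before the operator runs.
    in_between(instance)
    # 3. Time of use: the operator submits without re-checking.
    try:
        instance.submit("import_complaints")
    except OperationInProgress as exc:
        return f"409: {exc}"
    return "success"


print(sensor_then_submit(FakeInstance(), lambda inst: None))
# success
print(sensor_then_submit(FakeInstance(), lambda inst: inst.submit("import_crime")))
# 409: import_complaints rejected: import_crime is running
```

Moving the check closer to the submit only narrows step 2; in this model, only handling the 409 at step 3 removes the failure.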
   
   **Hook-level retry handles the contention at the submit point.**
   
   I also validated this PR with a harder topology: four Cloud SQL admin operations (two imports and two exports) submitted in parallel against the same instance, with no sensor.
   
   All four tasks succeed after their blocked submit calls are retried, and no `409 operationInProgress` is surfaced to the DAG run.
   
   ![No sensor, four parallel admin ops all succeed](https://raw.githubusercontent.com/radhwene/airflow-cloudsql-409-artifacts/main/new_version_of_operator.png)
   
   This validates both patched hook methods, `import_instance` and `export_instance`, under real Cloud SQL operation serialization.
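
A minimal sketch of what "retry at the submit point" means, assuming a decorator that retries only on 409 `operationInProgress` with capped exponential backoff. `ApiError` and `retry_on_operation_in_progress` are hypothetical illustrations, not the PR's `operation_in_progress_retry`. A real hook would inspect the Google API client's HTTP error instead of this stand-in. `sleep` is injectable so the backoff can be checked without waiting.

```python
from __future__ import annotations

import functools
import time
from collections.abc import Callable
from typing import Any, TypeVar

T = TypeVar("T")


class ApiError(Exception):
    """Hypothetical stand-in for an HTTP error carrying a status and a reason."""

    def __init__(self, status: int, reason: str) -> None:
        super().__init__(f"{status} {reason}")
        self.status = status
        self.reason = reason


def retry_on_operation_in_progress(
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Retry a submit call only on 409 operationInProgress (hypothetical sketch)."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            attempt = 1
            while True:
                try:
                    return func(*args, **kwargs)
                except ApiError as exc:
                    contended = exc.status == 409 and exc.reason == "operationInProgress"
                    # Other errors, or the final attempt, propagate unchanged.
                    if not contended or attempt >= max_attempts:
                        raise
                # Wait base_delay * 1, 2, 4, ... (capped) before resubmitting.
                sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))
                attempt += 1

        return wrapper

    return decorator


# Usage: a hypothetical hook method whose first three submits collide.
delays: list[float] = []
calls: list[dict] = []


@retry_on_operation_in_progress(max_attempts=5, base_delay=1.0, sleep=delays.append)
def import_instance(body: dict) -> str:
    calls.append(body)
    if len(calls) <= 3:
        raise ApiError(409, "operationInProgress")
    return "DONE"


result = import_instance({"importContext": {"fileType": "CSV"}})
print(result, delays)
# DONE [1.0, 2.0, 4.0]
```

Backoff here is per call, so in a four-way fan-out each blocked task waits and resubmits independently. Adding random jitter to the delay would keep parallel tasks from retrying in lockstep.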
   
   <details>
   <summary>E2E DAG used for the validation (<code>cloudsql_retry_stress_409</code>)</summary>
   
   ```python
   """
   cloudsql_retry_stress.py
   
   E2E stress test for the apache/airflow#68040 retry-on-409 fix.
   
   This DAG fans out four Cloud SQL admin operations in parallel against the
   same Cloud SQL PostgreSQL instance:
   
       create_tables ──┬──> import_complaints
                       ├──> import_crime
                       ├──> export_a
                       └──> export_b
   
   Cloud SQL serializes admin operations per instance, so simultaneous submits
   can collide with HTTP 409 ``operationInProgress``.
   
   Expected behavior:
   
     - stock provider:
         parallel operations can fail with 409 and the DAG run fails;
   
     - patched provider:
          import/export submit calls are retried via ``operation_in_progress_retry``;
         all four tasks complete successfully.
   
   Export object URIs are made unique per run with ``{{ ts_nodash }}`` because a
   Cloud SQL export fails if the destination GCS object already exists.
   
   Trigger config: PROJECT_ID, INSTANCE, GCS_BUCKET, DB_NAME.
   """
   
   from __future__ import annotations
   
   from datetime import datetime
   from pathlib import Path
   
   from airflow import DAG
   from airflow.models.param import Param
   from airflow.providers.google.cloud.operators.cloud_sql import (
       CloudSQLExportInstanceOperator,
       CloudSQLImportInstanceOperator,
   )
   from airflow.providers.postgres.operators.postgres import PostgresOperator
   
   INIT_SQL_PATH = Path(__file__).resolve().parent / "init_tables.sql"
   
   
   def _import(task_id: str, table: str) -> CloudSQLImportInstanceOperator:
       return CloudSQLImportInstanceOperator(
           task_id=task_id,
           project_id="{{ params.PROJECT_ID }}",
           instance="{{ params.INSTANCE }}",
           body={
               "importContext": {
                   "fileType": "CSV",
                   "uri": f"gs://{{{{ params.GCS_BUCKET }}}}/{table}.csv",
                   "database": "{{ params.DB_NAME }}",
                   "csvImportOptions": {"table": table},
               }
           },
           gcp_conn_id="google_cloud_default",
       )
   
   
   def _export(task_id: str, label: str) -> CloudSQLExportInstanceOperator:
       return CloudSQLExportInstanceOperator(
           task_id=task_id,
           project_id="{{ params.PROJECT_ID }}",
           instance="{{ params.INSTANCE }}",
           body={
               "exportContext": {
                   "fileType": "CSV",
                   "uri": f"gs://{{{{ params.GCS_BUCKET }}}}/stress_export_{label}_{{{{ ts_nodash }}}}.csv",
                   "databases": ["{{ params.DB_NAME }}"],
                   "csvExportOptions": {"selectQuery": "SELECT 1 AS col"},
               }
           },
           gcp_conn_id="google_cloud_default",
       )
   
   
   with DAG(
       dag_id="cloudsql_retry_stress_409",
       start_date=datetime(2024, 1, 1),
       schedule=None,
       catchup=False,
       tags=["cloudsql", "fix-409", "retry", "stress"],
       params={
           "PROJECT_ID": Param(default="CHANGE_ME", type="string"),
           "INSTANCE": Param(default="CHANGE_ME", type="string"),
           "GCS_BUCKET": Param(default="CHANGE_ME", type="string"),
           "DB_NAME": Param(default="airflow_db", type="string"),
       },
       render_template_as_native_obj=True,
   ) as dag:
   
       create_tables = PostgresOperator(
           task_id="create_tables",
           postgres_conn_id="cloudsql_pg",
           sql=INIT_SQL_PATH.read_text(),
       )
   
       create_tables >> [
           _import("import_complaints", "complaints"),
           _import("import_crime", "crime"),
           _export("export_a", "a"),
           _export("export_b", "b"),
       ]
   ```
   
   </details>
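
The `_import` and `_export` helpers mix two template layers in one f-string. Python fills in `table` or `label` when the DAG file is parsed, while each `{{{{` and `}}}}` collapses to a literal `{{` and `}}`, leaving a Jinja expression for Airflow to render on each run. Here is a small standalone check of what those f-strings produce before rendering. The function names are hypothetical; the expressions are copied from the DAG.

```python
def import_uri(table: str) -> str:
    # Same f-string as in _import(): "{{{{" -> "{{" and "}}}}" -> "}}".
    return f"gs://{{{{ params.GCS_BUCKET }}}}/{table}.csv"


def export_uri(label: str) -> str:
    # Same f-string as in _export(); "{{ ts_nodash }}" is left for Jinja to render.
    return f"gs://{{{{ params.GCS_BUCKET }}}}/stress_export_{label}_{{{{ ts_nodash }}}}.csv"


print(import_uri("crime"))
# gs://{{ params.GCS_BUCKET }}/crime.csv
print(export_uri("a"))
# gs://{{ params.GCS_BUCKET }}/stress_export_a_{{ ts_nodash }}.csv
```

At run time Airflow renders `{{ ts_nodash }}` as the run's logical timestamp in compact form (for example `20240101T000000`). So `export_a` and `export_b` write to distinct objects, and each triggered run writes new ones.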
   


-- 
This is an automated message from the Apache Git Service.