suhyeon729 opened a new issue, #67945:
URL: https://github.com/apache/airflow/issues/67945

   ### Under which category would you file this issue?
   
   Providers
   
   ### Apache Airflow version
   
   3.2.2
   
   ### What happened and how to reproduce it?
   
   **Issue Description**
   
   `HttpOperator(deferrable=True)` with non-idempotent HTTP methods (POST, PUT, DELETE, PATCH)
   can send duplicate requests when the Triggerer process restarts.
   
   **Root cause:**
   
   `execute_async()` defers immediately without making the HTTP request.
   The actual HTTP call happens inside `HttpTrigger.run()`.
   When the Triggerer restarts, the Trigger is re-instantiated from its `serialize()` output
   and `run()` is called again from scratch, sending the same POST request a second time.
   
   ```python
   # operators/http.py:211
   def execute_async(self, context):
       # Defers right away; no HTTP request has been sent at this point.
       self.defer(
           trigger=HttpTrigger(method="POST", endpoint=..., data=...),
           method_name="execute_complete",
       )


   # triggers/http.py:160
   async def run(self):
       # The request is sent here, so every fresh start of run() sends it again.
       response = await self._get_response(hook)
       yield TriggerEvent(...)
   ```

   This is different from the correct deferrable pattern (e.g. `AirbyteOperator`),
   where the Worker submits the job and gets a `job_id` first,
   and the Trigger only polls with GET requests (idempotent):

   ```python
   # AirbyteOperator.execute(): correct pattern
   job_object = hook.submit_sync_connection(...)
   self.job_id = job_object.job_id
   self.defer(trigger=AirbyteSyncTrigger(job_id=self.job_id))
   ```

   **Steps to reproduce:**

   1. Set up an endpoint that records how many times it receives a POST request.
   2. Create a DAG with the following task:

      ```python
      HttpOperator(
          task_id="test",
          method="POST",
          endpoint="/record",
          http_conn_id="my_conn",
          deferrable=True,
      )
      ```

   3. Run the task and wait until it reaches the DEFERRED state.
   4. Kill the Triggerer process and restart it.
   5. Observe that the endpoint receives the POST request twice.
   
   
   ### What you think should happen instead?
   
   The deferrable mode should be safe against Triggerer restarts.
   
   **Short-term fix:** Add a `UserWarning` in `execute_async()` when a non-idempotent
   method is used with `deferrable=True`, so users are aware of the risk:
   
   ```python
   import warnings


   def execute_async(self, context):
       # Warn for any method other than the read-only ones (GET, HEAD, OPTIONS).
       if self.method.upper() not in ("GET", "HEAD", "OPTIONS"):
           warnings.warn(
               message=(
                   f"HttpOperator deferrable=True with method={self.method} may send duplicate "
                   "requests if the Triggerer restarts."
               ),
               category=UserWarning,
               stacklevel=2,
           )
       self.defer(trigger=HttpTrigger(...), method_name="execute_complete")
   ```
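
To see which methods the proposed check would flag, the condition can be pulled into a standalone function and exercised without Airflow. This is only a sketch: `warn_if_unsafe_for_deferral` and `READ_ONLY_METHODS` are hypothetical names, not existing provider code.

```python
import warnings

# Methods the proposed check lets through without a warning.
READ_ONLY_METHODS = ("GET", "HEAD", "OPTIONS")


def warn_if_unsafe_for_deferral(method: str) -> bool:
    """Emit a UserWarning for methods outside READ_ONLY_METHODS; return True if warned."""
    if method.upper() in READ_ONLY_METHODS:
        return False
    warnings.warn(
        f"HttpOperator deferrable=True with method={method} may send duplicate "
        "requests if the Triggerer restarts.",
        category=UserWarning,
        stacklevel=2,
    )
    return True


# Usage: record warnings to confirm which methods trigger the message.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    results = {m: warn_if_unsafe_for_deferral(m) for m in ("get", "POST", "Patch", "HEAD")}

warned_count = len(caught)
```

One design point worth settling in review: RFC 9110 defines PUT and DELETE as idempotent (though not safe), so the allow-list here effectively warns on every non-safe method rather than strictly on non-idempotent ones.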
   
   **Long-term fix:** For non-idempotent methods, execute the HTTP request in the Worker
   (`execute()` phase) rather than inside the Trigger, following the same pattern as
   `AirbyteOperator`. The Trigger should only be responsible for polling/waiting,
   not for initiating side-effecting requests.
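
The difference between the two patterns can be illustrated with a small simulation that does not depend on Airflow. Everything in it is hypothetical (`FakeEndpoint`, `RequestInTrigger`, `PollOnlyTrigger`, `run_with_restart`), and it assumes the restart happens after the first `run()` has already sent its request, which is the case the issue describes.

```python
import asyncio


class FakeEndpoint:
    """Stands in for a remote service and counts side-effecting submissions."""

    def __init__(self):
        self.post_count = 0
        self.jobs = {}

    async def post(self, payload):
        self.post_count += 1
        job_id = f"job-{self.post_count}"
        self.jobs[job_id] = "done"
        return job_id

    async def get_status(self, job_id):
        return self.jobs[job_id]


class RequestInTrigger:
    """Behaviour described in the issue: run() sends the POST itself."""

    def __init__(self, payload):
        self.payload = payload

    def serialize(self):
        return {"payload": self.payload}

    async def run(self, endpoint):
        return await endpoint.post(self.payload)


class PollOnlyTrigger:
    """Proposed shape: the trigger holds a job_id and only performs reads."""

    def __init__(self, job_id):
        self.job_id = job_id

    def serialize(self):
        return {"job_id": self.job_id}

    async def run(self, endpoint):
        return await endpoint.get_status(self.job_id)


async def run_with_restart(trigger, endpoint):
    """Run once, then simulate a Triggerer restart by rebuilding from serialize()."""
    await trigger.run(endpoint)                      # first attempt; its result is lost
    restored = type(trigger)(**trigger.serialize())  # re-instantiated after the restart
    return await restored.run(endpoint)              # run() starts again from scratch


async def demo():
    # Pattern 1: POST inside the trigger, so the restart repeats it.
    ep1 = FakeEndpoint()
    await run_with_restart(RequestInTrigger({"x": 1}), ep1)

    # Pattern 2: the "worker" POSTs once before deferring; the trigger only polls.
    ep2 = FakeEndpoint()
    job_id = await ep2.post({"x": 1})
    await run_with_restart(PollOnlyTrigger(job_id), ep2)

    return ep1.post_count, ep2.post_count


duplicated, submitted_once = asyncio.run(demo())
print(duplicated, submitted_once)  # prints "2 1"
```

A plain HTTP endpoint may not return anything like a `job_id`, so for a generic `HttpOperator` the worker-side request might simply complete synchronously, with deferral reserved for any follow-up polling.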
   
   ### Operating System
   
   _No response_
   
   ### Deployment
   
   None
   
   ### Apache Airflow Provider(s)
   
   http
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-http==6.0.0
   
   ### Official Helm Chart version
   
   Not Applicable
   
   ### Kubernetes Version
   
   _No response_
   
   ### Helm Chart configuration
   
   _No response_
   
   ### Docker Image customizations
   
   _No response_
   
   ### Anything else?
   
   - The duplicate request issue is silent: no error is raised, making it hard to detect in production.
   - In batch processing contexts, duplicate POST requests can cause unintended data duplication or duplicate job executions.
   - Related Trigger design principle (base.py):
     "Trigger classes should assume they will be persisted,
     and then rely on cleanup() being called when they are no longer needed."
   
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

