suhyeon729 opened a new issue, #67945:
URL: https://github.com/apache/airflow/issues/67945
### Under which category would you file this issue?
Providers
### Apache Airflow version
3.2.2
### What happened and how to reproduce it?
**Issue Description**
`HttpOperator(deferrable=True)` can send duplicate requests for HTTP methods that have side effects (POST, PUT, DELETE, PATCH) when the Triggerer process restarts.
**Root cause:**
`execute_async()` defers immediately without making the HTTP request.
The actual HTTP call happens inside `HttpTrigger.run()`.
When the Triggerer restarts, the Trigger is re-instantiated from its `serialize()` output and `run()` is called again from scratch, sending the same POST request a second time.
```python
# operators/http.py:211
def execute_async(self, context):
    self.defer(
        trigger=HttpTrigger(method="POST", endpoint=..., data=...),
        method_name="execute_complete",
    )

# triggers/http.py:160
async def run(self):
    response = await self._get_response(hook)
    yield TriggerEvent(...)
```

This is different from the correct deferrable pattern (e.g. AirbyteOperator), where the Worker submits the job and gets a `job_id` first, and the Trigger only polls with GET requests (idempotent):

```python
# AirbyteOperator.execute(): correct pattern
job_object = hook.submit_sync_connection(...)
self.job_id = job_object.job_id
self.defer(trigger=AirbyteSyncTrigger(job_id=self.job_id))
```
**Steps to reproduce:**

1. Set up an endpoint that records how many times it receives a POST request.
2. Create a DAG with the following task:

   ```python
   HttpOperator(
       task_id="test",
       method="POST",
       endpoint="/record",
       http_conn_id="my_conn",
       deferrable=True,
   )
   ```

3. Run the task and wait until it reaches the DEFERRED state.
4. Kill the Triggerer process and restart it.
5. Observe that the endpoint receives the POST request twice.
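
For the recording endpoint in the first step above, a minimal sketch using only the Python standard library could look like the following. The handler name, the `make_server` helper, and the in-memory counter are assumptions made for illustration; only the `/record` path comes from the reproduction task.

```python
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class RecordingHandler(BaseHTTPRequestHandler):
    """Counts POST requests sent to /record (hypothetical test endpoint)."""

    post_count = 0
    _lock = threading.Lock()

    def do_POST(self):
        # Drain the request body before responding.
        length = int(self.headers.get("Content-Length", 0))
        self.rfile.read(length)
        if self.path != "/record":
            self.send_error(404)
            return
        # Increment under a lock because the server handles requests in threads.
        with RecordingHandler._lock:
            RecordingHandler.post_count += 1
            count = RecordingHandler.post_count
        body = str(count).encode()
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        # Silence the default per-request logging to stderr.
        pass


def make_server(host="127.0.0.1", port=0):
    """Create the server; port=0 lets the OS pick a free port."""
    return ThreadingHTTPServer((host, port), RecordingHandler)
```

Calling `make_server(port=8080).serve_forever()` and pointing the `my_conn` connection at that host and port should be enough to watch the counter; each response body is the running number of POSTs received so far.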
### What you think should happen instead?
The deferrable mode should be safe against Triggerer restarts.
**Short-term fix:** Add a `UserWarning` in `execute_async()` when a side-effecting method (anything other than GET, HEAD, or OPTIONS) is used with `deferrable=True`, so users are aware of the risk:
```python
import warnings

def execute_async(self, context):
    # Warn before deferring: the Trigger may replay this request after a restart.
    if self.method.upper() not in ("GET", "HEAD", "OPTIONS"):
        warnings.warn(
            message=(
                f"HttpOperator deferrable=True with method={self.method} may send duplicate "
                "requests if the Triggerer restarts."
            ),
            category=UserWarning,
            stacklevel=2,
        )
    self.defer(trigger=HttpTrigger(...), method_name="execute_complete")
```
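
The same check can also be sketched as a standalone helper, which makes it easy to unit-test without an Airflow installation. The function name `warn_if_unsafe_deferred` and the `SAFE_METHODS` constant are hypothetical and not part of the http provider.

```python
import warnings

# Subset of the methods RFC 9110 defines as "safe" (read-only).
SAFE_METHODS = frozenset({"GET", "HEAD", "OPTIONS"})


def warn_if_unsafe_deferred(method: str) -> bool:
    """Warn when a side-effecting method is about to run in deferrable mode.

    Hypothetical helper for illustration. Returns True if a warning was emitted.
    """
    if method.upper() in SAFE_METHODS:
        return False
    warnings.warn(
        f"HttpOperator deferrable=True with method={method} may send "
        "duplicate requests if the Triggerer restarts.",
        category=UserWarning,
        stacklevel=2,
    )
    return True
```

Returning a boolean is a design choice for testability only; the operator itself would ignore the return value and go on to defer.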
**Long-term fix**: For side-effecting methods, execute the HTTP request in the Worker (the `execute()` phase) rather than inside the Trigger, following the same pattern as AirbyteOperator. The Trigger should only be responsible for polling/waiting, not for initiating side-effecting requests.
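
To make the difference concrete, here is a small Airflow-free simulation of both patterns. Everything in it (`FakeService`, `PostingTrigger`, `PollingTrigger`, `run_with_restart`) is a hypothetical stand-in, and a Triggerer restart is modelled simply as rebuilding the trigger and calling `run()` a second time, which is a simplification of what the real Triggerer does.

```python
import asyncio


class FakeService:
    """In-memory stand-in for a remote API (hypothetical)."""

    def __init__(self):
        self.post_count = 0
        self.jobs = {}

    async def post_job(self) -> str:
        # Side-effecting call: every invocation creates a new job.
        self.post_count += 1
        job_id = f"job-{self.post_count}"
        self.jobs[job_id] = "done"
        return job_id

    async def get_status(self, job_id: str) -> str:
        # Read-only call: safe to repeat any number of times.
        return self.jobs[job_id]


class PostingTrigger:
    """Current pattern: the trigger itself sends the POST."""

    def __init__(self, service):
        self.service = service

    async def run(self):
        return await self.service.post_job()


class PollingTrigger:
    """Proposed pattern: the trigger only polls a job the worker already created."""

    def __init__(self, service, job_id):
        self.service = service
        self.job_id = job_id

    async def run(self):
        return await self.service.get_status(self.job_id)


async def run_with_restart(make_trigger):
    """Run a trigger, then rebuild it and run it again, mimicking a restart."""
    await make_trigger().run()
    return await make_trigger().run()


async def main():
    unsafe = FakeService()
    await run_with_restart(lambda: PostingTrigger(unsafe))

    safe = FakeService()
    job_id = await safe.post_job()  # worker-side submit, happens exactly once
    await run_with_restart(lambda: PollingTrigger(safe, job_id))
    return unsafe.post_count, safe.post_count


unsafe_posts, safe_posts = asyncio.run(main())
print(unsafe_posts, safe_posts)  # prints "2 1"
```

In the first scenario the simulated restart produces two POSTs; in the second the POST happens once on the worker side and only the read-only status call is repeated.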
### Operating System
_No response_
### Deployment
None
### Apache Airflow Provider(s)
http
### Versions of Apache Airflow Providers
apache-airflow-providers-http==6.0.0
### Official Helm Chart version
Not Applicable
### Kubernetes Version
_No response_
### Helm Chart configuration
_No response_
### Docker Image customizations
_No response_
### Anything else?
- The duplicate request is silent: no error is raised, which makes it hard to detect in production.
- In batch processing contexts, duplicate POST requests can cause unintended data duplication or duplicate job executions.
- Related Trigger design principle (base.py): "Trigger classes should assume they will be persisted, and then rely on cleanup() being called when they are no longer needed."
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)