Usiel opened a new issue, #40018:
URL: https://github.com/apache/airflow/issues/40018

   ### Apache Airflow Provider(s)
   
   cncf-kubernetes
   
   ### Versions of Apache Airflow Providers
   
   8.3.0 (latest main)
   
   ### Apache Airflow version
   
   2.9.1
   
   ### Operating System
   
   -
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   We recently encountered a case where our Triggerer deployments were under 
heavy load and the trigger execution was severely delayed. This caused the 
following behavior to emerge:
   
   1. A deferrable KubernetesPodOperator is started and deferred
   2. `startup_timeout_seconds` (or more) seconds pass
   3. A Triggerer finally gets to `run()` the trigger and emits a `timeout` 
event, even if the pod is running or has already completed successfully
   4. Task fails due to the timeout
   
   Another likely case provokes the same behavior: the Triggerer running the 
trigger crashes or restarts after `startup_timeout_seconds` has elapsed and 
the pod has already left the pending state. The new Triggerer process 
executes `run()` and emits a timeout event because it sees that the timeout 
has been reached (it does not check the pod state).
   
   ### What you think should happen instead
   
   The pod state should be checked: a timeout event should be emitted only if 
the pod is still pending **and** the timeout has been reached.
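   
   A minimal sketch of that check, assuming a hypothetical helper 
`should_emit_timeout` (not part of the provider's API) and the standard 
Kubernetes pod phase strings (`Pending`, `Running`, `Succeeded`, ...). It only 
illustrates the intended condition, not how the trigger is actually 
implemented:
   
```python
from datetime import datetime, timedelta, timezone


def should_emit_timeout(pod_phase, trigger_start_time, now, startup_timeout_seconds):
    """Hypothetical helper: decide whether a startup timeout event is warranted.

    A timeout is only meaningful while the pod has not started yet, so both
    conditions must hold: the pod is still Pending AND the timeout has elapsed.
    """
    elapsed = (now - trigger_start_time).total_seconds()
    return pod_phase == "Pending" and elapsed >= startup_timeout_seconds


# Illustrative values: the trigger's run() is reached 30 seconds late,
# with a startup timeout of 10 seconds.
start = datetime(2021, 1, 1, tzinfo=timezone.utc)
late = start + timedelta(seconds=30)

still_pending = should_emit_timeout("Pending", start, late, 10)
already_running = should_emit_timeout("Running", start, late, 10)
already_done = should_emit_timeout("Succeeded", start, late, 10)
```
   
   With this condition, a Triggerer that reaches `run()` late (or restarts) 
would only report a timeout for a pod that is genuinely stuck in the pending 
state.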
   
   ### How to reproduce
   
   Use the following DAG on an Airflow deployment with Triggerer:
   
   ```
   from datetime import datetime
   
   from airflow import DAG
   from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
   from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
   
   with DAG(
           dag_id="kubernetes_trigger_bug",
           schedule=None,
           start_date=datetime(2021, 1, 1),
   ) as dag:
       k = KubernetesPodOperator(
           task_id="kubernetes_task_async",
           namespace="default",
           image="alpine:latest",
           cmds=["sh", "-c"],
           arguments=["sleep 60"],
           name="airflow-test-pod",
           deferrable=True,
           startup_timeout_seconds=10,
           on_finish_action=OnFinishAction.DELETE_SUCCEEDED_POD,
       )
   ```
   
   Now, you can simulate a Triggerer crash while the DAG is running:
   
   1. Trigger the DAG
   2. Wait at least 10 seconds (but less than 60 seconds)
   3. Kill the Triggerer process that handles the trigger (the new process will 
emit a timeout event)
   4. Wait for the pod to complete
   5. Task will fail due to the timeout
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


