madison-ookla opened a new issue #11086:
URL: https://github.com/apache/airflow/issues/11086


   **Apache Airflow version**: 1.10.7, 1.10.10, 1.10.12
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: AWS EC2
   - **OS** (e.g. from /etc/os-release): Debian
   - **Kernel** (e.g. `uname -a`): Linux
   - **Install tools**:
   - **Others**:
   
   **What happened**:
   
   For the last several versions of Airflow, we've noticed that when a task is killed with `SIGKILL` (currently represented as `Task exited with return code Negsignal.SIGKILL`, though previously represented as `Task exited with return code -9`), the failure email is sent, but the `on_failure_callback` is not called.
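
   For reference, a negative return code follows Python's `subprocess` convention of reporting death-by-signal as the negated signal number, so both representations point at signal 9, `SIGKILL` (which is what the Linux OOM killer sends). A quick sanity check:

   ```python
   import signal

   # Signal 9 is SIGKILL; a task return code of -9 therefore means the
   # task process was killed by SIGKILL (e.g. by the kernel OOM killer).
   assert signal.SIGKILL == 9
   ```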
   
   This happened fairly frequently for us in the past: we had tasks that consumed large amounts of memory, and occasionally too many of them would run on the same worker and be OOM-killed. In those instances, we would receive failure emails with the contents `detected as zombie`, but the `on_failure_callback` would **not** be called. We were hoping #7025 would resolve this with the most recent upgrade (and we've also taken steps to reduce our memory footprint), but we just had this happen again recently.
   
   **What you expected to happen**:
   
   If a task fails (even if the cause of the failure is a lack of resources), I would expect the `on_failure_callback` to still be called.
   
   **How to reproduce it**:
   
   Example DAG setup:
   
   <details><summary>CODE</summary>
   
   ```python
   # -*- coding: utf-8 -*-
   
   """
   # POC: On Failure Callback for SIGKILL
   """
   
   from datetime import datetime
   
   import numpy as np
   
   from airflow import DAG
   from airflow.api.common.experimental.trigger_dag import trigger_dag
   from airflow.operators.python_operator import PythonOperator
   
   
   def on_failure_callback(context):
       # Airflow passes the task context to the callback as a single
       # positional dict, so the signature must accept one argument.
       print("===IN ON FAILURE CALLBACK===")
       print("Triggering another run of the task")
       trigger_dag("OOM_test_follower")
   
   
   def high_memory_task():
       # Each iteration allocates a ~800 MB int64 array and keeps a
       # reference to it, so memory grows until the process is OOM-killed.
       l = []
       iteration = 0
       while True:
           print(f"Iteration: {iteration}")
           l.append(np.random.randint(1_000_000, size=(1000, 1000, 100)))
           iteration += 1
   
   
   def failure_task():
       raise ValueError("whoops")
   
   
   def print_context(**context):
       print("This DAG was launched by the failure callback")
       print(context)
   
   
   dag = DAG(
       dag_id="OOM_test",
       schedule_interval=None,
       catchup=False,
       default_args={
           "owner": "madison.bowden",
           "start_date": datetime(year=2019, month=7, day=1),
           "email": "your-email",
       },
   )
   
   with dag:
       PythonOperator(
           task_id="oom_task",
           python_callable=high_memory_task,
           on_failure_callback=on_failure_callback,
       )
   
   failure_dag = DAG(
       dag_id="Failure_test",
       schedule_interval=None,
       catchup=False,
       default_args={
           "owner": "madison.bowden",
           "start_date": datetime(year=2019, month=7, day=1),
           "email": "your-email",
       },
   )
   
   with failure_dag:
       PythonOperator(
           task_id="failure_task",
           python_callable=failure_task,
           on_failure_callback=on_failure_callback,
       )
   
   dag_follower = DAG(
       dag_id="OOM_test_follower",
       schedule_interval=None,
       catchup=False,
       default_args={
           "owner": "madison.bowden",
           "start_date": datetime(year=2019, month=7, day=1),
           "email": "your-email",
       },
   )
   
   with dag_follower:
       PythonOperator(
           task_id="oom_task_failure",
           python_callable=print_context,
           provide_context=True,
       )
   
   ```
   
   </details>
   
   With the above example, the `Failure_test` DAG triggers a run of the `OOM_test_follower` DAG when its task fails, demonstrating that the callback works for an ordinary exception. The `OOM_test` DAG, when triggered, quickly runs out of memory and then does **not** trigger a run of `OOM_test_follower`, even though its task has failed.
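
   If reproducing a real OOM kill is inconvenient (it depends on worker memory limits), the same failure mode can likely be triggered more cheaply by having the task send `SIGKILL` to its own process, mimicking the OOM killer. A minimal sketch (`self_kill_task` is a hypothetical callable, not part of the DAGs above):

   ```python
   import os
   import signal


   def self_kill_task():
       # SIGKILL cannot be caught or handled, so the raw task process dies
       # abruptly, exactly as it does when the kernel's OOM killer fires.
       os.kill(os.getpid(), signal.SIGKILL)
   ```

   Wiring this callable into a `PythonOperator` with the same `on_failure_callback` should show the callback being skipped without having to exhaust worker memory.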
   
   
   **Anything else we need to know**:
   
   

