imamdigmi opened a new issue #14976:
URL: https://github.com/apache/airflow/issues/14976


   **Apache Airflow version**: 2.0.1
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   **Environment**: Docker (with Compose)
   - **Cloud provider or hardware configuration**: -
   - **OS** (e.g. from /etc/os-release): -
   - **Kernel** (e.g. `uname -a`): Darwin Kernel Version 20.2.0: Wed Dec  2 
20:39:59 PST 2020; root:xnu-7195.60.75~1/RELEASE_X86_64 x86_64
   - **Install tools**: Pip
   - **Others**: -
   
   **What happened**:
   I'm trying to set up a notification for SLA misses, but it doesn't work. I can see the list of SLA misses in the UI, and the email alerts are sent successfully, but the callback function I added for the SLA miss is never executed, and the scheduler logs produce no error information.
   
   Here are the scheduler logs:
   ```
   [2021-03-24 09:28:06,905] {scheduler_job.py:941} INFO - 1 tasks up for 
execution:
        <TaskInstance: test_sla_miss_1m.sleep 2021-03-24 09:26:00+00:00 
[scheduled]>
   [2021-03-24 09:28:06,909] {scheduler_job.py:970} INFO - Figuring out tasks 
to run in Pool(name=default_pool) with 128 open slots and 1 task instances 
ready to be queued
   [2021-03-24 09:28:06,909] {scheduler_job.py:998} INFO - DAG test_sla_miss_1m 
has 0/16 running and queued tasks
   [2021-03-24 09:28:06,910] {scheduler_job.py:1063} INFO - Setting the 
following tasks to queued state:
        <TaskInstance: test_sla_miss_1m.sleep 2021-03-24 09:26:00+00:00 
[scheduled]>
   [2021-03-24 09:28:06,914] {scheduler_job.py:1105} INFO - Sending 
TaskInstanceKey(dag_id='test_sla_miss_1m', task_id='sleep', 
execution_date=datetime.datetime(2021, 3, 24, 9, 26, tzinfo=Timezone('UTC')), 
try_number=1) to executor with priority 2 and queue default
   [2021-03-24 09:28:06,915] {base_executor.py:82} INFO - Adding to queue: 
['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'sleep', 
'2021-03-24T09:26:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 
'/opt/airflow/dags/test_sla_miss_1m.py']
   [2021-03-24 09:28:06,932] {local_executor.py:81} INFO - QueuedLocalWorker 
running ['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'sleep', 
'2021-03-24T09:26:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 
'/opt/airflow/dags/test_sla_miss_1m.py']
   [2021-03-24 09:28:07,016] {scheduler_job.py:941} INFO - 1 tasks up for 
execution:
        <TaskInstance: test_sla_miss_1m.sleep 2021-03-24 09:27:00+00:00 
[scheduled]>
   [2021-03-24 09:28:07,019] {scheduler_job.py:970} INFO - Figuring out tasks 
to run in Pool(name=default_pool) with 127 open slots and 1 task instances 
ready to be queued
   [2021-03-24 09:28:07,019] {scheduler_job.py:998} INFO - DAG test_sla_miss_1m 
has 1/16 running and queued tasks
   [2021-03-24 09:28:07,020] {scheduler_job.py:1063} INFO - Setting the 
following tasks to queued state:
        <TaskInstance: test_sla_miss_1m.sleep 2021-03-24 09:27:00+00:00 
[scheduled]>
   [2021-03-24 09:28:07,024] {scheduler_job.py:1105} INFO - Sending 
TaskInstanceKey(dag_id='test_sla_miss_1m', task_id='sleep', 
execution_date=datetime.datetime(2021, 3, 24, 9, 27, tzinfo=Timezone('UTC')), 
try_number=1) to executor with priority 2 and queue default
   [2021-03-24 09:28:07,024] {base_executor.py:82} INFO - Adding to queue: 
['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'sleep', 
'2021-03-24T09:27:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 
'/opt/airflow/dags/test_sla_miss_1m.py']
   [2021-03-24 09:28:07,034] {local_executor.py:81} INFO - QueuedLocalWorker 
running ['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'sleep', 
'2021-03-24T09:27:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 
'/opt/airflow/dags/test_sla_miss_1m.py']
   [2021-03-24 09:28:07,068] {dagbag.py:448} INFO - Filling up the DagBag from 
/opt/airflow/dags/test_sla_miss_1m.py
   [2021-03-24 09:28:07,171] {dagbag.py:448} INFO - Filling up the DagBag from 
/opt/airflow/dags/test_sla_miss_1m.py
   Running <TaskInstance: test_sla_miss_1m.sleep 2021-03-24T09:26:00+00:00 
[queued]> on host 602b52b8a84d
   Running <TaskInstance: test_sla_miss_1m.sleep 2021-03-24T09:27:00+00:00 
[queued]> on host 602b52b8a84d
   [2021-03-24 09:28:18,007] {scheduler_job.py:941} INFO - 2 tasks up for 
execution:
        <TaskInstance: test_sla_miss_1m.here 2021-03-24 09:26:00+00:00 
[scheduled]>
        <TaskInstance: test_sla_miss_1m.here 2021-03-24 09:27:00+00:00 
[scheduled]>
   [2021-03-24 09:28:18,013] {scheduler_job.py:970} INFO - Figuring out tasks 
to run in Pool(name=default_pool) with 128 open slots and 2 task instances 
ready to be queued
   [2021-03-24 09:28:18,013] {scheduler_job.py:998} INFO - DAG test_sla_miss_1m 
has 0/16 running and queued tasks
   [2021-03-24 09:28:18,013] {scheduler_job.py:998} INFO - DAG test_sla_miss_1m 
has 1/16 running and queued tasks
   [2021-03-24 09:28:18,013] {scheduler_job.py:1063} INFO - Setting the 
following tasks to queued state:
        <TaskInstance: test_sla_miss_1m.here 2021-03-24 09:26:00+00:00 
[scheduled]>
        <TaskInstance: test_sla_miss_1m.here 2021-03-24 09:27:00+00:00 
[scheduled]>
   [2021-03-24 09:28:18,017] {scheduler_job.py:1105} INFO - Sending 
TaskInstanceKey(dag_id='test_sla_miss_1m', task_id='here', 
execution_date=datetime.datetime(2021, 3, 24, 9, 26, tzinfo=Timezone('UTC')), 
try_number=1) to executor with priority 1 and queue default
   [2021-03-24 09:28:18,017] {base_executor.py:82} INFO - Adding to queue: 
['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'here', 
'2021-03-24T09:26:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 
'/opt/airflow/dags/test_sla_miss_1m.py']
   [2021-03-24 09:28:18,017] {scheduler_job.py:1105} INFO - Sending 
TaskInstanceKey(dag_id='test_sla_miss_1m', task_id='here', 
execution_date=datetime.datetime(2021, 3, 24, 9, 27, tzinfo=Timezone('UTC')), 
try_number=1) to executor with priority 1 and queue default
   [2021-03-24 09:28:18,017] {base_executor.py:82} INFO - Adding to queue: 
['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'here', 
'2021-03-24T09:27:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 
'/opt/airflow/dags/test_sla_miss_1m.py']
   [2021-03-24 09:28:18,022] {local_executor.py:81} INFO - QueuedLocalWorker 
running ['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'here', 
'2021-03-24T09:26:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 
'/opt/airflow/dags/test_sla_miss_1m.py']
   [2021-03-24 09:28:18,025] {local_executor.py:81} INFO - QueuedLocalWorker 
running ['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'here', 
'2021-03-24T09:27:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 
'/opt/airflow/dags/test_sla_miss_1m.py']
   [2021-03-24 09:28:18,026] {scheduler_job.py:1199} INFO - Executor reports 
execution of test_sla_miss_1m.sleep execution_date=2021-03-24 09:26:00+00:00 
exited with status success for try_number 1
   [2021-03-24 09:28:18,026] {scheduler_job.py:1199} INFO - Executor reports 
execution of test_sla_miss_1m.sleep execution_date=2021-03-24 09:27:00+00:00 
exited with status success for try_number 1
   [2021-03-24 09:28:18,125] {dagbag.py:448} INFO - Filling up the DagBag from 
/opt/airflow/dags/test_sla_miss_1m.py
   [2021-03-24 09:28:18,147] {dagbag.py:448} INFO - Filling up the DagBag from 
/opt/airflow/dags/test_sla_miss_1m.py
   Running <TaskInstance: test_sla_miss_1m.here 2021-03-24T09:27:00+00:00 
[queued]> on host 602b52b8a84d
   Running <TaskInstance: test_sla_miss_1m.here 2021-03-24T09:26:00+00:00 
[queued]> on host 602b52b8a84d
   [2021-03-24 09:28:19,119] {dagrun.py:445} INFO - Marking run <DagRun 
test_sla_miss_1m @ 2021-03-24 09:27:00+00:00: 
scheduled__2021-03-24T09:27:00+00:00, externally triggered: False> successful
   [2021-03-24 09:28:19,126] {dagrun.py:445} INFO - Marking run <DagRun 
test_sla_miss_1m @ 2021-03-24 09:26:00+00:00: 
scheduled__2021-03-24T09:26:00+00:00, externally triggered: False> successful
   [2021-03-24 09:28:19,139] {scheduler_job.py:1199} INFO - Executor reports 
execution of test_sla_miss_1m.here execution_date=2021-03-24 09:27:00+00:00 
exited with status success for try_number 1
   [2021-03-24 09:28:19,139] {scheduler_job.py:1199} INFO - Executor reports 
execution of test_sla_miss_1m.here execution_date=2021-03-24 09:26:00+00:00 
exited with status success for try_number 1
   [2021-03-24 09:30:50,138] {scheduler_job.py:1834} INFO - Resetting orphaned 
tasks for active dag runs
   [2021-03-24 09:35:50,185] {scheduler_job.py:1834} INFO - Resetting orphaned 
tasks for active dag runs
   [2021-03-24 09:40:50,264] {scheduler_job.py:1834} INFO - Resetting orphaned 
tasks for active dag runs
   ```
   
   **What you expected to happen**:
   The callback function passed via the `sla_miss_callback` parameter is executed.
   
   **How to reproduce it**:
   1. Create a simple DAG with an SLA
   2. Create a callback function that logs information about the SLA miss
   3. Unpause the DAG
   
   Here is the code I used:
   ```python
   import logging
   
   from airflow.models import DAG
   from datetime import datetime, timedelta
   from airflow.operators.bash import BashOperator
   
   
    def ding_on_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
        logging.info("************************************* SLA missed! ***************************************")
        msg = (f"DAGs: {dag}\n"
               f"Tasks: {task_list}\n"
               f"Blocking Tasks: {blocking_task_list}\n"
               f"SLAs: {slas}\n"
               f"Blocking Task Instances: {blocking_tis}")
        logging.info(msg)
        return msg
   
   
   ARGS = {
       'owner': 'airflow',
       'depends_on_past': False,
       'start_date': datetime(2021, 3, 23),
       'email': ['...'],
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 0,
       'sla': timedelta(seconds=5),
       'retry_delay': timedelta(seconds=5),
   }
   
   DAG_OBJ = DAG(
       dag_id='test_sla_miss_1m',
       catchup=False,
       default_args=ARGS,
       schedule_interval='* * * * *',
       sla_miss_callback=ding_on_sla_miss,
   )
   
   sleep = BashOperator(
       task_id='sleep',
       dag=DAG_OBJ,
       bash_command='echo Start: $(date); sleep 10; echo End: $(date)',
   )
   
   here = BashOperator(
       task_id='here',
       dag=DAG_OBJ,
       bash_command='echo "wake up"',
   )
   
   sleep >> here
   ```
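   
   As a quick sanity check that the callback itself is well-formed (independent of whether the scheduler ever invokes it), it can be called directly with placeholder arguments. The values below are hypothetical stand-ins, not the real DAG/SlaMiss/TaskInstance objects Airflow would pass:
   ```python
   import logging

   logging.basicConfig(level=logging.INFO)


   def ding_on_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
       logging.info("SLA missed!")
       msg = (f"DAGs: {dag}\n"
              f"Tasks: {task_list}\n"
              f"Blocking Tasks: {blocking_task_list}\n"
              f"SLAs: {slas}\n"
              f"Blocking Task Instances: {blocking_tis}")
       logging.info(msg)
       return msg


   # Placeholder values standing in for the objects Airflow would supply.
   result = ding_on_sla_miss(
       "test_sla_miss_1m", "sleep", "sleep", "<SlaMiss rows>", "<task instances>"
   )
   ```
   The callback runs fine when invoked by hand, which is why I suspect the scheduler is simply never calling it.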
   
   
   **Anything else we need to know**: -

