imamdigmi opened a new issue #14976:
URL: https://github.com/apache/airflow/issues/14976
**Apache Airflow version**: 2.0.1
**Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
**Environment**: Docker (with Compose)
- **Cloud provider or hardware configuration**: -
- **OS** (e.g. from /etc/os-release): -
- **Kernel** (e.g. `uname -a`): Darwin Kernel Version 20.2.0: Wed Dec 2
20:39:59 PST 2020; root:xnu-7195.60.75~1/RELEASE_X86_64 x86_64
- **Install tools**: Pip
- **Others**: -
**What happened**:
I'm trying to set up a notification for SLA misses, but it doesn't work. I can
see the list of SLA misses in the UI and the email alerts are sent successfully,
but the callback function I added for SLA misses is never executed, and the
scheduler logs produce no error information.
Here are the scheduler logs:
```
[2021-03-24 09:28:06,905] {scheduler_job.py:941} INFO - 1 tasks up for
execution:
<TaskInstance: test_sla_miss_1m.sleep 2021-03-24 09:26:00+00:00
[scheduled]>
[2021-03-24 09:28:06,909] {scheduler_job.py:970} INFO - Figuring out tasks
to run in Pool(name=default_pool) with 128 open slots and 1 task instances
ready to be queued
[2021-03-24 09:28:06,909] {scheduler_job.py:998} INFO - DAG test_sla_miss_1m
has 0/16 running and queued tasks
[2021-03-24 09:28:06,910] {scheduler_job.py:1063} INFO - Setting the
following tasks to queued state:
<TaskInstance: test_sla_miss_1m.sleep 2021-03-24 09:26:00+00:00
[scheduled]>
[2021-03-24 09:28:06,914] {scheduler_job.py:1105} INFO - Sending
TaskInstanceKey(dag_id='test_sla_miss_1m', task_id='sleep',
execution_date=datetime.datetime(2021, 3, 24, 9, 26, tzinfo=Timezone('UTC')),
try_number=1) to executor with priority 2 and queue default
[2021-03-24 09:28:06,915] {base_executor.py:82} INFO - Adding to queue:
['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'sleep',
'2021-03-24T09:26:00+00:00', '--local', '--pool', 'default_pool', '--subdir',
'/opt/airflow/dags/test_sla_miss_1m.py']
[2021-03-24 09:28:06,932] {local_executor.py:81} INFO - QueuedLocalWorker
running ['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'sleep',
'2021-03-24T09:26:00+00:00', '--local', '--pool', 'default_pool', '--subdir',
'/opt/airflow/dags/test_sla_miss_1m.py']
[2021-03-24 09:28:07,016] {scheduler_job.py:941} INFO - 1 tasks up for
execution:
<TaskInstance: test_sla_miss_1m.sleep 2021-03-24 09:27:00+00:00
[scheduled]>
[2021-03-24 09:28:07,019] {scheduler_job.py:970} INFO - Figuring out tasks
to run in Pool(name=default_pool) with 127 open slots and 1 task instances
ready to be queued
[2021-03-24 09:28:07,019] {scheduler_job.py:998} INFO - DAG test_sla_miss_1m
has 1/16 running and queued tasks
[2021-03-24 09:28:07,020] {scheduler_job.py:1063} INFO - Setting the
following tasks to queued state:
<TaskInstance: test_sla_miss_1m.sleep 2021-03-24 09:27:00+00:00
[scheduled]>
[2021-03-24 09:28:07,024] {scheduler_job.py:1105} INFO - Sending
TaskInstanceKey(dag_id='test_sla_miss_1m', task_id='sleep',
execution_date=datetime.datetime(2021, 3, 24, 9, 27, tzinfo=Timezone('UTC')),
try_number=1) to executor with priority 2 and queue default
[2021-03-24 09:28:07,024] {base_executor.py:82} INFO - Adding to queue:
['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'sleep',
'2021-03-24T09:27:00+00:00', '--local', '--pool', 'default_pool', '--subdir',
'/opt/airflow/dags/test_sla_miss_1m.py']
[2021-03-24 09:28:07,034] {local_executor.py:81} INFO - QueuedLocalWorker
running ['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'sleep',
'2021-03-24T09:27:00+00:00', '--local', '--pool', 'default_pool', '--subdir',
'/opt/airflow/dags/test_sla_miss_1m.py']
[2021-03-24 09:28:07,068] {dagbag.py:448} INFO - Filling up the DagBag from
/opt/airflow/dags/test_sla_miss_1m.py
[2021-03-24 09:28:07,171] {dagbag.py:448} INFO - Filling up the DagBag from
/opt/airflow/dags/test_sla_miss_1m.py
Running <TaskInstance: test_sla_miss_1m.sleep 2021-03-24T09:26:00+00:00
[queued]> on host 602b52b8a84d
Running <TaskInstance: test_sla_miss_1m.sleep 2021-03-24T09:27:00+00:00
[queued]> on host 602b52b8a84d
[2021-03-24 09:28:18,007] {scheduler_job.py:941} INFO - 2 tasks up for
execution:
<TaskInstance: test_sla_miss_1m.here 2021-03-24 09:26:00+00:00
[scheduled]>
<TaskInstance: test_sla_miss_1m.here 2021-03-24 09:27:00+00:00
[scheduled]>
[2021-03-24 09:28:18,013] {scheduler_job.py:970} INFO - Figuring out tasks
to run in Pool(name=default_pool) with 128 open slots and 2 task instances
ready to be queued
[2021-03-24 09:28:18,013] {scheduler_job.py:998} INFO - DAG test_sla_miss_1m
has 0/16 running and queued tasks
[2021-03-24 09:28:18,013] {scheduler_job.py:998} INFO - DAG test_sla_miss_1m
has 1/16 running and queued tasks
[2021-03-24 09:28:18,013] {scheduler_job.py:1063} INFO - Setting the
following tasks to queued state:
<TaskInstance: test_sla_miss_1m.here 2021-03-24 09:26:00+00:00
[scheduled]>
<TaskInstance: test_sla_miss_1m.here 2021-03-24 09:27:00+00:00
[scheduled]>
[2021-03-24 09:28:18,017] {scheduler_job.py:1105} INFO - Sending
TaskInstanceKey(dag_id='test_sla_miss_1m', task_id='here',
execution_date=datetime.datetime(2021, 3, 24, 9, 26, tzinfo=Timezone('UTC')),
try_number=1) to executor with priority 1 and queue default
[2021-03-24 09:28:18,017] {base_executor.py:82} INFO - Adding to queue:
['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'here',
'2021-03-24T09:26:00+00:00', '--local', '--pool', 'default_pool', '--subdir',
'/opt/airflow/dags/test_sla_miss_1m.py']
[2021-03-24 09:28:18,017] {scheduler_job.py:1105} INFO - Sending
TaskInstanceKey(dag_id='test_sla_miss_1m', task_id='here',
execution_date=datetime.datetime(2021, 3, 24, 9, 27, tzinfo=Timezone('UTC')),
try_number=1) to executor with priority 1 and queue default
[2021-03-24 09:28:18,017] {base_executor.py:82} INFO - Adding to queue:
['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'here',
'2021-03-24T09:27:00+00:00', '--local', '--pool', 'default_pool', '--subdir',
'/opt/airflow/dags/test_sla_miss_1m.py']
[2021-03-24 09:28:18,022] {local_executor.py:81} INFO - QueuedLocalWorker
running ['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'here',
'2021-03-24T09:26:00+00:00', '--local', '--pool', 'default_pool', '--subdir',
'/opt/airflow/dags/test_sla_miss_1m.py']
[2021-03-24 09:28:18,025] {local_executor.py:81} INFO - QueuedLocalWorker
running ['airflow', 'tasks', 'run', 'test_sla_miss_1m', 'here',
'2021-03-24T09:27:00+00:00', '--local', '--pool', 'default_pool', '--subdir',
'/opt/airflow/dags/test_sla_miss_1m.py']
[2021-03-24 09:28:18,026] {scheduler_job.py:1199} INFO - Executor reports
execution of test_sla_miss_1m.sleep execution_date=2021-03-24 09:26:00+00:00
exited with status success for try_number 1
[2021-03-24 09:28:18,026] {scheduler_job.py:1199} INFO - Executor reports
execution of test_sla_miss_1m.sleep execution_date=2021-03-24 09:27:00+00:00
exited with status success for try_number 1
[2021-03-24 09:28:18,125] {dagbag.py:448} INFO - Filling up the DagBag from
/opt/airflow/dags/test_sla_miss_1m.py
[2021-03-24 09:28:18,147] {dagbag.py:448} INFO - Filling up the DagBag from
/opt/airflow/dags/test_sla_miss_1m.py
Running <TaskInstance: test_sla_miss_1m.here 2021-03-24T09:27:00+00:00
[queued]> on host 602b52b8a84d
Running <TaskInstance: test_sla_miss_1m.here 2021-03-24T09:26:00+00:00
[queued]> on host 602b52b8a84d
[2021-03-24 09:28:19,119] {dagrun.py:445} INFO - Marking run <DagRun
test_sla_miss_1m @ 2021-03-24 09:27:00+00:00:
scheduled__2021-03-24T09:27:00+00:00, externally triggered: False> successful
[2021-03-24 09:28:19,126] {dagrun.py:445} INFO - Marking run <DagRun
test_sla_miss_1m @ 2021-03-24 09:26:00+00:00:
scheduled__2021-03-24T09:26:00+00:00, externally triggered: False> successful
[2021-03-24 09:28:19,139] {scheduler_job.py:1199} INFO - Executor reports
execution of test_sla_miss_1m.here execution_date=2021-03-24 09:27:00+00:00
exited with status success for try_number 1
[2021-03-24 09:28:19,139] {scheduler_job.py:1199} INFO - Executor reports
execution of test_sla_miss_1m.here execution_date=2021-03-24 09:26:00+00:00
exited with status success for try_number 1
[2021-03-24 09:30:50,138] {scheduler_job.py:1834} INFO - Resetting orphaned
tasks for active dag runs
[2021-03-24 09:35:50,185] {scheduler_job.py:1834} INFO - Resetting orphaned
tasks for active dag runs
[2021-03-24 09:40:50,264] {scheduler_job.py:1834} INFO - Resetting orphaned
tasks for active dag runs
```
**What you expected to happen**:
The callback function passed to the `sla_miss_callback` parameter is executed.
**How to reproduce it**:
1. Create a simple DAG with an SLA
2. Create a callback function that logs information about the SLA miss
3. Unpause the DAG
Here is my code:
```python
import logging
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.bash import BashOperator


def ding_on_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    logging.info("************************************* SLA missed! ***************************************")
    msg = (f"DAGs: {dag}"
           f"Tasks: {task_list}"
           f"Blocking Tasks: {blocking_task_list}"
           f"SLAs: {slas}"
           f"Blocking Task Instances: {blocking_tis}")
    logging.info(msg)
    return msg


ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 3, 23),
    'email': ['...'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'sla': timedelta(seconds=5),
    'retry_delay': timedelta(seconds=5),
}

DAG_OBJ = DAG(
    dag_id='test_sla_miss_1m',
    catchup=False,
    default_args=ARGS,
    schedule_interval='* * * * *',
    sla_miss_callback=ding_on_sla_miss,
)

sleep = BashOperator(
    task_id='sleep',
    dag=DAG_OBJ,
    bash_command='echo Start: $(date); sleep 10; echo End: $(date)',
)

here = BashOperator(
    task_id='here',
    dag=DAG_OBJ,
    bash_command='echo "wake up"',
)

sleep >> here
```
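For what it's worth, the callback function itself works fine when invoked directly, so the problem seems to be on the scheduler side rather than in the function. Here is a quick sanity check I ran, with placeholder arguments standing in for the values the scheduler would normally pass (the real arguments are DAG/SlaMiss/TaskInstance objects, not strings and lists):

```python
import logging
from datetime import timedelta


# Same callback as in the DAG file above
def ding_on_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    logging.info("SLA missed!")
    msg = (f"DAGs: {dag}"
           f"Tasks: {task_list}"
           f"Blocking Tasks: {blocking_task_list}"
           f"SLAs: {slas}"
           f"Blocking Task Instances: {blocking_tis}")
    logging.info(msg)
    return msg


# Placeholder values only -- the scheduler supplies the real objects
result = ding_on_sla_miss(
    dag="test_sla_miss_1m",
    task_list="sleep",
    blocking_task_list="",
    slas=[timedelta(seconds=5)],
    blocking_tis=[],
)
print(result)
```

So the function returns the expected message when called by hand; it just never gets invoked by the scheduler.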
**Anything else we need to know**: -
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]