jpradass opened a new issue #19767:
URL: https://github.com/apache/airflow/issues/19767
### Apache Airflow version
2.0.1
### Operating System
RedHat ubi8/ubi-minimal
### Versions of Apache Airflow Providers
apache-airflow-providers-amazon==1.1.0
apache-airflow-providers-celery==1.0.1
apache-airflow-providers-ftp==1.0.1
apache-airflow-providers-http==1.1.0
apache-airflow-providers-imap==1.0.1
apache-airflow-providers-postgres==1.0.1
apache-airflow-providers-redis==1.0.1
apache-airflow-providers-sqlite==1.0.1
### Deployment
Other Docker-based deployment
### Deployment details
I have set up PostgreSQL as Airflow's metadata database and Redis as the
message broker, with a custom backend whose consumers handle messages sent from
Airflow. Airflow communicates with this backend when events happen, such as when
jobs are running or have finished.
### What happened
I've noticed that the scheduler isn't sending all events when they happen.
When creating the DAGs, I defined some custom functions as callbacks, so when a
job finishes, the scheduler should create an event and put it into Redis to
notify the backend I have built.
Reviewing the logs, I've seen that callbacks are usually invoked in order, but
if one of them arrives out of order, the scheduler skips a notification even
though the job has finished successfully. The relevant scheduler logs are below.
```
{ "kind": "LOG", "level": "DEBUG", "message": "[2021-11-22 13:20:02,228] scheduler_job.py:557 - Processing Callback Request: {'full_filepath': '/home/user/airflow/dags/my.namespace/my.namespace_514.py', 'msg': 'success', 'dag_id': 'my.namespace_514', 'execution_date': datetime.datetime(2021, 11, 22, 13, 15, tzinfo=Timezone('UTC')), 'is_failure_callback': False}" }
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:20:02,302] logging_mixin.py:104 - { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:20:02,302] dag.py:868 - Executing dag callback function: <function on_success_callback_job at 0x7f869fb93670>" }" }
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:20:02,314] logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": "[2021-11-22 13:20:02,313] plugins_manager.py:264 - Plugins are already loaded. Skipping." }" }
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:20:02,314] logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": "[2021-11-22 13:20:02,314] plugins_manager.py:414 - Integrate DAG plugins" }" }
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:20:02,327] logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": "[2021-11-22 13:20:02,327] taskinstance.py:1742 - Updating task params ({'namespace': 'my.namespace', 'runtimeName': 'airflow-2-0', 'job': '60e5f71edd72863533db23fb', 'runtimeType': 'AIRFLOW', 'version': 1}) with DagRun.conf ({'executedBy': '60e5f71edd72863533db23fb'})" }" }
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:20:02,327] logging_mixin.py:104 - { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:20:02,327] functions.py:13 - on_success_callback_job - namespace: my.namespace, runtimeJobExecId: 944102, runId: scheduled__2021-11-22T13:15:00+00:00" }" }
```
The next run to be notified is `scheduled__2021-11-22T13:25:00+00:00`, but it
should have been `scheduled__2021-11-22T13:20:00+00:00`, so the latter ends up
out of order.
```
{ "kind": "LOG", "level": "DEBUG", "message": "[2021-11-22 13:30:04,522] scheduler_job.py:557 - Processing Callback Request: {'full_filepath': '/home/user/airflow/dags/my.namespace/my.namespace_514.py', 'msg': 'success', 'dag_id': 'my.namespace_514', 'execution_date': datetime.datetime(2021, 11, 22, 13, 25, tzinfo=Timezone('UTC')), 'is_failure_callback': False}" }
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:04,636] logging_mixin.py:104 - { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:04,635] dag.py:868 - Executing dag callback function: <function on_success_callback_job at 0x7fe01d0c0a60>" }" }
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:04,653] logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": "[2021-11-22 13:30:04,653] plugins_manager.py:264 - Plugins are already loaded. Skipping." }" }
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:04,654] logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": "[2021-11-22 13:30:04,654] plugins_manager.py:414 - Integrate DAG plugins" }" }
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:04,712] logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": "[2021-11-22 13:30:04,711] taskinstance.py:1742 - Updating task params ({'namespace': 'my.namespace', 'runtimeName': 'airflow-2-0', 'job': '60e5f71edd72863533db23fb', 'runtimeType': 'AIRFLOW', 'version': 1}) with DagRun.conf ({'executedBy': '60e5f71edd72863533db23fb'})" }" }
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:04,712] logging_mixin.py:104 - { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:04,712] functions.py:13 - on_success_callback_job - namespace: my.namespace, runtimeJobExecId: 944168, runId: scheduled__2021-11-22T13:25:00+00:00" }" }
```
Then it executes the callback for `scheduled__2021-11-22T13:20:00+00:00`, and
the next one, which should be `scheduled__2021-11-22T13:30:00+00:00`, is skipped.
```
{ "kind": "LOG", "level": "DEBUG", "message": "[2021-11-22 13:30:05,485] scheduler_job.py:557 - Processing Callback Request: {'full_filepath': '/home/user/airflow/dags/my.namespace/my.namespace_514.py', 'msg': 'success', 'dag_id': 'my.namespace_514', 'execution_date': datetime.datetime(2021, 11, 22, 13, 20, tzinfo=Timezone('UTC')), 'is_failure_callback': False}" }
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:05,575] logging_mixin.py:104 - { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:05,575] dag.py:868 - Executing dag callback function: <function on_success_callback_job at 0x7f869fa01ee0>" }" }
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:05,583] logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": "[2021-11-22 13:30:05,583] plugins_manager.py:264 - Plugins are already loaded. Skipping." }" }
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:05,584] logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": "[2021-11-22 13:30:05,584] plugins_manager.py:414 - Integrate DAG plugins" }" }
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:05,599] logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": "[2021-11-22 13:30:05,599] taskinstance.py:1742 - Updating task params ({'namespace': 'my.namespace', 'runtimeName': 'airflow-2-0', 'job': '60e5f71edd72863533db23fb', 'runtimeType': 'AIRFLOW', 'version': 1}) with DagRun.conf ({'executedBy': '60e5f71edd72863533db23fb'})" }" }
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:05,600] logging_mixin.py:104 - { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:05,600] functions.py:13 - on_success_callback_job - namespace: my.namespace, runtimeJobExecId: 944137, runId: scheduled__2021-11-22T13:20:00+00:00" }" }
```
At this point the callback for `scheduled__2021-11-22T13:30:00+00:00` should be
executed but, as mentioned above, it is skipped, so the next callback the
scheduler invokes is for `scheduled__2021-11-22T13:35:00+00:00`.
```
{ "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:40:02,419] logging_mixin.py:104 - { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:40:02,419] functions.py:13 - on_success_callback_job - namespace: gl.ether.devops, runtimeJobExecId: 944218, runId: scheduled__2021-11-22T13:35:00+00:00" }" }
```
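To check a consumer's event stream for this pattern, a small stdlib-only Python sketch like the one below can flag missing and out-of-order notifications. It assumes run IDs follow the `scheduled__<ISO-8601 timestamp>` format seen in the logs and that the DAG has exactly one run per 5-minute slot (no pauses, manual runs, or backfills). `parse_run_id`, `find_missing_runs` and `find_out_of_order` are hypothetical helpers, not part of Airflow.

```python
from datetime import datetime, timedelta
from typing import Iterable, List, Optional

# Assumption: scheduled run IDs look like "scheduled__2021-11-22T13:15:00+00:00",
# as in the logs above. Run IDs with other prefixes are rejected.
PREFIX = "scheduled__"


def parse_run_id(run_id: str) -> datetime:
    """Extract the (timezone-aware) logical date from a scheduled run ID."""
    if not run_id.startswith(PREFIX):
        raise ValueError(f"unexpected run_id: {run_id!r}")
    return datetime.fromisoformat(run_id[len(PREFIX):])


def find_missing_runs(received: Iterable[str], interval: timedelta) -> List[str]:
    """Return run IDs absent between the earliest and latest received ones.

    Assumes the DAG produces exactly one run per `interval` (no pauses).
    """
    seen = {parse_run_id(r) for r in received}
    if not seen:
        return []
    missing = []
    current, last = min(seen), max(seen)
    while current <= last:
        if current not in seen:
            missing.append(PREFIX + current.isoformat())
        current += interval
    return missing


def find_out_of_order(received: Iterable[str]) -> List[str]:
    """Return run IDs that arrived after a later run had already been notified."""
    late = []
    latest: Optional[datetime] = None
    for run_id in received:
        logical_date = parse_run_id(run_id)
        if latest is not None and logical_date < latest:
            late.append(run_id)
        else:
            latest = logical_date
    return late


if __name__ == "__main__":
    # Order in which on_success_callback_job fired in the logs above.
    received = [
        "scheduled__2021-11-22T13:15:00+00:00",
        "scheduled__2021-11-22T13:25:00+00:00",
        "scheduled__2021-11-22T13:20:00+00:00",
        "scheduled__2021-11-22T13:35:00+00:00",
    ]
    print(find_missing_runs(received, timedelta(minutes=5)))
    # ['scheduled__2021-11-22T13:30:00+00:00']
    print(find_out_of_order(received))
    # ['scheduled__2021-11-22T13:20:00+00:00']
```

Fed the run IDs in the order the callbacks fired above, the sketch reports the 13:30 run as never notified and the 13:20 run as late, which matches the behaviour described in this issue.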
### What you expected to happen
I expect that, even if a notification arrives out of order, the scheduler does
not skip any callback, so that my backend can keep its data up to date and
consistent.
### How to reproduce
Create a DAG with a single dummy task scheduled every 5 minutes and a DAG-level
success callback. Occasionally, if there is some delay in invoking callbacks,
the scheduler may skip one of the defined callbacks.
### Anything else
This problem usually happens once a day.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)