jpradass opened a new issue #19767:
URL: https://github.com/apache/airflow/issues/19767


   ### Apache Airflow version
   
   2.0.1
   
   ### Operating System
   
   RedHat ubi8/ubi-minimal
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==1.1.0
   apache-airflow-providers-celery==1.0.1
   apache-airflow-providers-ftp==1.0.1
   apache-airflow-providers-http==1.1.0
   apache-airflow-providers-imap==1.0.1
   apache-airflow-providers-postgres==1.0.1
   apache-airflow-providers-redis==1.0.1
   apache-airflow-providers-sqlite==1.0.1
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   I use PostgreSQL as the database where Airflow keeps its data and Redis as 
the message broker, together with a custom backend whose consumers handle the 
messages sent from Airflow. Airflow notifies this backend when events happen, 
for example when jobs are running or have finished. 
   
   ### What happened
   
   I've noticed that the scheduler doesn't send every event when it happens. 
I define custom callback functions when a DAG is created, so when a job 
finishes the scheduler should emit an event and put it on Redis to notify the 
backend I have built.
   Reviewing the logs, I've seen that callbacks are usually invoked in order, 
but if one of them is invoked out of order, the scheduler skips one 
notification even though the job finished successfully. The scheduler logs 
follow. 
   ```
   { "kind": "LOG", "level": "DEBUG", "message": "[2021-11-22 13:20:02,228] 
scheduler_job.py:557 - Processing Callback Request: {'full_filepath': 
'/home/user/airflow/dags/my.namespace/my.namespace_514.py', 'msg': 'success', 
'dag_id': 'my.namespace_514', 'execution_date': datetime.datetime(2021, 11, 22, 
13, 15, tzinfo=Timezone('UTC')), 'is_failure_callback': False}" }
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:20:02,302] 
logging_mixin.py:104 - { "kind": "LOG", "level": "INFO", "message": 
"[2021-11-22 13:20:02,302] dag.py:868 - Executing dag callback function: 
<function on_success_callback_job at 0x7f869fb93670>" }" }
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:20:02,314] 
logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": 
"[2021-11-22 13:20:02,313] plugins_manager.py:264 - Plugins are already loaded. 
Skipping." }" }
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:20:02,314] 
logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": 
"[2021-11-22 13:20:02,314] plugins_manager.py:414 - Integrate DAG plugins" }" }
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:20:02,327] 
logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": 
"[2021-11-22 13:20:02,327] taskinstance.py:1742 - Updating task params 
({'namespace': 'my.namespace', 'runtimeName': 'airflow-2-0', 'job': 
'60e5f71edd72863533db23fb', 'runtimeType': 'AIRFLOW', 'version': 1}) with 
DagRun.conf ({'executedBy': '60e5f71edd72863533db23fb'})" }" }
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:20:02,327] 
logging_mixin.py:104 - { "kind": "LOG", "level": "INFO", "message": 
"[2021-11-22 13:20:02,327] functions.py:13 - on_success_callback_job - 
namespace: my.namespace, runtimeJobExecId: 944102, runId: 
scheduled__2021-11-22T13:15:00+00:00" }" }
   ```
   The next run to be notified is scheduled__2021-11-22T13:25:00+00:00, but it 
should have been scheduled__2021-11-22T13:20:00+00:00, so the 13:20 run ends up 
out of order. 
   ```
   { "kind": "LOG", "level": "DEBUG", "message": "[2021-11-22 13:30:04,522] 
scheduler_job.py:557 - Processing Callback Request: {'full_filepath': 
'/home/user/airflow/dags/my.namespace/my.namespace_514.py', 'msg': 'success', 
'dag_id': 'my.namespace_514', 'execution_date': datetime.datetime(2021, 11, 22, 
13, 25, tzinfo=Timezone('UTC')), 'is_failure_callback': False}" }
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:04,636] 
logging_mixin.py:104 - { "kind": "LOG", "level": "INFO", "message": 
"[2021-11-22 13:30:04,635] dag.py:868 - Executing dag callback function: 
<function on_success_callback_job at 0x7fe01d0c0a60>" }" }
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:04,653] 
logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": 
"[2021-11-22 13:30:04,653] plugins_manager.py:264 - Plugins are already loaded. 
Skipping." }" }
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:04,654] 
logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": 
"[2021-11-22 13:30:04,654] plugins_manager.py:414 - Integrate DAG plugins" }" }
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:04,712] 
logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": 
"[2021-11-22 13:30:04,711] taskinstance.py:1742 - Updating task params 
({'namespace': 'my.namespace', 'runtimeName': 'airflow-2-0', 'job': 
'60e5f71edd72863533db23fb', 'runtimeType': 'AIRFLOW', 'version': 1}) with 
DagRun.conf ({'executedBy': '60e5f71edd72863533db23fb'})" }" }
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:04,712] 
logging_mixin.py:104 - { "kind": "LOG", "level": "INFO", "message": 
"[2021-11-22 13:30:04,712] functions.py:13 - on_success_callback_job - 
namespace: my.namespace, runtimeJobExecId: 944168, runId: 
scheduled__2021-11-22T13:25:00+00:00" }" }
   ```
   Now the scheduler executes the callback for 
scheduled__2021-11-22T13:20:00+00:00, and the next one, which should be 
scheduled__2021-11-22T13:30:00+00:00, is skipped. 
   ```
   { "kind": "LOG", "level": "DEBUG", "message": "[2021-11-22 13:30:05,485] 
scheduler_job.py:557 - Processing Callback Request: {'full_filepath': 
'/home/user/airflow/dags/my.namespace/my.namespace_514.py', 'msg': 'success', 
'dag_id': 'my.namespace_514', 'execution_date': datetime.datetime(2021, 11, 22, 
13, 20, tzinfo=Timezone('UTC')), 'is_failure_callback': False}" }
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:05,575] 
logging_mixin.py:104 - { "kind": "LOG", "level": "INFO", "message": 
"[2021-11-22 13:30:05,575] dag.py:868 - Executing dag callback function: 
<function on_success_callback_job at 0x7f869fa01ee0>" }" }
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:05,583] 
logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": 
"[2021-11-22 13:30:05,583] plugins_manager.py:264 - Plugins are already loaded. 
Skipping." }" }
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:05,584] 
logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": 
"[2021-11-22 13:30:05,584] plugins_manager.py:414 - Integrate DAG plugins" }" }
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:05,599] 
logging_mixin.py:104 - { "kind": "LOG", "level": "DEBUG", "message": 
"[2021-11-22 13:30:05,599] taskinstance.py:1742 - Updating task params 
({'namespace': 'my.namespace', 'runtimeName': 'airflow-2-0', 'job': 
'60e5f71edd72863533db23fb', 'runtimeType': 'AIRFLOW', 'version': 1}) with 
DagRun.conf ({'executedBy': '60e5f71edd72863533db23fb'})" }" }
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:30:05,600] 
logging_mixin.py:104 - { "kind": "LOG", "level": "INFO", "message": 
"[2021-11-22 13:30:05,600] functions.py:13 - on_success_callback_job - 
namespace: my.namespace, runtimeJobExecId: 944137, runId: 
scheduled__2021-11-22T13:20:00+00:00" }" }
   ```
   The callback for scheduled__2021-11-22T13:30:00+00:00 should run now but, 
as mentioned above, it is skipped, so the next callback the scheduler invokes 
is the one for scheduled__2021-11-22T13:35:00+00:00.
   ```
   { "kind": "LOG", "level": "INFO", "message": "[2021-11-22 13:40:02,419] 
logging_mixin.py:104 - { "kind": "LOG", "level": "INFO", "message": 
"[2021-11-22 13:40:02,419] functions.py:13 - on_success_callback_job - 
namespace: gl.ether.devops, runtimeJobExecId: 944218, runId: 
scheduled__2021-11-22T13:35:00+00:00" }" }
   
   ```
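   The gap can be confirmed from the runId values in the excerpts above. 
This is only a minimal sketch using the standard library, not code from my 
backend: the helper names `parse_run_id` and `find_missing_runs` are 
hypothetical, and it assumes run IDs of the form `scheduled__<ISO timestamp>` 
and a fixed 5-minute schedule.

```python
from datetime import datetime, timedelta

PREFIX = "scheduled__"


def parse_run_id(run_id):
    """Turn 'scheduled__2021-11-22T13:15:00+00:00' into an aware datetime."""
    if not run_id.startswith(PREFIX):
        raise ValueError(f"unexpected run id: {run_id!r}")
    return datetime.fromisoformat(run_id[len(PREFIX):])


def find_missing_runs(run_ids, interval=timedelta(minutes=5)):
    """Return the expected run times that never produced a notification.

    Arrival order does not matter: the check walks the schedule from the
    earliest to the latest run seen and reports every slot that is absent.
    """
    seen = {parse_run_id(r) for r in run_ids}
    if not seen:
        return []
    missing = []
    current, last = min(seen), max(seen)
    while current <= last:
        if current not in seen:
            missing.append(current)
        current += interval
    return missing


# Run IDs in the order the callbacks were logged above.
received = [
    "scheduled__2021-11-22T13:15:00+00:00",
    "scheduled__2021-11-22T13:25:00+00:00",
    "scheduled__2021-11-22T13:20:00+00:00",
    "scheduled__2021-11-22T13:35:00+00:00",
]

for run_time in find_missing_runs(received):
    print("no callback for", PREFIX + run_time.isoformat())
```

   With the four run IDs from the logs, the only slot reported is 13:30; 
the out-of-order 13:20 run is not reported because its callback did arrive.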
   
   
   ### What you expected to happen
   
   I expect that even if a notification is delivered out of order, the 
scheduler does not skip any of them, so my backend can keep its data up to 
date and uncorrupted. 
   
   ### How to reproduce
   
   Create a DAG with a dummy job scheduled every 5 minutes. Sometimes, 
possibly when there is a delay in invoking callbacks, the scheduler fails to 
invoke the callback defined for a run. 
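   A minimal sketch of such a DAG, assuming the Airflow 2.0.x import paths. 
The `dag_id`, the task name and the callback body are hypothetical: the 
callback here only logs the run ID, whereas my real one sends the event to 
Redis. The function name matches the one that appears in the logs above.

```python
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

log = logging.getLogger(__name__)


def on_success_callback_job(context):
    """DAG-level success callback, invoked when a DagRun of this DAG succeeds."""
    dag_run = context["dag_run"]
    # Stand-in for the real notification (for example, a publish to Redis).
    log.info("on_success_callback_job - runId: %s", dag_run.run_id)


with DAG(
    dag_id="callback_repro",          # hypothetical name
    schedule_interval="*/5 * * * *",  # every 5 minutes
    start_date=datetime(2021, 11, 1),
    catchup=False,
    on_success_callback=on_success_callback_job,
) as dag:
    # A single no-op task is enough; only the DAG-level callback matters here.
    DummyOperator(task_id="dummy_job")
```

   Leaving this running and comparing the logged run IDs against the 
5-minute schedule should show whether any run finished without its callback 
being invoked.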
   
   ### Anything else
   
   This problem usually happens once a day. 
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

