MatthewRBruce commented on issue #15559:
URL: https://github.com/apache/airflow/issues/15559#issuecomment-832716285


   Okay, so we dug into this, and here's what we found (TL;DR: we think we're getting bitten by MySQL's default transaction isolation level, REPEATABLE READ):
   
   ```
   from airflow import settings
   from airflow.models import dagrun
   from airflow.models.dag import DagModel
   from airflow.models.dagrun import DagRun
   from airflow.models.taskinstance import TaskInstance
   from airflow.utils.types import DagRunType
   from airflow.utils.state import State
   from sqlalchemy import and_, func, not_, or_, tuple_
   from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, skip_locked, with_row_locks
   
   session = settings.Session()
   cls = dagrun.DagRun
   ```
   Here we execute a query to get the recent DagRuns for `airflow-utils.send-airflow-heartbeat_every_minute`, and we see runs up to `13:37:00`:
   ```
   dag_run_query = session.query(cls).filter(cls.dag_id == 'airflow-utils.send-airflow-heartbeat_every_minute').order_by(cls.execution_date.desc()).limit(10)
   for dag_run in dag_run_query:
   ...   print(dag_run)
   ...
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:37:00+00:00: scheduled__2021-05-05T13:37:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:36:00+00:00: scheduled__2021-05-05T13:36:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:35:00+00:00: scheduled__2021-05-05T13:35:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:34:00+00:00: scheduled__2021-05-05T13:34:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:33:00+00:00: scheduled__2021-05-05T13:33:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:32:00+00:00: scheduled__2021-05-05T13:32:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:31:00+00:00: scheduled__2021-05-05T13:31:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:30:00+00:00: scheduled__2021-05-05T13:30:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:29:00+00:00: scheduled__2021-05-05T13:29:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:28:00+00:00: scheduled__2021-05-05T13:28:00+00:00, externally triggered: False>
   ```
   
   Next we wait ~5 minutes and run the same query again, this time with `FOR UPDATE`, and we get the new rows. (This is how the scheduler determines which DAG runs to schedule: https://github.com/apache/airflow/blob/master/airflow/jobs/scheduler_job.py#L1477)
   ```
   dag_query_run_update = session.query(cls).filter(cls.dag_id == 'airflow-utils.send-airflow-heartbeat_every_minute').order_by(cls.execution_date.desc()).limit(10).with_for_update()
   for dag_run in dag_query_run_update:
   ...   print(dag_run)
   ...
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:42:00+00:00: scheduled__2021-05-05T13:42:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:41:00+00:00: scheduled__2021-05-05T13:41:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:40:00+00:00: scheduled__2021-05-05T13:40:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:39:00+00:00: scheduled__2021-05-05T13:39:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:38:00+00:00: scheduled__2021-05-05T13:38:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:37:00+00:00: scheduled__2021-05-05T13:37:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:36:00+00:00: scheduled__2021-05-05T13:36:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:35:00+00:00: scheduled__2021-05-05T13:35:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:34:00+00:00: scheduled__2021-05-05T13:34:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:33:00+00:00: scheduled__2021-05-05T13:33:00+00:00, externally triggered: False>
   ```
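   The gap between those two result sets is consistent with InnoDB's MVCC rules under REPEATABLE READ: a plain `SELECT` is a consistent (snapshot) read pinned to the transaction's first read, while `SELECT ... FOR UPDATE` is a locking "current" read that always sees the latest committed rows. A toy model of that rule (pure Python with made-up timestamps, not Airflow code):

```python
# Toy model of InnoDB REPEATABLE READ: plain reads use the transaction's
# snapshot; locking reads (FOR UPDATE) see the latest committed data.
committed = {"13:36", "13:37"}          # rows committed when the txn starts

class Txn:
    def __init__(self, table):
        self.table = table              # live reference to committed rows
        self.snapshot = set(table)      # consistent snapshot at first read

    def select(self):                   # plain SELECT: snapshot read
        return sorted(self.snapshot)

    def select_for_update(self):        # SELECT ... FOR UPDATE: current read
        return sorted(self.table)

txn = Txn(committed)
committed |= {"13:38", "13:39"}         # another session commits new DagRuns

print(txn.select())                     # snapshot: no new rows
print(txn.select_for_update())          # current read: sees the new rows
```

   This is why the `FOR UPDATE` query above returns runs through `13:42:00` while the plain queries in the same transaction stay frozen at `13:37:00`.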
   
   So, there are some new DagRuns, great. But when the scheduler goes to fetch the related task instances (https://github.com/apache/airflow/blob/master/airflow/models/dagrun.py#L307-L310), the query executes against the original snapshot:
   ```
   ti_query = session.query(TaskInstance).filter(TaskInstance.dag_id == 'airflow-utils.send-airflow-heartbeat_every_minute').order_by(TaskInstance.execution_date.desc()).limit(10)
   for ti in ti_query:
   ...   print(ti)
   ...
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:37:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:36:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:35:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:34:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:33:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:32:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:31:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:30:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:29:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:28:00+00:00 [success]>
   ```
   As we can see above, this query can only see TIs up to `13:37:00`, so it finds 0 task instances for the recent runs, which means this check (https://github.com/apache/airflow/blob/master/airflow/models/dagrun.py#L444) will mark those DAG runs as successful.
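   If that diagnosis is right, one possible mitigation (our speculation, not something Airflow configures for you here) would be to run the metadata-database engine at READ COMMITTED, so each plain `SELECT` takes a fresh snapshot and the task-instance query sees the newly committed rows. A sketch using SQLAlchemy's `isolation_level` engine argument; the DSN is a placeholder:

```python
from sqlalchemy import create_engine

# Sketch only: placeholder DSN, adjust driver/credentials for your deployment.
# Under READ COMMITTED each statement reads the latest committed data, so the
# TaskInstance query above would see the TIs for the new DagRuns.
engine = create_engine(
    "mysql+mysqldb://user:pass@mysql-host/airflow",
    isolation_level="READ COMMITTED",
)
```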
   
   
   
   

