MatthewRBruce commented on issue #15559: URL: https://github.com/apache/airflow/issues/15559#issuecomment-832716285
Okay, so we dug into this and here's what we found (TL;DR: we think we're getting bitten by MySQL's default isolation level of `REPEATABLE READ`).

First, some setup in a Python shell:

```python
from airflow import settings
from airflow.models import dagrun
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.utils.types import DagRunType
from airflow.utils.state import State
from sqlalchemy import and_, func, not_, or_, tuple_
from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, skip_locked, with_row_locks

session = settings.Session()
cls = dagrun.DagRun
```

Here we execute a query to get the recent DagRuns for `airflow-utils.send-airflow-heartbeat_every_minute`, and we see runs only up to `13:37:00`:

```python
>>> dag_run_query = session.query(cls).filter(cls.dag_id == 'airflow-utils.send-airflow-heartbeat_every_minute').order_by(cls.execution_date.desc()).limit(10)
>>> for dag_run in dag_run_query:
...     print(dag_run)
...
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:37:00+00:00: scheduled__2021-05-05T13:37:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:36:00+00:00: scheduled__2021-05-05T13:36:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:35:00+00:00: scheduled__2021-05-05T13:35:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:34:00+00:00: scheduled__2021-05-05T13:34:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:33:00+00:00: scheduled__2021-05-05T13:33:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:32:00+00:00: scheduled__2021-05-05T13:32:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:31:00+00:00: scheduled__2021-05-05T13:31:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:30:00+00:00: scheduled__2021-05-05T13:30:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:29:00+00:00: scheduled__2021-05-05T13:29:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:28:00+00:00: scheduled__2021-05-05T13:28:00+00:00, externally triggered: False>
```

Next we wait ~5 minutes and run the same query again, but with `FOR UPDATE`, and we get the new rows. (This is how the scheduler determines which dag runs to schedule, here: https://github.com/apache/airflow/blob/master/airflow/jobs/scheduler_job.py#L1477)

```python
>>> dag_query_run_update = session.query(cls).filter(cls.dag_id == 'airflow-utils.send-airflow-heartbeat_every_minute').order_by(cls.execution_date.desc()).limit(10).with_for_update()
>>> for dag_run in dag_query_run_update:
...     print(dag_run)
...
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:42:00+00:00: scheduled__2021-05-05T13:42:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:41:00+00:00: scheduled__2021-05-05T13:41:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:40:00+00:00: scheduled__2021-05-05T13:40:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:39:00+00:00: scheduled__2021-05-05T13:39:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:38:00+00:00: scheduled__2021-05-05T13:38:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:37:00+00:00: scheduled__2021-05-05T13:37:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:36:00+00:00: scheduled__2021-05-05T13:36:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:35:00+00:00: scheduled__2021-05-05T13:35:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:34:00+00:00: scheduled__2021-05-05T13:34:00+00:00, externally triggered: False>
<DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:33:00+00:00: scheduled__2021-05-05T13:33:00+00:00, externally triggered: False>
```

So there are some new DagRuns, great. But when the scheduler goes to fetch the related task instances (https://github.com/apache/airflow/blob/master/airflow/models/dagrun.py#L307-L310), the query it executes still reads from the original snapshot:

```python
>>> ti_query = session.query(TaskInstance).filter(TaskInstance.dag_id == 'airflow-utils.send-airflow-heartbeat_every_minute').order_by(TaskInstance.execution_date.desc()).limit(10)
>>> for ti in ti_query:
...     print(ti)
...
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:37:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:36:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:35:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:34:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:33:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:32:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:31:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:30:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:29:00+00:00 [success]>
<TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:28:00+00:00 [success]>
```

As we can see above, this query can only see task instances up to `13:37:00`, so it finds zero tasks for the new runs, which means this check https://github.com/apache/airflow/blob/master/airflow/models/dagrun.py#L444 will mark those DAG runs as successful.
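To make the failure mode concrete, here is a small self-contained Python sketch (no MySQL required; `SnapshotDB`, `Txn`, and the row values are invented for illustration) of how we understand `REPEATABLE READ` to behave: a plain `SELECT` reads from the snapshot taken at the transaction's first read, while `SELECT ... FOR UPDATE` is a locking read that always sees the latest committed rows. That mismatch is exactly what lets the scheduler see new dag runs but zero matching task instances:

```python
# Toy model of MySQL REPEATABLE READ semantics (hypothetical classes, for
# illustration only): plain reads use the transaction's snapshot, while
# locking reads (FOR UPDATE) bypass it and see the latest committed data.
import copy


class SnapshotDB:
    def __init__(self):
        self.committed = {"dag_run": [], "task_instance": []}

    def begin(self):
        return Txn(self)


class Txn:
    def __init__(self, db):
        self.db = db
        self.snapshot = None  # taken lazily at the first plain read

    def select(self, table):
        # Consistent (non-locking) read: served from the snapshot.
        if self.snapshot is None:
            self.snapshot = copy.deepcopy(self.db.committed)
        return list(self.snapshot[table])

    def select_for_update(self, table):
        # Locking read: ignores the snapshot, sees latest committed rows.
        return list(self.db.committed[table])


db = SnapshotDB()
db.committed["dag_run"].append("run_13:37")
db.committed["task_instance"].append("ti_13:37")

scheduler = db.begin()
scheduler.select("dag_run")  # snapshot taken here: only 13:37 is visible

# Meanwhile, another session commits a new dag run and its task instance.
db.committed["dag_run"].append("run_13:42")
db.committed["task_instance"].append("ti_13:42")

runs = scheduler.select_for_update("dag_run")  # locking read: sees 13:42
tis = scheduler.select("task_instance")        # snapshot read: does NOT
new_runs = [r for r in runs if r == "run_13:42"]
matching = [t for t in tis if t == "ti_13:42"]
print(new_runs)  # ['run_13:42']  -> scheduler picks up the new run
print(matching)  # []             -> but finds no TIs, so the run looks done
```

Under this model, starting a fresh transaction (or using `READ COMMITTED`) before the task-instance query would make both reads agree, which matches our suspicion about the isolation level.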
