doiken opened a new issue, #31407:
URL: https://github.com/apache/airflow/issues/31407
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
In rare cases, the scheduler triggers a DagRun to be executed in the future, even though allow_trigger_in_future=False.
Here are the conditions as I understand them:
- max_active_runs is set and the upper limit is reached
- The preceding DagRun completes very slightly earlier than the following DagRun

Details are in "Anything else" below.
### What you think should happen instead
The DagRun should wait until its scheduled time; it should not be triggered in the future.
### How to reproduce
I confirmed the reproduction on Airflow 2.2.2 with the following code. In my environment, the issue appeared after running it for about half a day.
``` python
import copy
import logging
import time
from datetime import datetime, timedelta

import pendulum

from airflow import DAG, AirflowException
from airflow.sensors.python import PythonSensor
from airflow.utils import timezone

logger = logging.getLogger(__name__)

# A very small min_file_process_interval may make it easier to reproduce, e.g. min_file_process_interval=3


def create_dag(interval):
    with DAG(
        dag_id=f"example_reproduce_{interval:0>2}",
        schedule_interval=f"*/{interval} * * * *",
        start_date=datetime(2021, 1, 1),
        catchup=False,
        max_active_runs=2,
        tags=["example_race_condition"],
    ) as dag:
        target_s = 10

        def raise_if_future(context):
            # allow a 30-second grace period so normally scheduled runs do not trip the check
            threshold = timezone.utcnow() - timedelta(seconds=30)
            if context["data_interval_start"] > threshold:
                raise AirflowException("DagRun supposed to be triggered in the future was triggered")

        def wait_sync():
            now_dt = pendulum.now()
            if now_dt.minute % (interval * 2) == 0:
                # wait until the target time to synchronize the end time with the preceding job
                target_dt = copy.copy(now_dt).replace(second=target_s + 2)
                wait_sec = (target_dt - now_dt).total_seconds()
                logger.info(f"sleep {now_dt} -> {target_dt} in {wait_sec} seconds")
                if wait_sec > 0:
                    time.sleep(wait_sec)
                return True
            return False  # keep poking until the synchronizing minute

        PythonSensor(
            task_id="t2",
            python_callable=wait_sync,
            # To avoid getting stuck with the SequentialExecutor, re-poke only after the next job has started
            poke_interval=interval * 60 + target_s,
            mode="reschedule",
            pre_execute=raise_if_future,
        )
    return dag


for i in [1, 2]:
    globals()[f"dag_{i}"] = create_dag(i)
```
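As a side note, DagRuns created by this race can also be spotted directly in the metadata database. The snippet below is only a rough sketch for illustration, not part of the reproduction itself; it assumes Airflow 2.2+ (where DagRun has `data_interval_end` and `run_type` columns) and that it is executed where `airflow.settings.Session` points at the scheduler's metadata DB.
``` python
# Rough sketch: list scheduled DagRuns whose data interval has not ended yet, i.e. runs
# that should not have been created yet when allow_trigger_in_future=False.
from airflow.models import DagRun
from airflow.settings import Session
from airflow.utils import timezone
from airflow.utils.types import DagRunType

session = Session()
try:
    premature_runs = (
        session.query(DagRun)
        .filter(
            DagRun.run_type == DagRunType.SCHEDULED,
            DagRun.data_interval_end > timezone.utcnow(),
        )
        .order_by(DagRun.data_interval_start)
        .all()
    )
    for dr in premature_runs:
        print(dr.dag_id, dr.run_id, dr.data_interval_start, dr.data_interval_end, dr.state)
finally:
    session.close()
```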
### Operating System
Amazon Linux 2
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
MWAA 2.2.2
### Anything else
The assumed flow and the associated query log from an actual occurrence (with max_active_runs=2) are shown below.
**The assumed flow**
1. The first DagRun (DR1) starts
1. The subsequent DagRun (DR2) starts
1. DR2 completes; the scheduler sets `next_dagrun_create_after=null` if max_active_runs is exceeded
   - https://github.com/apache/airflow/blob/2.2.2/airflow/jobs/scheduler_job.py#L931
1. DR1 completes; the scheduler calls dag_model.calculate_dagrun_date_fields() in SchedulerJob._schedule_dag_run(). The session is NOT committed yet
   - note: the result of `calculate_dagrun_date_fields` is the old, DR1-based value computed from `dag.get_run_data_interval(DR1)`
   - https://github.com/apache/airflow/blob/2.2.2/airflow/jobs/scheduler_job.py#L1017
1. DagFileProcessorProcess modifies next_dagrun_create_after
   - note: the dag record fetched in step 4 is not locked, so the `Processor` can select it and update it
   - https://github.com/apache/airflow/blob/2.2.2/airflow/dag_processing/processor.py#L646
1. The scheduler writes the DR1-based calculation result to the DB via `guard.commit()` (a minimal sketch of this write pattern follows the query log below)
   - note: only the `next_dagrun_create_after` column, set to null in step 3, is updated, because SQLAlchemy only writes the difference between the record retrieved in step 4 and the calculation result
   - https://github.com/apache/airflow/blob/2.2.2/airflow/jobs/scheduler_job.py#L795
1. The scheduler triggers a future DagRun because the current time already satisfies the next_dagrun_create_after written in step 6
**The associated query log**
``` sql
bb55c5b0bdce:/# grep "future_dagrun_00" /var/lib/postgresql/data/log/postgresql-2023-03-08_210056.log | grep "next_dagrun"
2023-03-08 22:00:01.678 UTC [57378] LOG: statement: UPDATE dag SET next_dagrun_create_after = NULL WHERE dag.dag_id = 'future_dagrun_00' # set in step 3
2023-03-08 22:00:08.162 UTC [57472] LOG: statement: UPDATE dag SET last_parsed_time = '2023-03-08T22:00:07.683786+00:00'::timestamptz, next_dagrun = '2023-03-08T22:00:00+00:00'::timestamptz, next_dagrun_data_interval_start = '2023-03-08T22:00:00+00:00'::timestamptz, next_dagrun_data_interval_end = '2023-03-08T23:00:00+00:00'::timestamptz, next_dagrun_create_after = '2023-03-08T23:00:00+00:00'::timestamptz WHERE dag.dag_id = 'future_dagrun_00' # set in step 5
2023-03-08 22:00:09.137 UTC [57475] LOG: statement: UPDATE dag SET next_dagrun_create_after = '2023-03-08T22:00:00+00:00'::timestamptz WHERE dag.dag_id = 'future_dagrun_00' # set in step 6
2023-03-08 22:00:10.418 UTC [57479] LOG: statement: UPDATE dag SET next_dagrun = '2023-03-08T23:00:00+00:00'::timestamptz, next_dagrun_data_interval_start = '2023-03-08T23:00:00+00:00'::timestamptz, next_dagrun_data_interval_end = '2023-03-09T00:00:00+00:00'::timestamptz, next_dagrun_create_after = '2023-03-09T00:00:00+00:00'::timestamptz WHERE dag.dag_id = 'future_dagrun_00' # set in step 7
```
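To make the write in step 6 easier to follow, here is a minimal, self-contained SQLAlchemy sketch of the same pattern. This is plain SQLAlchemy against a throwaway SQLite file, not Airflow code; the table is a simplified stand-in for the real `dag` table and the timestamps are just the values from the log above.
``` python
# Minimal sketch of the stale write in steps 4-6 (plain SQLAlchemy, not Airflow code).
import os
import tempfile

from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Dag(Base):
    __tablename__ = "dag"
    dag_id = Column(String, primary_key=True)
    next_dagrun_create_after = Column(String, nullable=True)


engine = create_engine(f"sqlite:///{os.path.join(tempfile.mkdtemp(), 'race_demo.db')}")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

setup = Session()
setup.add(Dag(dag_id="future_dagrun_00", next_dagrun_create_after=None))  # step 3: column is NULL
setup.commit()
setup.close()

scheduler = Session()                 # step 4: the scheduler loads the row without locking it
row = scheduler.query(Dag).one()
row.next_dagrun_create_after = "2023-03-08T22:00:00"  # stale, DR1-based value, not committed yet

processor = Session()                 # step 5: the DagFileProcessor updates the same row and commits
processor.query(Dag).one().next_dagrun_create_after = "2023-03-08T23:00:00"
processor.commit()
processor.close()

scheduler.commit()                    # step 6: only the dirty column is UPDATEd, clobbering step 5
scheduler.close()

check = Session()
print(check.query(Dag).one().next_dagrun_create_after)  # -> 2023-03-08T22:00:00
check.close()
```
Running this prints `2023-03-08T22:00:00`: the processor's newer value is silently lost, which matches the third UPDATE in the log above.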
From what I've read of the relevant code, the same problem appears to persist in the latest version (2.6.1).
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)