boushphong opened a new issue, #35870:
URL: https://github.com/apache/airflow/issues/35870
### Apache Airflow version
2.7.3
### What happened
When multiple Airflow tasks finish at about the same time, and those tasks
are also responsible for triggering another DAG via a Dataset, some of the
expected dataset-triggered DAG runs are missing.
**For example:**
If a DAG has 2 tasks that each trigger another DAG via a Dataset, there should
be 2 dataset-triggered DAG runs of the triggered DAG.
From my observation, if the 2 tasks finish at about the same time, some
triggered DAG runs are missing: only 1 DAG run may be triggered instead of 2.
### What you think should happen instead
The number of dataset-triggered DAG runs should equal the number of tasks
(those that trigger the dataset run) that finish at the same time.
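To make the expectation above concrete, here is a toy sketch that contrasts "one run per dataset event" with a model in which events landing in the same scheduler pass are folded into a single run. This is not Airflow's implementation, and the cause of the missing runs is not confirmed in this report; `DatasetEvent`, `runs_per_event`, `runs_per_scheduler_pass`, and the 1-second `pass_interval` are all hypothetical names and values chosen for illustration only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DatasetEvent:
    """A dataset update emitted by a task (hypothetical helper type)."""
    task_id: str
    finished_at: float  # seconds since the producing DAG run started


def runs_per_event(events):
    """Behaviour expected in this report: one triggered run per event."""
    return len(events)


def runs_per_scheduler_pass(events, pass_interval):
    """Toy model (an assumption, not Airflow code): events that fall into
    the same scheduling pass are folded into a single triggered run."""
    passes = {int(e.finished_at // pass_interval) for e in events}
    return len(passes)


# Roughly mirrors the two DAGs in the reproduction: "sleep 3"/"sleep 5"
# versus two "echo" tasks that finish almost simultaneously.
staggered = [DatasetEvent("bash_sleep", 3.0), DatasetEvent("bash_sleep_2", 5.0)]
simultaneous = [DatasetEvent("bash_sleep", 0.16), DatasetEvent("bash_sleep_2", 0.17)]

print(runs_per_event(simultaneous))                # expected by the reporter
print(runs_per_scheduler_pass(staggered, 1.0))     # events in different passes
print(runs_per_scheduler_pass(simultaneous, 1.0))  # events in the same pass
```

Under this toy model the staggered tasks yield two runs and the near-simultaneous tasks yield one, which matches the counts observed in the logs, but that agreement alone does not show that this is the actual mechanism.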
### How to reproduce
```python3
import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

# Producer DAG whose two tasks finish about 2 seconds apart.
with DAG(
    "dataset_triggered_runs",
    start_date=datetime.datetime(2022, 1, 1),
    schedule="0 0 * * *",
    catchup=False,
) as dataset_triggered_runs:
    bash_sleep = BashOperator(
        task_id="bash_sleep", bash_command="sleep 3",
        outlets=[Dataset("test")]
    )
    bash_sleep_2 = BashOperator(
        task_id="bash_sleep_2", bash_command="sleep 5",
        outlets=[Dataset("test")]
    )

# Producer DAG whose two tasks finish at about the same time.
with DAG(
    "missing_dataset_triggered_runs",
    start_date=datetime.datetime(2022, 1, 1),
    schedule="0 0 * * *",
    catchup=False,
) as missing_dataset_triggered_runs:
    echo1 = BashOperator(
        task_id="bash_sleep", bash_command="echo 1",
        outlets=[Dataset("test")]
    )
    echo2 = BashOperator(
        task_id="bash_sleep_2", bash_command="echo 2",
        outlets=[Dataset("test")]
    )

# Consumer DAG scheduled on the shared Dataset.
with DAG(
    "trigger_v1",
    start_date=datetime.datetime(2022, 1, 1),
    schedule=[Dataset("test")],
) as dag2:
    dataset_trigger_task = EmptyOperator(task_id="empty_task_2")
```
The **dataset_triggered_runs** DAG has 2 tasks (that trigger dataset runs)
finishing at different times, and there are 2 dataset-triggered DAG runs, which
is expected.
```python3
[2023-11-27T03:19:22.897+0700] {dagrun.py:653} INFO - Marking run <DagRun
trigger_v1 @ 2023-11-26 20:19:21.217750+00:00:
dataset_triggered__2023-11-26T20:19:21.217750+00:00, state:running, queued_at:
2023-11-26 20:19:21.809036+00:00. externally triggered: False> successful
[2023-11-27T03:19:22.897+0700] {dagrun.py:704} INFO - DagRun Finished:
dag_id=trigger_v1, execution_date=2023-11-26 20:19:21.217750+00:00,
run_id=dataset_triggered__2023-11-26T20:19:21.217750+00:00,
run_start_date=2023-11-26 20:19:21.851541+00:00, run_end_date=2023-11-26
20:19:22.897392+00:00, run_duration=1.045851, state=success,
external_trigger=False, run_type=dataset_triggered,
data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26
00:00:00+00:00, dag_hash=f4426a33d21238a4f41898744b2ce017
[2023-11-27T03:19:23.490+0700] {dagrun.py:653} INFO - Marking run <DagRun
dataset_triggered_runs @ 2023-11-26 20:19:17.627907+00:00:
manual__2023-11-26T20:19:17.627907+00:00, state:running, queued_at: 2023-11-26
20:19:17.637778+00:00. externally triggered: True> successful
[2023-11-27T03:19:23.490+0700] {dagrun.py:704} INFO - DagRun Finished:
dag_id=dataset_triggered_runs, execution_date=2023-11-26 20:19:17.627907+00:00,
run_id=manual__2023-11-26T20:19:17.627907+00:00, run_start_date=2023-11-26
20:19:17.888672+00:00, run_end_date=2023-11-26 20:19:23.490855+00:00,
run_duration=5.602183, state=success, external_trigger=True, run_type=manual,
data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26
00:00:00+00:00, dag_hash=d60293c69d62b7a5aeb3cf3d884b9693
[2023-11-27T03:19:23.500+0700] {scheduler_job_runner.py:685} INFO - Received
executor event with state success for task instance
TaskInstanceKey(dag_id='dataset_triggered_runs', task_id='bash_sleep_2',
run_id='manual__2023-11-26T20:19:17.627907+00:00', try_number=1, map_index=-1)
[2023-11-27T03:19:23.503+0700] {scheduler_job_runner.py:722} INFO -
TaskInstance Finished: dag_id=dataset_triggered_runs, task_id=bash_sleep_2,
run_id=manual__2023-11-26T20:19:17.627907+00:00, map_index=-1,
run_start_date=2023-11-26 20:19:18.027568+00:00, run_end_date=2023-11-26
20:19:23.206466+00:00, run_duration=5.178898, state=success,
executor_state=success, try_number=1, max_tries=0, job_id=189,
pool=default_pool, queue=default, priority_weight=1, operator=BashOperator,
queued_dttm=2023-11-26 20:19:17.914731+00:00, queued_by_job_id=1, pid=23848
[2023-11-27T03:19:23.678+0700] {dagrun.py:653} INFO - Marking run <DagRun
trigger_v1 @ 2023-11-26 20:19:23.214909+00:00:
dataset_triggered__2023-11-26T20:19:23.214909+00:00, state:running, queued_at:
2023-11-26 20:19:23.471484+00:00. externally triggered: False> successful
[2023-11-27T03:19:23.678+0700] {dagrun.py:704} INFO - DagRun Finished:
dag_id=trigger_v1, execution_date=2023-11-26 20:19:23.214909+00:00,
run_id=dataset_triggered__2023-11-26T20:19:23.214909+00:00,
run_start_date=2023-11-26 20:19:23.479981+00:00, run_end_date=2023-11-26
20:19:23.678956+00:00, run_duration=0.198975, state=success,
external_trigger=False, run_type=dataset_triggered,
data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26
00:00:00+00:00, dag_hash=f4426a33d21238a4f41898744b2ce017
```
However, the **missing_dataset_triggered_runs** DAG has 2 tasks (that
trigger dataset runs) finishing at about the same time, and there is only 1
dataset-triggered DAG run, which is unexpected. This is very likely a bug.
```python3
[2023-11-27T03:21:42.147+0700] {scheduler_job_runner.py:722} INFO -
TaskInstance Finished: dag_id=missing_dataset_triggered_runs,
task_id=bash_sleep_2, run_id=scheduled__2023-11-25T00:00:00+00:00,
map_index=-1, run_start_date=2023-11-26 20:21:41.184677+00:00,
run_end_date=2023-11-26 20:21:41.356996+00:00, run_duration=0.172319,
state=success, executor_state=success, try_number=1, max_tries=0, job_id=190,
pool=default_pool, queue=default, priority_weight=1, operator=BashOperator,
queued_dttm=2023-11-26 20:21:41.048606+00:00, queued_by_job_id=1, pid=24351
[2023-11-27T03:21:42.147+0700] {scheduler_job_runner.py:722} INFO -
TaskInstance Finished: dag_id=missing_dataset_triggered_runs,
task_id=bash_sleep, run_id=scheduled__2023-11-25T00:00:00+00:00, map_index=-1,
run_start_date=2023-11-26 20:21:41.184319+00:00, run_end_date=2023-11-26
20:21:41.345462+00:00, run_duration=0.161143, state=success,
executor_state=success, try_number=1, max_tries=0, job_id=191,
pool=default_pool, queue=default, priority_weight=1, operator=BashOperator,
queued_dttm=2023-11-26 20:21:41.048606+00:00, queued_by_job_id=1, pid=24350
[2023-11-27T03:21:43.175+0700] {dagrun.py:653} INFO - Marking run <DagRun
trigger_v1 @ 2023-11-26 20:21:41.356742+00:00:
dataset_triggered__2023-11-26T20:21:41.356742+00:00, state:running, queued_at:
2023-11-26 20:21:42.118410+00:00. externally triggered: False> successful
[2023-11-27T03:21:43.175+0700] {dagrun.py:704} INFO - DagRun Finished:
dag_id=trigger_v1, execution_date=2023-11-26 20:21:41.356742+00:00,
run_id=dataset_triggered__2023-11-26T20:21:41.356742+00:00,
run_start_date=2023-11-26 20:21:42.126641+00:00, run_end_date=2023-11-26
20:21:43.175274+00:00, run_duration=1.048633, state=success,
external_trigger=False, run_type=dataset_triggered,
data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26
00:00:00+00:00, dag_hash=f4426a33d21238a4f41898744b2ce017
```
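When repeating the reproduction, counting the triggered runs by eye in scheduler output is error-prone. The sketch below tallies `DagRun Finished` entries per DAG and run type, assuming the log format shown in the excerpts above. `count_finished_runs` is a hypothetical helper, not part of Airflow, and the sample text is an abbreviated copy of one entry from the logs in this report.

```python
import re
from collections import Counter

# Matches "DagRun Finished:" entries and captures dag_id and run_type.
# DOTALL lets the pattern span entries that are wrapped over several lines.
FINISHED = re.compile(
    r"DagRun\s+Finished:\s*dag_id=(?P<dag_id>[\w.\-]+),"
    r".*?run_type=(?P<run_type>\w+)",
    re.DOTALL,
)


def count_finished_runs(log_text):
    """Return a Counter keyed by (dag_id, run_type) for finished DAG runs."""
    return Counter(
        (m["dag_id"], m["run_type"]) for m in FINISHED.finditer(log_text)
    )


# Abbreviated entry taken from the second log excerpt in this report.
sample = """
[2023-11-27T03:21:43.175+0700] {dagrun.py:704} INFO - DagRun Finished:
dag_id=trigger_v1, execution_date=2023-11-26 20:21:41.356742+00:00,
run_id=dataset_triggered__2023-11-26T20:21:41.356742+00:00,
state=success, external_trigger=False, run_type=dataset_triggered,
"""

counts = count_finished_runs(sample)
print(counts[("trigger_v1", "dataset_triggered")])
```

Running it over the full scheduler log for each producer DAG gives the number of `trigger_v1` runs to compare against the number of tasks that declare the Dataset as an outlet.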
### Operating System
Docker
### Versions of Apache Airflow Providers
_No response_
### Deployment
Other Docker-based deployment
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)