boushphong opened a new issue, #35870:
URL: https://github.com/apache/airflow/issues/35870

   ### Apache Airflow version
   
   2.7.3
   
   ### What happened
   
   When multiple Airflow tasks finish at about the same time, and those tasks are also responsible for triggering another DAG via a Dataset, some of the dataset-triggered DAG runs go missing.
   
   **For example:**
   If a DAG has 2 tasks that update a Dataset another DAG is scheduled on, there should be 2 dataset-triggered runs of the downstream DAG.
   From my observation, if the 2 tasks finish at about the same time, triggered DAG runs go missing: only 1 DAG run may be created instead of 2.
   
   ### What you think should happen instead
   
   The number of dataset-triggered DAG runs should equal the number of tasks (updating the dataset) that finish at the same time.
   
   ### How to reproduce
   
   ```python3
   import datetime
   
   from airflow import DAG
   from airflow.operators.empty import EmptyOperator
   from airflow.operators.bash import BashOperator
   from airflow.datasets import Dataset
   
   with DAG(
           "dataset_triggered_runs",
           start_date=datetime.datetime(2022, 1, 1),
           schedule="0 0 * * *",
           catchup=False,
   ) as dataset_triggered_runs:
       bash_sleep = BashOperator(
           task_id="bash_sleep", bash_command="sleep 3", outlets=[Dataset("test")]
       )
   
       bash_sleep_2 = BashOperator(
           task_id="bash_sleep_2", bash_command="sleep 5", outlets=[Dataset("test")]
       )
   
   with DAG(
           "missing_dataset_triggered_runs",
           start_date=datetime.datetime(2022, 1, 1),
           schedule="0 0 * * *",
           catchup=False,
   ) as missing_dataset_triggered_runs:
       echo1 = BashOperator(
           task_id="bash_sleep", bash_command="echo 1", outlets=[Dataset("test")]
       )
   
       echo2 = BashOperator(
           task_id="bash_sleep_2", bash_command="echo 2", outlets=[Dataset("test")]
       )
   
   with DAG(
           "trigger_v1",
           start_date=datetime.datetime(2022, 1, 1),
           schedule=[Dataset("test")],
   ) as dag2:
       dataset_trigger_task = EmptyOperator(task_id="empty_task_2")
   ```
   
   The **dataset_triggered_runs** DAG has 2 tasks (that update the dataset) finishing at different times, and there are 2 dataset-triggered DAG runs, which is expected.
   ```
   [2023-11-27T03:19:22.897+0700] {dagrun.py:653} INFO - Marking run <DagRun trigger_v1 @ 2023-11-26 20:19:21.217750+00:00: dataset_triggered__2023-11-26T20:19:21.217750+00:00, state:running, queued_at: 2023-11-26 20:19:21.809036+00:00. externally triggered: False> successful
   [2023-11-27T03:19:22.897+0700] {dagrun.py:704} INFO - DagRun Finished: dag_id=trigger_v1, execution_date=2023-11-26 20:19:21.217750+00:00, run_id=dataset_triggered__2023-11-26T20:19:21.217750+00:00, run_start_date=2023-11-26 20:19:21.851541+00:00, run_end_date=2023-11-26 20:19:22.897392+00:00, run_duration=1.045851, state=success, external_trigger=False, run_type=dataset_triggered, data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26 00:00:00+00:00, dag_hash=f4426a33d21238a4f41898744b2ce017
   [2023-11-27T03:19:23.490+0700] {dagrun.py:653} INFO - Marking run <DagRun dataset_triggered_runs @ 2023-11-26 20:19:17.627907+00:00: manual__2023-11-26T20:19:17.627907+00:00, state:running, queued_at: 2023-11-26 20:19:17.637778+00:00. externally triggered: True> successful
   [2023-11-27T03:19:23.490+0700] {dagrun.py:704} INFO - DagRun Finished: dag_id=dataset_triggered_runs, execution_date=2023-11-26 20:19:17.627907+00:00, run_id=manual__2023-11-26T20:19:17.627907+00:00, run_start_date=2023-11-26 20:19:17.888672+00:00, run_end_date=2023-11-26 20:19:23.490855+00:00, run_duration=5.602183, state=success, external_trigger=True, run_type=manual, data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26 00:00:00+00:00, dag_hash=d60293c69d62b7a5aeb3cf3d884b9693
   [2023-11-27T03:19:23.500+0700] {scheduler_job_runner.py:685} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='dataset_triggered_runs', task_id='bash_sleep_2', run_id='manual__2023-11-26T20:19:17.627907+00:00', try_number=1, map_index=-1)
   [2023-11-27T03:19:23.503+0700] {scheduler_job_runner.py:722} INFO - TaskInstance Finished: dag_id=dataset_triggered_runs, task_id=bash_sleep_2, run_id=manual__2023-11-26T20:19:17.627907+00:00, map_index=-1, run_start_date=2023-11-26 20:19:18.027568+00:00, run_end_date=2023-11-26 20:19:23.206466+00:00, run_duration=5.178898, state=success, executor_state=success, try_number=1, max_tries=0, job_id=189, pool=default_pool, queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2023-11-26 20:19:17.914731+00:00, queued_by_job_id=1, pid=23848
   [2023-11-27T03:19:23.678+0700] {dagrun.py:653} INFO - Marking run <DagRun trigger_v1 @ 2023-11-26 20:19:23.214909+00:00: dataset_triggered__2023-11-26T20:19:23.214909+00:00, state:running, queued_at: 2023-11-26 20:19:23.471484+00:00. externally triggered: False> successful
   [2023-11-27T03:19:23.678+0700] {dagrun.py:704} INFO - DagRun Finished: dag_id=trigger_v1, execution_date=2023-11-26 20:19:23.214909+00:00, run_id=dataset_triggered__2023-11-26T20:19:23.214909+00:00, run_start_date=2023-11-26 20:19:23.479981+00:00, run_end_date=2023-11-26 20:19:23.678956+00:00, run_duration=0.198975, state=success, external_trigger=False, run_type=dataset_triggered, data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26 00:00:00+00:00, dag_hash=f4426a33d21238a4f41898744b2ce017
   ```
   
   However, the **missing_dataset_triggered_runs** DAG has 2 tasks (that update the dataset) finishing at about the same time, yet there is only 1 dataset-triggered DAG run, which is unexpected. This is very likely a bug.
   ```
   [2023-11-27T03:21:42.147+0700] {scheduler_job_runner.py:722} INFO - TaskInstance Finished: dag_id=missing_dataset_triggered_runs, task_id=bash_sleep_2, run_id=scheduled__2023-11-25T00:00:00+00:00, map_index=-1, run_start_date=2023-11-26 20:21:41.184677+00:00, run_end_date=2023-11-26 20:21:41.356996+00:00, run_duration=0.172319, state=success, executor_state=success, try_number=1, max_tries=0, job_id=190, pool=default_pool, queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2023-11-26 20:21:41.048606+00:00, queued_by_job_id=1, pid=24351
   [2023-11-27T03:21:42.147+0700] {scheduler_job_runner.py:722} INFO - TaskInstance Finished: dag_id=missing_dataset_triggered_runs, task_id=bash_sleep, run_id=scheduled__2023-11-25T00:00:00+00:00, map_index=-1, run_start_date=2023-11-26 20:21:41.184319+00:00, run_end_date=2023-11-26 20:21:41.345462+00:00, run_duration=0.161143, state=success, executor_state=success, try_number=1, max_tries=0, job_id=191, pool=default_pool, queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2023-11-26 20:21:41.048606+00:00, queued_by_job_id=1, pid=24350
   [2023-11-27T03:21:43.175+0700] {dagrun.py:653} INFO - Marking run <DagRun trigger_v1 @ 2023-11-26 20:21:41.356742+00:00: dataset_triggered__2023-11-26T20:21:41.356742+00:00, state:running, queued_at: 2023-11-26 20:21:42.118410+00:00. externally triggered: False> successful
   [2023-11-27T03:21:43.175+0700] {dagrun.py:704} INFO - DagRun Finished: dag_id=trigger_v1, execution_date=2023-11-26 20:21:41.356742+00:00, run_id=dataset_triggered__2023-11-26T20:21:41.356742+00:00, run_start_date=2023-11-26 20:21:42.126641+00:00, run_end_date=2023-11-26 20:21:43.175274+00:00, run_duration=1.048633, state=success, external_trigger=False, run_type=dataset_triggered, data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26 00:00:00+00:00, dag_hash=f4426a33d21238a4f41898744b2ce017
   ```
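   My guess at the mechanism (not confirmed against the scheduler source, so the names and logic below are hypothetical): if dataset-triggered run creation asks "does this dataset have any unprocessed events?" once per scheduling pass, rather than creating one run per event, then two events landing in the same pass would be coalesced into a single run. A toy model of that coalescing behaviour, not Airflow code:
   ```python
   from dataclasses import dataclass, field


   # Toy model (hypothetical, NOT Airflow internals): a scheduler that creates
   # at most one run per scheduling pass whenever its dataset has pending
   # events, instead of one run per event.
   @dataclass
   class ToyScheduler:
       pending_events: list = field(default_factory=list)
       runs_created: int = 0

       def record_event(self, dataset: str) -> None:
           # A task with outlets=[Dataset(...)] finishing == one event.
           self.pending_events.append(dataset)

       def scheduling_pass(self) -> None:
           # Coalescing check: "any pending events at all?" -> create ONE run.
           if self.pending_events:
               self.runs_created += 1
               self.pending_events.clear()


   # Tasks finishing far apart (sleep 3 vs sleep 5): one pass per event.
   slow = ToyScheduler()
   slow.record_event("test")
   slow.scheduling_pass()
   slow.record_event("test")
   slow.scheduling_pass()
   assert slow.runs_created == 2  # expected behaviour: 2 runs

   # Tasks finishing together (echo 1 / echo 2): both events land before
   # the next pass and are coalesced.
   fast = ToyScheduler()
   fast.record_event("test")
   fast.record_event("test")
   fast.scheduling_pass()
   assert fast.runs_created == 1  # observed behaviour: only 1 run
   ```
   If the intended semantics are one run per dataset update event, the fix would be to key run creation on individual events rather than on the mere existence of pending events.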
   
   ### Operating System
   
   Docker
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

