MatrixManAtYrService opened a new issue, #25586:
URL: https://github.com/apache/airflow/issues/25586

   ### Apache Airflow version
   
   main (development)
   
   ### What happened
   
   First, an apology. I tried to replicate this with several simpler sets of dags, but they worked fine. I don't know what it is about this particular arrangement that causes the problem.
   
   Here's a directed graph: 
   
![image](https://user-images.githubusercontent.com/5834582/183339466-67668ded-0628-4e4a-8958-d48975c6b764.png)
   
   If we interpret each node as a DAG and each outbound edge as a task in that DAG, where each task lists a unique dataset in its `outlets` and the targeted node/DAG is scheduled on that dataset via `schedule_on`, then we end up with this file:
   
   ```python3
   from airflow import Dataset, DAG
   from airflow.operators.python import PythonOperator
   from datetime import datetime, timedelta
   from math import ceil
   import pause
   
   def wait_until_twenty_sec():
       now = datetime.now()
       extra = 20 - (ceil(now.second / 10) * 10)
       go_time = now + timedelta(seconds=extra)
       print(f"waiting until {go_time}")
       pause.until(go_time)
   
   a5_a14 = Dataset("a5_a14")
   a5_a12 = Dataset("a5_a12")
   a15_a14 = Dataset("a15_a14")
   a7_a6 = Dataset("a7_a6")
   a7_a8 = Dataset("a7_a8")
   a6_a2 = Dataset("a6_a2")
   a5_a10 = Dataset("a5_a10")
   a11_a1 = Dataset("a11_a1")
   a1_a12 = Dataset("a1_a12")
   a1_a2 = Dataset("a1_a2")
   a13_a5 = Dataset("a13_a5")
   a15_a9 = Dataset("a15_a9")
   a10_a4 = Dataset("a10_a4")
   a10_a11 = Dataset("a10_a11")
   a4_a15 = Dataset("a4_a15")
   a5_a6 = Dataset("a5_a6")
   a2_a7 = Dataset("a2_a7")
   a11_a8 = Dataset("a11_a8")
   a13_a9 = Dataset("a13_a9")
   a1_a3 = Dataset("a1_a3")
   a4_a14 = Dataset("a4_a14")
   a6_a15 = Dataset("a6_a15")
   a11_a2 = Dataset("a11_a2")
   a14_a8 = Dataset("a14_a8")
   a15_a12 = Dataset("a15_a12")
   a7_a14 = Dataset("a7_a14")
   a6_a4 = Dataset("a6_a4")
   a14_a12 = Dataset("a14_a12")
   a9_a5 = Dataset("a9_a5")
   a14_a9 = Dataset("a14_a9")
   a6_a1 = Dataset("a6_a1")
   a15_a1 = Dataset("a15_a1")
   
   with DAG(dag_id="a1",
            start_date=datetime(1970, 1, 1),schedule_on=[a6_a1, a11_a1, a15_a1],
   ) as a1:
   
       PythonOperator(task_id='a1_a2', python_callable=wait_until_twenty_sec, 
outlets=[a1_a2])
       PythonOperator(task_id='a1_a3', python_callable=wait_until_twenty_sec, 
outlets=[a1_a3])
       PythonOperator(task_id='a1_a12', python_callable=wait_until_twenty_sec, 
outlets=[a1_a12])
   
   with DAG(dag_id="a2",
            start_date=datetime(1970, 1, 1),schedule_on=[a1_a2, a6_a2, a11_a2],
   ) as a2:
   
       PythonOperator(task_id='a2_a7', python_callable=wait_until_twenty_sec, 
outlets=[a2_a7])
   
   with DAG(dag_id="a3",
            start_date=datetime(1970, 1, 1),schedule_on=[a1_a3],
   ) as a3:
   
       PythonOperator(task_id='no_outlets', 
python_callable=wait_until_twenty_sec)
   
   with DAG(dag_id="a6",
            start_date=datetime(1970, 1, 1),schedule_on=[a5_a6, a7_a6],
   ) as a6:
   
       PythonOperator(task_id='a6_a1', python_callable=wait_until_twenty_sec, 
outlets=[a6_a1])
       PythonOperator(task_id='a6_a2', python_callable=wait_until_twenty_sec, 
outlets=[a6_a2])
       PythonOperator(task_id='a6_a4', python_callable=wait_until_twenty_sec, 
outlets=[a6_a4])
       PythonOperator(task_id='a6_a15', python_callable=wait_until_twenty_sec, 
outlets=[a6_a15])
   
   with DAG(dag_id="a11",
            start_date=datetime(1970, 1, 1),schedule_on=[a10_a11],
   ) as a11:
   
       PythonOperator(task_id='a11_a1', python_callable=wait_until_twenty_sec, 
outlets=[a11_a1])
       PythonOperator(task_id='a11_a2', python_callable=wait_until_twenty_sec, 
outlets=[a11_a2])
       PythonOperator(task_id='a11_a8', python_callable=wait_until_twenty_sec, 
outlets=[a11_a8])
   
   with DAG(dag_id="a12",
            start_date=datetime(1970, 1, 1),schedule_on=[a1_a12, a5_a12, 
a14_a12, a15_a12],
   ) as a12:
   
       PythonOperator(task_id='no_outlets', 
python_callable=wait_until_twenty_sec)
   
   with DAG(dag_id="a15",
            start_date=datetime(1970, 1, 1),schedule_on=[a4_a15, a6_a15],
   ) as a15:
   
       PythonOperator(task_id='a15_a1', python_callable=wait_until_twenty_sec, 
outlets=[a15_a1])
       PythonOperator(task_id='a15_a9', python_callable=wait_until_twenty_sec, 
outlets=[a15_a9])
       PythonOperator(task_id='a15_a12', python_callable=wait_until_twenty_sec, 
outlets=[a15_a12])
       PythonOperator(task_id='a15_a14', python_callable=wait_until_twenty_sec, 
outlets=[a15_a14])
   
   with DAG(dag_id="a7",
            start_date=datetime(1970, 1, 1),schedule_on=[a2_a7],
   ) as a7:
   
       PythonOperator(task_id='a7_a6', python_callable=wait_until_twenty_sec, 
outlets=[a7_a6])
       PythonOperator(task_id='a7_a8', python_callable=wait_until_twenty_sec, 
outlets=[a7_a8])
       PythonOperator(task_id='a7_a14', python_callable=wait_until_twenty_sec, 
outlets=[a7_a14])
   
   with DAG(dag_id="a4",
            start_date=datetime(1970, 1, 1),schedule_on=[a6_a4, a10_a4],
   ) as a4:
   
       PythonOperator(task_id='a4_a14', python_callable=wait_until_twenty_sec, 
outlets=[a4_a14])
       PythonOperator(task_id='a4_a15', python_callable=wait_until_twenty_sec, 
outlets=[a4_a15])
   
   with DAG(dag_id="a10",
            start_date=datetime(1970, 1, 1),schedule_on=[a5_a10],
   ) as a10:
   
       PythonOperator(task_id='a10_a4', python_callable=wait_until_twenty_sec, 
outlets=[a10_a4])
       PythonOperator(task_id='a10_a11', python_callable=wait_until_twenty_sec, 
outlets=[a10_a11])
   
   with DAG(dag_id="a14",
            start_date=datetime(1970, 1, 1),schedule_on=[a4_a14, a5_a14, 
a7_a14, a15_a14],
   ) as a14:
   
       PythonOperator(task_id='a14_a8', python_callable=wait_until_twenty_sec, 
outlets=[a14_a8])
       PythonOperator(task_id='a14_a9', python_callable=wait_until_twenty_sec, 
outlets=[a14_a9])
       PythonOperator(task_id='a14_a12', python_callable=wait_until_twenty_sec, 
outlets=[a14_a12])
   
   with DAG(dag_id="a5",
            start_date=datetime(1970, 1, 1),schedule_on=[a9_a5, a13_a5],
   ) as a5:
   
       PythonOperator(task_id='a5_a6', python_callable=wait_until_twenty_sec, 
outlets=[a5_a6])
       PythonOperator(task_id='a5_a10', python_callable=wait_until_twenty_sec, 
outlets=[a5_a10])
       PythonOperator(task_id='a5_a12', python_callable=wait_until_twenty_sec, 
outlets=[a5_a12])
       PythonOperator(task_id='a5_a14', python_callable=wait_until_twenty_sec, 
outlets=[a5_a14])
   
   with DAG(dag_id="a9",
            start_date=datetime(1970, 1, 1),schedule_on=[a13_a9, a14_a9, 
a15_a9],
   ) as a9:
   
       PythonOperator(task_id='a9_a5', python_callable=wait_until_twenty_sec, 
outlets=[a9_a5])
   
   with DAG(dag_id="start_a13",
            start_date=datetime(1970, 1, 1),schedule_interval=None,
   ) as start_a13:
   
       PythonOperator(task_id='a13_a5', python_callable=wait_until_twenty_sec, 
outlets=[a13_a5])
       PythonOperator(task_id='a13_a9', python_callable=wait_until_twenty_sec, 
outlets=[a13_a9])
   
   with DAG(dag_id="a8",
            start_date=datetime(1970, 1, 1),schedule_on=[a7_a8, a11_a8, a14_a8],
   ) as a8:
   
       PythonOperator(task_id='no_outlets', 
python_callable=wait_until_twenty_sec)
   
   ```
   
   Note that the DAG called `start_a13` has two tasks whose outlets are datasets:
   - `a13_a5`
   - `a13_a9`
   
   In turn, DAGs `a5` and `a9` are scheduled on those datasets (via `schedule_on`). So I would expect that if I run `start_a13` and wait around for a bit, `a5` and `a9` should run. But that doesn't happen: `start_a13` runs and completes successfully, but the other dags never do.
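
   For reference, here is a minimal sketch of the kind of simpler arrangement that, per the note at the top, did work fine. The names are illustrative (not part of the failing file), but it uses the same `outlets`/`schedule_on` mechanism:

   ```python3
   # Hypothetical minimal producer/consumer pair; names are made up for illustration.
   from datetime import datetime

   from airflow import DAG, Dataset
   from airflow.operators.python import PythonOperator

   simple_ds = Dataset("simple_ds")

   with DAG(dag_id="simple_producer",
            start_date=datetime(1970, 1, 1), schedule_interval=None,
   ) as simple_producer:

       # finishing this task should mark simple_ds as updated
       PythonOperator(task_id="produce", python_callable=lambda: None, outlets=[simple_ds])

   with DAG(dag_id="simple_consumer",
            start_date=datetime(1970, 1, 1), schedule_on=[simple_ds],
   ) as simple_consumer:

       # this DAG should get a run once simple_ds is updated
       PythonOperator(task_id="consume", python_callable=lambda: None)
   ```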
   
   
   The scheduler logs don't show any errors; they just kind of stop:
   ```
   
   
   venv ❯ airflow scheduler
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   {scheduler_job.py:692} INFO - Starting the scheduler
   {scheduler_job.py:697} INFO - Processing each file at most -1 times
   [INFO] Starting gunicorn 20.1.0
   [INFO] Listening at: http://[::]:8793 (71751)
   [INFO] Using worker: sync
   {executor_loader.py:105} INFO - Loaded executor: LocalExecutor
   [INFO] Booting worker with pid: 71752
   [INFO] Booting worker with pid: 71765
   {manager.py:160} INFO - Launched DagFileProcessorManager with pid: 71889
   {scheduler_job.py:1233} INFO - Resetting orphaned tasks for active dag runs
   {settings.py:55} INFO - Configured default timezone Timezone('UTC')
   {scheduler_job.py:341} INFO - 2 tasks up for execution:
        <TaskInstance: start_a13.a13_a5 manual__2022-08-08T04:44:13.755620+00:00 [scheduled]>
        <TaskInstance: start_a13.a13_a9 manual__2022-08-08T04:44:13.755620+00:00 [scheduled]>
   {scheduler_job.py:406} INFO - DAG start_a13 has 0/16 running and queued tasks
   {scheduler_job.py:406} INFO - DAG start_a13 has 1/16 running and queued tasks
   {scheduler_job.py:492} INFO - Setting the following tasks to queued state:
        <TaskInstance: start_a13.a13_a5 manual__2022-08-08T04:44:13.755620+00:00 [scheduled]>
        <TaskInstance: start_a13.a13_a9 manual__2022-08-08T04:44:13.755620+00:00 [scheduled]>
   {scheduler_job.py:531} INFO - Sending TaskInstanceKey(dag_id='start_a13', task_id='a13_a5', run_id='manual__2022-08-08T04:44:13.755620+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
   {base_executor.py:92} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'start_a13', 'a13_a5', 'manual__2022-08-08T04:44:13.755620+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {scheduler_job.py:531} INFO - Sending TaskInstanceKey(dag_id='start_a13', task_id='a13_a9', run_id='manual__2022-08-08T04:44:13.755620+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
   {base_executor.py:92} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'start_a13', 'a13_a9', 'manual__2022-08-08T04:44:13.755620+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {local_executor.py:79} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'start_a13', 'a13_a5', 'manual__2022-08-08T04:44:13.755620+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {local_executor.py:79} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'start_a13', 'a13_a9', 'manual__2022-08-08T04:44:13.755620+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {task_command.py:378} INFO - Running <TaskInstance: start_a13.a13_a5 manual__2022-08-08T04:44:13.755620+00:00 [queued]> on host ChoedanKal
   {task_command.py:378} INFO - Running <TaskInstance: start_a13.a13_a9 manual__2022-08-08T04:44:13.755620+00:00 [queued]> on host ChoedanKal
   {scheduler_job.py:583} INFO - Executor reports execution of start_a13.a13_a5 run_id=manual__2022-08-08T04:44:13.755620+00:00 exited with status success for try_number 1
   {scheduler_job.py:626} INFO - TaskInstance Finished: dag_id=start_a13, task_id=a13_a5, run_id=manual__2022-08-08T04:44:13.755620+00:00, map_index=-1, run_start_date=2022-08-08 04:44:15.281603+00:00, run_end_date=2022-08-08 04:44:16.121190+00:00, run_duration=0.839587, state=success, executor_state=success, try_number=1, max_tries=0, job_id=17, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2022-08-08 04:44:14.164815+00:00, queued_by_job_id=15, pid=71985
   {dagrun.py:567} INFO - Marking run <DagRun start_a13 @ 2022-08-08 04:44:13.755620+00:00: manual__2022-08-08T04:44:13.755620+00:00, state:running, queued_at: 2022-08-08 04:44:13.817611+00:00. externally triggered: True> successful
   {dagrun.py:612} INFO - DagRun Finished: dag_id=start_a13, execution_date=2022-08-08 04:44:13.755620+00:00, run_id=manual__2022-08-08T04:44:13.755620+00:00, run_start_date=2022-08-08 04:44:13.960460+00:00, run_end_date=2022-08-08 04:44:17.579620+00:00, run_duration=3.61916, state=success, external_trigger=True, run_type=manual, data_interval_start=2022-08-08 04:44:13.755620+00:00, data_interval_end=2022-08-08 04:44:13.755620+00:00, dag_hash=254f7fa18d109067953eb15cf420949c
   {dag.py:3178} INFO - Setting next_dagrun for start_a13 to None, run_after=None
   {scheduler_job.py:583} INFO - Executor reports execution of start_a13.a13_a9 run_id=manual__2022-08-08T04:44:13.755620+00:00 exited with status success for try_number 1
   {scheduler_job.py:626} INFO - TaskInstance Finished: dag_id=start_a13, task_id=a13_a9, run_id=manual__2022-08-08T04:44:13.755620+00:00, map_index=-1, run_start_date=2022-08-08 04:44:15.470887+00:00, run_end_date=2022-08-08 04:44:16.354373+00:00, run_duration=0.883486, state=success, executor_state=success, try_number=1, max_tries=0, job_id=16, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2022-08-08 04:44:14.164815+00:00, queued_by_job_id=15, pid=71988
   [INFO] Handling signal: winch
   
   ```
   
   ### What you think should happen instead
   
   The dags `a5` and `a9` should run.
   
   ### How to reproduce
   
   Add the file above to the DAGs folder, manually trigger `start_a13` (e.g. `airflow dags trigger start_a13`), and wait: `a5` and `a9` never run.
   
   ### Operating System
   
   NixOS (linux)
   
   ### Versions of Apache Airflow Providers
   
   n/a
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   I'm using the LocalExecutor (not sure if that's relevant).
   
   Also, here's a dump of my database. Based on the DAG Dependencies view, it looks like Airflow is aware of the dependencies, and the tasks with the outlets succeeded, so I'm not sure why the manually triggered dag was the only one that ran.
   
   [dump.sql.gz](https://github.com/apache/airflow/files/9278812/dump.sql.gz)
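
   In case it helps with triage, here's a rough sketch of the sort of check I'd expect to answer that question against the metadata DB. It's hypothetical: it assumes the in-development dataset models `DatasetEvent` and `DatasetDagRunQueue` are importable from `airflow.models.dataset`, which may not exactly match current main:

   ```python3
   # Hypothetical inspection script; the model names/locations are assumptions
   # about the in-development dataset (AIP-48) code on main, not confirmed API.
   from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent
   from airflow.utils.session import create_session

   with create_session() as session:
       # Did start_a13's tasks record dataset events when they succeeded?
       for event in session.query(DatasetEvent).all():
           print("event:", event.dataset_id, event.source_dag_id, event.source_task_id, event.timestamp)

       # Is anything waiting in the queue that should trigger a5 / a9?
       for row in session.query(DatasetDagRunQueue).all():
           print("queued:", row.dataset_id, row.target_dag_id)
   ```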
   
   
   ### Anything else
   
   In case you're interested in generating other messy dags, here's a gist with the generator script: https://gist.github.com/MatrixManAtYrService/52f25558499189848f75dc1ece0c42a2
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

