MatrixManAtYrService opened a new issue, #25586: URL: https://github.com/apache/airflow/issues/25586
### Apache Airflow version

main (development)

### What happened

First, an apology: I tried to replicate this with several simpler sets of DAGs, but they worked fine. I don't know what it is about this particular arrangement that causes the problem. Here's a directed graph:

[image: directed graph of nodes a1 through a15]

If we interpret each node as a DAG, and each outbound edge as a task in that DAG, such that each task specifies a unique dataset as its outlet which is referenced by the targeted node/DAG as an inlet, then we end up with this file:

```python3
from datetime import datetime, timedelta
from math import ceil

import pause

from airflow import Dataset, DAG
from airflow.operators.python import PythonOperator


def wait_until_twenty_sec():
    # Pad each task with a short wall-clock wait so the runs don't all
    # finish instantly.
    now = datetime.now()
    extra = 20 - (ceil(now.second / 10) * 10)
    go_time = now + timedelta(seconds=extra)
    print(f"waiting until {go_time}")
    pause.until(go_time)


# One dataset per edge in the graph above.
a5_a14 = Dataset("a5_a14")
a5_a12 = Dataset("a5_a12")
a15_a14 = Dataset("a15_a14")
a7_a6 = Dataset("a7_a6")
a7_a8 = Dataset("a7_a8")
a6_a2 = Dataset("a6_a2")
a5_a10 = Dataset("a5_a10")
a11_a1 = Dataset("a11_a1")
a1_a12 = Dataset("a1_a12")
a1_a2 = Dataset("a1_a2")
a13_a5 = Dataset("a13_a5")
a15_a9 = Dataset("a15_a9")
a10_a4 = Dataset("a10_a4")
a10_a11 = Dataset("a10_a11")
a4_a15 = Dataset("a4_a15")
a5_a6 = Dataset("a5_a6")
a2_a7 = Dataset("a2_a7")
a11_a8 = Dataset("a11_a8")
a13_a9 = Dataset("a13_a9")
a1_a3 = Dataset("a1_a3")
a4_a14 = Dataset("a4_a14")
a6_a15 = Dataset("a6_a15")
a11_a2 = Dataset("a11_a2")
a14_a8 = Dataset("a14_a8")
a15_a12 = Dataset("a15_a12")
a7_a14 = Dataset("a7_a14")
a6_a4 = Dataset("a6_a4")
a14_a12 = Dataset("a14_a12")
a9_a5 = Dataset("a9_a5")
a14_a9 = Dataset("a14_a9")
a6_a1 = Dataset("a6_a1")
a15_a1 = Dataset("a15_a1")

# One DAG per node; each outbound edge is a task whose outlet is that
# edge's dataset.
with DAG(dag_id="a1", start_date=datetime(1970, 1, 1), schedule_on=[a6_a1, a11_a1, a15_a1]) as a1:
    PythonOperator(task_id='a1_a2', python_callable=wait_until_twenty_sec, outlets=[a1_a2])
    PythonOperator(task_id='a1_a3', python_callable=wait_until_twenty_sec, outlets=[a1_a3])
    PythonOperator(task_id='a1_a12', python_callable=wait_until_twenty_sec, outlets=[a1_a12])

with DAG(dag_id="a2", start_date=datetime(1970, 1, 1), schedule_on=[a1_a2, a6_a2, a11_a2]) as a2:
    PythonOperator(task_id='a2_a7', python_callable=wait_until_twenty_sec, outlets=[a2_a7])

with DAG(dag_id="a3", start_date=datetime(1970, 1, 1), schedule_on=[a1_a3]) as a3:
    PythonOperator(task_id='no_outlets', python_callable=wait_until_twenty_sec)

with DAG(dag_id="a6", start_date=datetime(1970, 1, 1), schedule_on=[a5_a6, a7_a6]) as a6:
    PythonOperator(task_id='a6_a1', python_callable=wait_until_twenty_sec, outlets=[a6_a1])
    PythonOperator(task_id='a6_a2', python_callable=wait_until_twenty_sec, outlets=[a6_a2])
    PythonOperator(task_id='a6_a4', python_callable=wait_until_twenty_sec, outlets=[a6_a4])
    PythonOperator(task_id='a6_a15', python_callable=wait_until_twenty_sec, outlets=[a6_a15])

with DAG(dag_id="a11", start_date=datetime(1970, 1, 1), schedule_on=[a10_a11]) as a11:
    PythonOperator(task_id='a11_a1', python_callable=wait_until_twenty_sec, outlets=[a11_a1])
    PythonOperator(task_id='a11_a2', python_callable=wait_until_twenty_sec, outlets=[a11_a2])
    PythonOperator(task_id='a11_a8', python_callable=wait_until_twenty_sec, outlets=[a11_a8])

with DAG(dag_id="a12", start_date=datetime(1970, 1, 1), schedule_on=[a1_a12, a5_a12, a14_a12, a15_a12]) as a12:
    PythonOperator(task_id='no_outlets', python_callable=wait_until_twenty_sec)

with DAG(dag_id="a15", start_date=datetime(1970, 1, 1), schedule_on=[a4_a15, a6_a15]) as a15:
    PythonOperator(task_id='a15_a1', python_callable=wait_until_twenty_sec, outlets=[a15_a1])
    PythonOperator(task_id='a15_a9', python_callable=wait_until_twenty_sec, outlets=[a15_a9])
    PythonOperator(task_id='a15_a12', python_callable=wait_until_twenty_sec, outlets=[a15_a12])
    PythonOperator(task_id='a15_a14', python_callable=wait_until_twenty_sec, outlets=[a15_a14])

with DAG(dag_id="a7", start_date=datetime(1970, 1, 1), schedule_on=[a2_a7]) as a7:
    PythonOperator(task_id='a7_a6', python_callable=wait_until_twenty_sec, outlets=[a7_a6])
    PythonOperator(task_id='a7_a8', python_callable=wait_until_twenty_sec, outlets=[a7_a8])
    PythonOperator(task_id='a7_a14', python_callable=wait_until_twenty_sec, outlets=[a7_a14])

with DAG(dag_id="a4", start_date=datetime(1970, 1, 1), schedule_on=[a6_a4, a10_a4]) as a4:
    PythonOperator(task_id='a4_a14', python_callable=wait_until_twenty_sec, outlets=[a4_a14])
    PythonOperator(task_id='a4_a15', python_callable=wait_until_twenty_sec, outlets=[a4_a15])

with DAG(dag_id="a10", start_date=datetime(1970, 1, 1), schedule_on=[a5_a10]) as a10:
    PythonOperator(task_id='a10_a4', python_callable=wait_until_twenty_sec, outlets=[a10_a4])
    PythonOperator(task_id='a10_a11', python_callable=wait_until_twenty_sec, outlets=[a10_a11])

with DAG(dag_id="a14", start_date=datetime(1970, 1, 1), schedule_on=[a4_a14, a5_a14, a7_a14, a15_a14]) as a14:
    PythonOperator(task_id='a14_a8', python_callable=wait_until_twenty_sec, outlets=[a14_a8])
    PythonOperator(task_id='a14_a9', python_callable=wait_until_twenty_sec, outlets=[a14_a9])
    PythonOperator(task_id='a14_a12', python_callable=wait_until_twenty_sec, outlets=[a14_a12])

with DAG(dag_id="a5", start_date=datetime(1970, 1, 1), schedule_on=[a9_a5, a13_a5]) as a5:
    PythonOperator(task_id='a5_a6', python_callable=wait_until_twenty_sec, outlets=[a5_a6])
    PythonOperator(task_id='a5_a10', python_callable=wait_until_twenty_sec, outlets=[a5_a10])
    PythonOperator(task_id='a5_a12', python_callable=wait_until_twenty_sec, outlets=[a5_a12])
    PythonOperator(task_id='a5_a14', python_callable=wait_until_twenty_sec, outlets=[a5_a14])

with DAG(dag_id="a9", start_date=datetime(1970, 1, 1), schedule_on=[a13_a9, a14_a9, a15_a9]) as a9:
    PythonOperator(task_id='a9_a5', python_callable=wait_until_twenty_sec, outlets=[a9_a5])

with DAG(dag_id="start_a13", start_date=datetime(1970, 1, 1), schedule_interval=None) as start_a13:
    # The entry point: no dataset inlets, so it only runs when triggered
    # manually.
    PythonOperator(task_id='a13_a5', python_callable=wait_until_twenty_sec, outlets=[a13_a5])
    PythonOperator(task_id='a13_a9', python_callable=wait_until_twenty_sec, outlets=[a13_a9])

with DAG(dag_id="a8", start_date=datetime(1970, 1, 1), schedule_on=[a7_a8, a11_a8, a14_a8]) as a8:
    PythonOperator(task_id='no_outlets', python_callable=wait_until_twenty_sec)
```

Note that the DAG called `start_a13` has two tasks whose outlets are datasets:

- `a13_a5`
- `a13_a9`

Downstream, the DAGs `a5` and `a9` specify those datasets as inlets. So I would expect that if I run `start_a13` and wait around for a bit, `a5` and `a9` should run. But that doesn't happen. Instead, `start_a13` runs and completes successfully, and the other DAGs never run.
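For contrast, the "simpler sets of dags" that worked fine for me were shaped like the sketch below (freshly written for illustration; the DAG, task, and dataset names here are made up, not from any file I actually tested):

```python3
# Minimal producer/consumer pair: the shape that *does* trigger correctly.
# All names here are hypothetical, chosen just for this sketch.
from datetime import datetime

from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator

handoff = Dataset("handoff")  # hypothetical dataset

with DAG(dag_id="producer", start_date=datetime(1970, 1, 1), schedule_interval=None) as producer:
    # When this task succeeds, its outlet should queue a run of every DAG
    # scheduled on `handoff`.
    PythonOperator(task_id="emit", python_callable=lambda: None, outlets=[handoff])

with DAG(dag_id="consumer", start_date=datetime(1970, 1, 1), schedule_on=[handoff]) as consumer:
    PythonOperator(task_id="receive", python_callable=lambda: None)
```

Trigger `producer` manually and `consumer` runs shortly afterwards, as expected. It's only in the bigger network above that the handoff stops happening.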
The scheduler logs don't show any errors; they just kind of stop:

```
venv ❯ airflow scheduler
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
{scheduler_job.py:692} INFO - Starting the scheduler
{scheduler_job.py:697} INFO - Processing each file at most -1 times
[INFO] Starting gunicorn 20.1.0
[INFO] Listening at: http://[::]:8793 (71751)
[INFO] Using worker: sync
{executor_loader.py:105} INFO - Loaded executor: LocalExecutor
[INFO] Booting worker with pid: 71752
[INFO] Booting worker with pid: 71765
{manager.py:160} INFO - Launched DagFileProcessorManager with pid: 71889
{scheduler_job.py:1233} INFO - Resetting orphaned tasks for active dag runs
{settings.py:55} INFO - Configured default timezone Timezone('UTC')
{scheduler_job.py:341} INFO - 2 tasks up for execution:
    <TaskInstance: start_a13.a13_a5 manual__2022-08-08T04:44:13.755620+00:00 [scheduled]>
    <TaskInstance: start_a13.a13_a9 manual__2022-08-08T04:44:13.755620+00:00 [scheduled]>
{scheduler_job.py:406} INFO - DAG start_a13 has 0/16 running and queued tasks
{scheduler_job.py:406} INFO - DAG start_a13 has 1/16 running and queued tasks
{scheduler_job.py:492} INFO - Setting the following tasks to queued state:
    <TaskInstance: start_a13.a13_a5 manual__2022-08-08T04:44:13.755620+00:00 [scheduled]>
    <TaskInstance: start_a13.a13_a9 manual__2022-08-08T04:44:13.755620+00:00 [scheduled]>
{scheduler_job.py:531} INFO - Sending TaskInstanceKey(dag_id='start_a13', task_id='a13_a5', run_id='manual__2022-08-08T04:44:13.755620+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
{base_executor.py:92} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'start_a13', 'a13_a5', 'manual__2022-08-08T04:44:13.755620+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
{scheduler_job.py:531} INFO - Sending TaskInstanceKey(dag_id='start_a13', task_id='a13_a9', run_id='manual__2022-08-08T04:44:13.755620+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
{base_executor.py:92} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'start_a13', 'a13_a9', 'manual__2022-08-08T04:44:13.755620+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
{local_executor.py:79} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'start_a13', 'a13_a5', 'manual__2022-08-08T04:44:13.755620+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
{local_executor.py:79} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'start_a13', 'a13_a9', 'manual__2022-08-08T04:44:13.755620+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
{task_command.py:378} INFO - Running <TaskInstance: start_a13.a13_a5 manual__2022-08-08T04:44:13.755620+00:00 [queued]> on host ChoedanKal
{task_command.py:378} INFO - Running <TaskInstance: start_a13.a13_a9 manual__2022-08-08T04:44:13.755620+00:00 [queued]> on host ChoedanKal
{scheduler_job.py:583} INFO - Executor reports execution of start_a13.a13_a5 run_id=manual__2022-08-08T04:44:13.755620+00:00 exited with status success for try_number 1
{scheduler_job.py:626} INFO - TaskInstance Finished: dag_id=start_a13, task_id=a13_a5, run_id=manual__2022-08-08T04:44:13.755620+00:00, map_index=-1, run_start_date=2022-08-08 04:44:15.281603+00:00, run_end_date=2022-08-08 04:44:16.121190+00:00, run_duration=0.839587, state=success, executor_state=success, try_number=1, max_tries=0, job_id=17, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2022-08-08 04:44:14.164815+00:00, queued_by_job_id=15, pid=71985
{dagrun.py:567} INFO - Marking run <DagRun start_a13 @ 2022-08-08 04:44:13.755620+00:00: manual__2022-08-08T04:44:13.755620+00:00, state:running, queued_at: 2022-08-08 04:44:13.817611+00:00. externally triggered: True> successful
{dagrun.py:612} INFO - DagRun Finished: dag_id=start_a13, execution_date=2022-08-08 04:44:13.755620+00:00, run_id=manual__2022-08-08T04:44:13.755620+00:00, run_start_date=2022-08-08 04:44:13.960460+00:00, run_end_date=2022-08-08 04:44:17.579620+00:00, run_duration=3.61916, state=success, external_trigger=True, run_type=manual, data_interval_start=2022-08-08 04:44:13.755620+00:00, data_interval_end=2022-08-08 04:44:13.755620+00:00, dag_hash=254f7fa18d109067953eb15cf420949c
{dag.py:3178} INFO - Setting next_dagrun for start_a13 to None, run_after=None
{scheduler_job.py:583} INFO - Executor reports execution of start_a13.a13_a9 run_id=manual__2022-08-08T04:44:13.755620+00:00 exited with status success for try_number 1
{scheduler_job.py:626} INFO - TaskInstance Finished: dag_id=start_a13, task_id=a13_a9, run_id=manual__2022-08-08T04:44:13.755620+00:00, map_index=-1, run_start_date=2022-08-08 04:44:15.470887+00:00, run_end_date=2022-08-08 04:44:16.354373+00:00, run_duration=0.883486, state=success, executor_state=success, try_number=1, max_tries=0, job_id=16, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2022-08-08 04:44:14.164815+00:00, queued_by_job_id=15, pid=71988
[INFO] Handling signal: winch
```

### What you think should happen instead

The DAGs `a5` and `a9` should run.

### How to reproduce

Add the file above to your DAGs folder and trigger `start_a13` manually.

### Operating System

NixOS (linux)

### Versions of Apache Airflow Providers

n/a

### Deployment

Virtualenv installation

### Deployment details

I'm using the LocalExecutor (not sure if that's relevant). Also, here's a dump of my database. Based on the DAG dependency view, it looks like Airflow is aware of the dependencies, and the tasks with the outlets were successful, so I'm not sure why the manually triggered DAG was the only one that ran.

[dump.sql.gz](https://github.com/apache/airflow/files/9278812/dump.sql.gz)

### Anything else

In case you're interested in generating other messy DAGs, here's a gist with the generator script: https://gist.github.com/MatrixManAtYrService/52f25558499189848f75dc1ece0c42a2 (a rough sketch of its shape is at the end of this report).

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
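P.S. If it helps with triage, something like the sketch below should show whether the dataset events actually landed in the metadata DB and whether anything got queued off of them. The model names are my reading of what's on main right now (`airflow/models/dataset.py`), so treat them as assumptions that may drift:

```python3
# Triage sketch, not something I ran as part of the report above.
# DatasetEvent and DatasetDagRunQueue are my reading of the models on
# current main; names and attributes may have changed since.
from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent
from airflow.utils.session import create_session

with create_session() as session:
    # Did the outlet tasks in start_a13 record dataset events?
    for event in session.query(DatasetEvent):
        print("event:", event.dataset_id, event.source_dag_id, event.source_task_id)
    # Did those events queue any downstream dag runs?
    for row in session.query(DatasetDagRunQueue):
        print("queued:", row.dataset_id, row.target_dag_id)
```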

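And for the curious, the generator behind files like the one above is shaped roughly like this. It's a sketch of the idea, not the actual gist (see the link under "Anything else"), and `EDGES` is a made-up sample input:

```python3
# Sketch of a dag-file generator (not the actual gist): given a digraph as a
# list of edges, emit one Dataset per edge and one DAG per node, where each
# outbound edge becomes a task whose outlet is that edge's dataset.
EDGES = [("a13", "a5"), ("a13", "a9"), ("a5", "a6"), ("a6", "a1")]  # sample graph


def render(edges):
    names = [f"{src}_{dst}" for src, dst in edges]
    out = [
        "from datetime import datetime",
        "",
        "from airflow import Dataset, DAG",
        "from airflow.operators.python import PythonOperator",
        "",
    ]
    out += [f'{name} = Dataset("{name}")' for name in names]
    for node in sorted({n for edge in edges for n in edge}):
        inlets = [f"{src}_{dst}" for src, dst in edges if dst == node]
        # Nodes with no inbound edges become manually triggered entry points.
        schedule = f"schedule_on=[{', '.join(inlets)}]" if inlets else "schedule_interval=None"
        out.append(f'\nwith DAG(dag_id="{node}", start_date=datetime(1970, 1, 1), {schedule}) as {node}:')
        outlets = [f"{src}_{dst}" for src, dst in edges if src == node]
        for name in outlets:
            out.append(f"    PythonOperator(task_id='{name}', python_callable=print, outlets=[{name}])")
        if not outlets:
            out.append("    PythonOperator(task_id='no_outlets', python_callable=print)")
    return "\n".join(out)


print(render(EDGES))
```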