grayver commented on issue #11331:
URL: https://github.com/apache/airflow/issues/11331#issuecomment-705532853
We have 13 DAGs, each with ~5 tasks (so ~70 tasks in total). Most of those DAGs
run hourly (with some time offset). In most cases a DAG just checks for new
files to grab and does nothing if none are found. When a DAG discovers a new
unprocessed file, it grabs it, parses it, loads the parsed data into the
database and calls a processing function there. In that case a run can take
some time (up to a few hours).
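For illustration only, the check-then-branch step described above might look roughly like the sketch below. The function name `choose_branch`, the directory argument and the `processed` set are assumptions made for this example and are not taken from the actual DAGs; the two returned task ids match the ones used in the reproduction code.

```python
from pathlib import Path
from typing import Set


def choose_branch(incoming_dir: str, processed: Set[str]) -> str:
    """Return the task_id to follow next.

    Hypothetical helper: 'parse_files' when at least one file in
    incoming_dir has not been processed yet, otherwise 'slack_report'.
    """
    directory = Path(incoming_dir)
    if not directory.is_dir():
        # Nothing to scan, so skip straight to the report task.
        return "slack_report"
    new_files = [
        p for p in directory.iterdir()
        if p.is_file() and p.name not in processed
    ]
    return "parse_files" if new_files else "slack_report"
```

A function like this could be passed as `python_callable` to a `BranchPythonOperator`, with its arguments supplied through `op_kwargs`; the reproduction code uses a lambda that always returns `'parse_files'` so that the long path is always taken.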
I've prepared reproduction code with two DAGs: one long-running and one
short-running. While the long-running DAG is running, no new runs of the
short-running DAG are scheduled or started (and the UI reports that the
scheduler does not appear to be running).
<details><summary>blocking_reproduce_dag.py</summary>
```python
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule

long_dag = DAG(
    dag_id='long_dag',
    schedule_interval='@hourly',
    start_date=days_ago(1),
    catchup=False
)

check_files_task = BranchPythonOperator(
    task_id='check_files', dag=long_dag,
    python_callable=lambda: 'parse_files'
)

parse_files_task = BashOperator(
    task_id='parse_files', dag=long_dag,
    bash_command='sleep 20m'
)

process_files_task = BashOperator(
    task_id='process_files', dag=long_dag,
    bash_command='sleep 15m'
)

slack_report_task = DummyOperator(
    task_id='slack_report', dag=long_dag,
    trigger_rule=TriggerRule.NONE_FAILED_OR_SKIPPED
)

check_files_task >> parse_files_task >> process_files_task >> slack_report_task
check_files_task >> slack_report_task

short_dag = DAG(
    dag_id='short_dag',
    schedule_interval='*/5 * * * *',
    start_date=days_ago(1),
    catchup=False,
    max_active_runs=1
)

query_service_task = BashOperator(
    task_id='query_service', dag=short_dag,
    bash_command='sleep 30s'
)

do_something_task = DummyOperator(
    task_id='do_something', dag=short_dag
)

query_service_task >> do_something_task
```
</details>