grayver commented on issue #11331:
URL: https://github.com/apache/airflow/issues/11331#issuecomment-705532853


   We have 13 DAGs, each with ~5 tasks (so ~70 tasks in total). Most of those DAGs run hourly (with some time offset). In most cases a DAG just checks for new files to grab and does nothing if no new files are found. When a DAG discovers a new unprocessed file, it grabs it, parses it, loads the parsed data into the database, and calls a processing function there. In that case a run can take a long time (up to a few hours).
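
The "check for new files, otherwise do nothing" step maps onto a branch. Below is a minimal sketch of the decision a `BranchPythonOperator` callable could make. It assumes the task ids used in the reproduce DAG further down (`parse_files`, `slack_report`). `find_new_files` and `choose_branch` are hypothetical helpers, not code from the actual DAGs.

```python
def find_new_files(available, processed):
    """Return the file names in `available` that are not yet in `processed`, sorted."""
    return sorted(set(available) - set(processed))


def choose_branch(available, processed):
    """Return the task_id a hypothetical branch callable would follow.

    'parse_files' when at least one unprocessed file exists; otherwise
    'slack_report', so the parse/process path is skipped for that run.
    """
    if find_new_files(available, processed):
        return 'parse_files'
    return 'slack_report'
```

In a real DAG, `available` might come from `os.listdir()` on the drop directory and `processed` from wherever already-loaded files are recorded. Both of those sources are assumptions here.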
   
   I've prepared reproduction code with two DAGs: a long-running DAG and a short-running DAG. While the long-running DAG is running, no new short-running DAG runs are scheduled or started (and the UI reports that the scheduler doesn't appear to be running).
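
The following puts the symptom in numbers. In the reproduce code, `long_dag` sleeps 20 + 15 = 35 minutes per run, and `short_dag` is scheduled on `*/5 * * * *`. The sketch counts the 5-minute cron boundaries that fall inside one such 35-minute window using plain `datetime` arithmetic. It ignores scheduler latency and the short `check_files` task, and the example start time is arbitrary.

```python
from datetime import datetime, timedelta

# Assumed duration of one long_dag run: parse_files (20m) + process_files (15m).
LONG_RUN = timedelta(minutes=20 + 15)


def five_minute_ticks(start, end):
    """Return the */5 cron boundaries t with start <= t < end (minute resolution)."""
    t = start.replace(second=0, microsecond=0)
    if t < start:  # round a mid-minute start up to the next whole minute
        t += timedelta(minutes=1)
    ticks = []
    while t < end:
        if t.minute % 5 == 0:
            ticks.append(t)
        t += timedelta(minutes=1)
    return ticks


def expected_short_runs(start):
    """Number of short_dag schedule points while one long_dag run is in progress."""
    return len(five_minute_ticks(start, start + LONG_RUN))
```

Any 35-minute window contains exactly 7 such boundaries, so roughly 7 `short_dag` runs would be expected while a single `long_dag` run is in progress. Each `short_dag` run sleeps for only about 30 seconds, so `max_active_runs=1` alone should not hold those runs back.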
   
   <details><summary>blocking_reproduce_dag.py</summary>
   
   ```python
   from airflow.models import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.operators.bash_operator import BashOperator
   from airflow.operators.python_operator import BranchPythonOperator
   from airflow.utils.dates import days_ago
   from airflow.utils.trigger_rule import TriggerRule
   
   # Long-running DAG: parse_files + process_files sleep ~35 minutes per run.
   long_dag = DAG(
       dag_id='long_dag',
       schedule_interval='@hourly',
       start_date=days_ago(1),
       catchup=False
   )
   
   check_files_task = BranchPythonOperator(
       task_id='check_files', dag=long_dag,
       python_callable=lambda: 'parse_files'
   )
   
   parse_files_task = BashOperator(
       task_id='parse_files', dag=long_dag,
       bash_command='sleep 20m'
   )
   
   process_files_task = BashOperator(
       task_id='process_files', dag=long_dag,
       bash_command='sleep 15m'
   )
   
   slack_report_task = DummyOperator(
       task_id='slack_report', dag=long_dag,
       trigger_rule=TriggerRule.NONE_FAILED_OR_SKIPPED
   )
   
   check_files_task >> parse_files_task >> process_files_task >> slack_report_task
   check_files_task >> slack_report_task
   
   
   # Short-running DAG: scheduled every 5 minutes, each run sleeps ~30 seconds.
   short_dag = DAG(
       dag_id='short_dag',
       schedule_interval='*/5 * * * *',
       start_date=days_ago(1),
       catchup=False,
       max_active_runs=1
   )
   
   query_service_task = BashOperator(
       task_id='query_service', dag=short_dag,
       bash_command='sleep 30s'
   )
   
   do_something_task = DummyOperator(
       task_id='do_something', dag=short_dag
   )
   
   query_service_task >> do_something_task
   ```
   </details>

