Quick update: I see this log message on the paused subdag tasks in question:

    FIXME: Rescheduling due to concurrency limits reached at task runtime. Attempt 1 of 2. State set to NONE.
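For reference, the paused DAG id in the error is the parent/child composite that subdag_generator builds; a minimal sketch of that composition (names taken from the code quoted below):

```python
# Mirrors the dag_id composition in subdag_generator below:
# each subdag is registered as a full DAG named '<parent>.<child>'.
parent_dag_name = 'ticket_gateways'
child_dag_name = 'ticket_gateways_subdag'

subdag_id = '{}.{}'.format(parent_dag_name, child_dag_name)
print(subdag_id)  # ticket_gateways.ticket_gateways_subdag
```

This composite id is what you would pass to `airflow unpause` (assuming the 1.8 CLI), since each subdag shows up in the metadata DB as a DAG of its own.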
I don't know if that helps, or whether there's something I can do about it.

Cheers,
P

On Thu, Aug 17, 2017 at 7:14 PM, Paul Elliot <[email protected]> wrote:

> Hi all,
>
> We've been using Airflow for the past year in our data operations and have
> been really happy with it so far!
>
> Environment: Airflow 1.8.0 with the LocalExecutor, running on a single
> machine.
>
> We recently ran into an issue with a DAG containing one SubDagOperator with
> around 20-30 simple ETL subdags. The relevant code is posted below. I poked
> around the mailing list archives but couldn't find anything related.
>
> When running the main DAG, things work for the majority of the tasks. Then
> the DAG freezes: the scheduler looks fine and the main DAG is not paused,
> but several of the subdags never run. Their tasks are registered with the
> scheduler, and the UI shows the SubDagOperator task as 'running'. The
> subdag tasks which aren't running have status 'null'. Clicking 'Task
> Instance Details' shows me a dependency problem:
>
>     Dag Not Paused: Task's DAG 'ticket_gateways.ticket_gateways_subdag' is
>     paused.
>
> For clarity, "ticket_gateways.ticket_gateways_subdag" is one of the
> paused subdags in question. Any idea why this happens, what I can do to
> avoid it, or even just how to unpause the subdags?
>
> Thanks,
> Paul
>
>
> # ... import stuff ...
>
> run_date = "{{ ds }}"
>
>
> def subdag_generator(parent_dag_name, child_dag_name, args):
>     dag_subdag = DAG(
>         dag_id='{}.{}'.format(parent_dag_name, child_dag_name),
>         default_args=args,
>         schedule_interval=None,
>     )
>
>     for tg, task, tables in get_ticket_gateways():
>         crawl_event_task = PythonOperator(
>             task_id='{}-{}_extract_event'.format(child_dag_name, tg),
>             default_args=args,
>             provide_context=True,
>             op_args=['Tickets.extractors', task],
>             templates_dict={'date': run_date},
>             op_kwargs={'mode': 'extract'},
>             trigger_rule='all_success',
>             python_callable=airflow_callable,
>             dag=dag_subdag)
>
>         process_event_task = PythonOperator(
>             task_id='{}-{}_process_event'.format(child_dag_name, tg),
>             default_args=args,
>             provide_context=True,
>             op_args=['Tickets.extractors', task],
>             op_kwargs={'mode': 'process'},
>             templates_dict={'date': run_date},
>             trigger_rule='all_success',
>             python_callable=airflow_callable,
>             dag=dag_subdag)
>
>         post_to_slack_task = SlackAPIPostOperator(
>             task_id='{}-{}_post_to_slack'.format(child_dag_name, tg),
>             default_args=args,
>             token=SLACK_TOKEN,
>             channel=SLACK_LOG_CHANNEL,
>             text='{} finished running for execution {}.'.format(tg, run_date),
>             dag=dag_subdag)
>
>         crawl_event_task.set_downstream(process_event_task)
>         process_event_task.set_downstream(post_to_slack_task)
>
>     return dag_subdag
>
>
> dag = DAG(
>     dag_id='ticket_gateways', default_args=DAG_DEFAULT_ARGS,
>     schedule_interval=get_dag_schedule())  # just '@weekly' or '@once'
>
> subdag_task = SubDagOperator(
>     task_id='ticket_gateways_subdag',
>     subdag=subdag_generator('ticket_gateways', 'ticket_gateways_subdag',
>                             DAG_DEFAULT_ARGS),
>     dag=dag
> )
>
> post_to_slack_task = SlackAPIPostOperator(
>     task_id='ticket_gateways_post_to_slack',
>     token=SLACK_TOKEN,
>     channel=SLACK_LOG_CHANNEL,
>     trigger_rule='all_done',
>     text='success',
>     dag=dag
> )
>
> notify_slack_task = SlackAPIPostOperator(
>     task_id='ticket_gateways_notify_slack',
>     token=SLACK_TOKEN,
>     channel=SLACK_NOTIFICATION_CHANNEL,
>     trigger_rule='one_failed',
>     text='failed',
>     dag=dag
> )
>
> subdag_task.set_downstream([post_to_slack_task, notify_slack_task])
>
>
> if __name__ == "__main__":
>     dag.cli()
>
> --
>
> *Paul Elliott | 엘리엇폴*
> Developer / Platform Dev. team
> [email protected]
> +82-10-2990-8642

--
*Paul Elliott | 엘리엇폴*
Developer / Platform Dev. team
[email protected]
+82-10-2990-8642
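P.S. A hunch rather than a confirmed diagnosis: since each subdag is registered as a DAG in its own right, newly discovered subdags can start out paused when `dags_are_paused_at_creation` is enabled. It may be worth checking this setting in airflow.cfg:

```
[core]
# If True (the shipped 1.8 default), newly discovered DAGs -- including
# subdags -- start out paused until unpaused explicitly.
dags_are_paused_at_creation = True
```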
