[OP found a solution]
Alright, so I did some more digging and found references to the subdag
issues in 1.8.0, particularly in the thread "1.8.0 Backfill Clarification",
which helped quite a bit.
We ended up switching to a vanilla main DAG with a fat branch:
(start task) < { a fat list of parallel branch pipelines with ~3 tasks
each } > (end task) - (..other stuff..)
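For anyone curious, here is a minimal sketch of that layout. The gateway
names and the DummyOperator placeholders are illustrative, not our real
tasks:

    # Sketch of the "fat branch" layout: each pipeline is a short chain
    # that fans out from `start` and fans back into `end`.
    from datetime import datetime
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    default_args = {'owner': 'airflow', 'start_date': datetime(2017, 8, 1)}
    dag = DAG(dag_id='ticket_gateways_flat', default_args=default_args,
              schedule_interval='@weekly')

    start = DummyOperator(task_id='start', dag=dag)
    end = DummyOperator(task_id='end', dag=dag)

    for tg in ['gateway_a', 'gateway_b']:  # illustrative gateway list
        extract = DummyOperator(task_id='{}_extract'.format(tg), dag=dag)
        process = DummyOperator(task_id='{}_process'.format(tg), dag=dag)
        notify = DummyOperator(task_id='{}_notify'.format(tg), dag=dag)
        start.set_downstream(extract)
        extract.set_downstream(process)
        process.set_downstream(notify)
        notify.set_downstream(end)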
In an attempt to be useful, I did notice a couple of things while hacking
on the subdag solution:
1. The Airflow subdag docs
<https://airflow.incubator.apache.org/concepts.html#subdags> state:
> SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is
> set to None or @once, the SubDAG will succeed without having done anything
That doesn't seem to be true, in my experience. Both the working example
using subdags and my own implementation give the subdag a schedule of None,
and the subdag tasks do run (they certainly don't just succeed without
doing anything).
2. Probably known, and likely the deadlock issue mentioned in the "1.8.0
Backfill Clarification" thread, but: if the main DAG's subdag operator has
fewer retries than ANY of the subdag's tasks (e.g. retries=0 on the
operator while the subdag tasks retry), the 'backfill' gets deadlocked.
Setting all subdag tasks and my subdag operator to retries=0 solved that
(see the sketch after this list), but not my other problems.
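Here is a minimal sketch of the retries alignment that unblocked us. The
dag_ids and the inner DummyOperator are illustrative; the point is that the
SubDagOperator and every task inside the subdag share retries=0:

    # Sketch: keep retries identical (0 here) on the SubDagOperator and on
    # every subdag task, via shared default_args.
    from datetime import datetime
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.subdag_operator import SubDagOperator

    args = {'owner': 'airflow', 'start_date': datetime(2017, 8, 1),
            'retries': 0}

    dag = DAG(dag_id='parent', default_args=args,
              schedule_interval='@weekly')

    subdag = DAG(dag_id='parent.child', default_args=args,
                 schedule_interval=None)
    DummyOperator(task_id='inner_task', dag=subdag)  # inherits retries=0

    SubDagOperator(task_id='child', subdag=subdag, retries=0, dag=dag)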
Regardless, thanks to whoever/whatever snippet of code allowed me onto the
mailing list ;)
P
On Mon, Aug 21, 2017 at 12:34 PM, Paul Elliot <[email protected]> wrote:
> Quick update, I see a log message on the paused subdag tasks in question:
>
> FIXME: Rescheduling due to concurrency limits reached at task runtime.
> Attempt 1 of 2. State set to NONE.
>
> I dunno if that helps or if there's something I can do about this.
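>
> For the archive: that message suggests a concurrency cap is being hit, so
> the per-DAG limits are one knob I may try. A hypothetical sketch (the
> numbers are illustrative, not tested):
>
>     # Raising the per-DAG concurrency limits; values are guesses.
>     dag = DAG(
>         dag_id='ticket_gateways',
>         default_args=DAG_DEFAULT_ARGS,
>         schedule_interval=get_dag_schedule(),
>         concurrency=32,      # max concurrent task instances for this DAG
>         max_active_runs=1,   # max simultaneous DAG runs
>     )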
>
> Cheers,
> P
>
> On Thu, Aug 17, 2017 at 7:14 PM, Paul Elliot <[email protected]>
> wrote:
>
>> Hi all,
>>
>> We've been using Airflow for the past year in our data operations. Been
>> really happy with it so far!
>>
>> Environment: Airflow 1.8.0 with the LocalExecutor running on a single
>> machine.
>>
>> We recently ran into an issue with a DAG containing one subdag operator
>> whose subdag holds around 20-30 simple ETL pipelines. The relevant code
>> is posted below. I poked around the mailing list archives but couldn't
>> find anything related.
>>
>> When running the main DAG, things seem to work for the majority of the
>> tasks. Then the DAG freezes: the scheduler looks fine and the main DAG is
>> not paused, but several of the subdags never run. Their tasks are
>> registered with the scheduler, and the UI shows the subdag operator task
>> as 'running'. The subdag tasks which aren't running have status 'null'.
>> Clicking 'Task Instance Details' shows me a dependency problem:
>>
>> Dependency: Dag Not Paused
>> Reason: Task's DAG 'ticket_gateways.ticket_gateways_subdag' is paused.
>> For clarity, "ticket_gateways.ticket_gateways_subdag" is one of the
>> paused subdags in question. Any idea why this happens, what I can do to
>> avoid it, or even just how to unpause the subdags?
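>>
>> (The subdags don't show up in the UI's DAG list for me, so I can't just
>> flip the pause toggle there. A sketch of the workaround I'm considering,
>> assuming the ORM route works for dotted subdag ids -- untested:)
>>
>>     # Flip the paused flag directly, mirroring the UI toggle /
>>     # `airflow unpause ticket_gateways.ticket_gateways_subdag`.
>>     from airflow import settings
>>     from airflow.models import DagModel
>>
>>     session = settings.Session()
>>     subdag_model = (session.query(DagModel)
>>                     .filter(DagModel.dag_id ==
>>                             'ticket_gateways.ticket_gateways_subdag')
>>                     .first())
>>     if subdag_model is not None:
>>         subdag_model.is_paused = False
>>         session.commit()
>>     session.close()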
>>
>> Thanks,
>> Paul
>>
>>
>>
>> # ... import stuff ...
>>
>> run_date = "{{ ds }}"
>>
>>
>> def subdag_generator(parent_dag_name, child_dag_name, args):
>>     # SubDagOperator requires the subdag's dag_id to be
>>     # '<parent_dag_id>.<subdag_task_id>'.
>>     dag_subdag = DAG(
>>         dag_id='{}.{}'.format(parent_dag_name, child_dag_name),
>>         default_args=args,
>>         schedule_interval=None,
>>     )
>>
>>     # One extract -> process -> notify pipeline per ticket gateway.
>>     for tg, task, tables in get_ticket_gateways():
>>         crawl_event_task = PythonOperator(
>>             task_id='{}-{}_extract_event'.format(child_dag_name, tg),
>>             default_args=args,
>>             provide_context=True,
>>             op_args=['Tickets.extractors', task],
>>             templates_dict={'date': run_date},
>>             op_kwargs={'mode': 'extract'},
>>             trigger_rule='all_success',
>>             python_callable=airflow_callable,
>>             dag=dag_subdag)
>>
>>         process_event_task = PythonOperator(
>>             task_id='{}-{}_process_event'.format(child_dag_name, tg),
>>             default_args=args,
>>             provide_context=True,
>>             op_args=['Tickets.extractors', task],
>>             op_kwargs={'mode': 'process'},
>>             templates_dict={'date': run_date},
>>             trigger_rule='all_success',
>>             python_callable=airflow_callable,
>>             dag=dag_subdag)
>>
>>         post_to_slack_task = SlackAPIPostOperator(
>>             task_id='{}-{}_post_to_slack'.format(child_dag_name, tg),
>>             default_args=args,
>>             token=SLACK_TOKEN,
>>             channel=SLACK_LOG_CHANNEL,
>>             text='{} finished running for execution {}.'.format(
>>                 tg, run_date),
>>             dag=dag_subdag)
>>
>>         crawl_event_task.set_downstream(process_event_task)
>>         process_event_task.set_downstream(post_to_slack_task)
>>
>>     return dag_subdag
>>
>>
>> dag = DAG(
>>     dag_id='ticket_gateways', default_args=DAG_DEFAULT_ARGS,
>>     schedule_interval=get_dag_schedule())  # just '@weekly' or '@once'
>>
>> subdag_task = SubDagOperator(
>>     task_id='ticket_gateways_subdag',
>>     subdag=subdag_generator('ticket_gateways', 'ticket_gateways_subdag',
>>                             DAG_DEFAULT_ARGS),
>>     dag=dag
>> )
>>
>> post_to_slack_task = SlackAPIPostOperator(
>>     task_id='ticket_gateways_post_to_slack',
>>     token=SLACK_TOKEN,
>>     channel=SLACK_LOG_CHANNEL,
>>     trigger_rule='all_done',
>>     text='success',
>>     dag=dag
>> )
>>
>> notify_slack_task = SlackAPIPostOperator(
>>     task_id='ticket_gateways_notify_slack',
>>     token=SLACK_TOKEN,
>>     channel=SLACK_NOTIFICATION_CHANNEL,
>>     trigger_rule='one_failed',
>>     text='failed',
>>     dag=dag
>> )
>>
>> subdag_task.set_downstream([post_to_slack_task, notify_slack_task])
>>
>>
>> if __name__ == "__main__":
>>     dag.cli()
>>
--
*Paul Elliott | 엘리엇폴*
Developer / Platform Dev. team
[email protected]
+82-10-2990-8642