No, you can't (at least not out of the box).
There is a workaround, though. What you can do is decide how many task2
instances will be executed and store that number as a Variable. You might
be able to set the Variable from within the task1 execution.
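For example, something roughly like this could work inside task1's callable
(just a sketch; build_work_items and the Variable key are placeholders):

from airflow.models import Variable

def run_task1(**context):
    work_items = build_work_items()  # placeholder for whatever task1 produces
    # persist the fan-out width so the DAG file can read it at parse time
    Variable.set('weekly-mailer-worker', len(work_items))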
I did something like this:
# fan out: create one worker task per configured slot
for i in range(0, int(Variable.get('weekly-mailer-worker', '5'))):
    send_email = ExecuteFannedOutOperator(
        task_id='execute-%s-%s' % (task_id, i),
        dag=dag,
        tolerance=100
    )
    start_weekly_mailer >> send_email
    send_email >> post_execute
start_weekly_mailer will run the queries and then send the task definitions
to a queue (we used SQS). The execute-* worker tasks will pull task
definitions from the queue and keep running until the queue is empty.
post_execute is mostly for cleanup.
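Roughly, the worker operator can be sketched like this (an illustration only,
using boto3; the queue URL, message handling, and tolerance logic are
placeholders, not our actual code):

import logging
import boto3
from airflow.models import BaseOperator


class ExecuteFannedOutOperator(BaseOperator):
    def __init__(self, tolerance=100, *args, **kwargs):
        super(ExecuteFannedOutOperator, self).__init__(*args, **kwargs)
        self.tolerance = tolerance

    def execute(self, context):
        sqs = boto3.client('sqs')
        queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/weekly-mailer'  # placeholder
        while True:
            resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1)
            messages = resp.get('Messages', [])
            if not messages:
                break  # queue is empty, this worker is done
            for msg in messages:
                # placeholder: build and send the actual mail from the task definition
                logging.info('processing %s', msg['Body'])
                sqs.delete_message(QueueUrl=queue_url,
                                   ReceiptHandle=msg['ReceiptHandle'])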
I hope we can improve Airflow to also handle this kind of use case.
--
*Adinata*
Engineer - UrbanIndo.com
On Wed, Sep 14, 2016 at 7:42 AM, Jason Chen <[email protected]>
wrote:
> Hi airflow team,
>
> I have a data pipeline as below
>
> task1 --> task2(parameter) --> task3(parameter)
>
> The parameters of task2 and task3 are based on the outputs from task1.
> When it runs, task1 will create a list of data, say [a, b, c].
>
> Then, I want the process to run like below. Is it possible to achieve this
> in Airflow?
> Note the list output from task1 is dynamic.
> I am thinking of using a SubDAG for task2 and task3, but it seems static.
> Thanks.
>
> Jason
>
> task1 --> task2(a) --> task3(a)
>       \-> task2(b) --> task3(b)
>       \-> task2(c) --> task3(c)
>