Chris, the tasks are independent of each other, so they can run concurrently. I
have to limit their concurrency, though, so they don't starve the cluster. Because
the cluster is created dynamically by a task, sharing a pool with other DAGs or
with other runs of the same DAG is not desirable.
I imagined something like this:
                                        .--> [compute_1]  ---.
                                       /---> [compute_2]  ----\
[create_cluster] --> [create_pool_x6] <          ...           > [delete_pool] --> [delete_cluster]
                                       \---> [compute_19] ----/
                                        .--> [compute_20] ---.
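
In Airflow terms, a rough sketch of what I mean is below. The run_id-based pool
name and the DummyOperator placeholders for the cluster and compute tasks are just
assumptions to illustrate the shape; the templated pool parameter on the compute
tasks is exactly the part that doesn't seem to work today.

--

from datetime import datetime

from airflow import DAG
from airflow.models import Pool
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session

POOL_SLOTS = 6  # at most 6 compute tasks running on the cluster at once


@provide_session
def create_run_pool(session=None, **context):
    # one pool per DagRun, named after the run_id
    name = 'cluster_pool_{}'.format(context['run_id'])
    if not session.query(Pool).filter(Pool.pool == name).one_or_none():
        session.add(Pool(pool=name, slots=POOL_SLOTS))
        session.commit()


@provide_session
def delete_run_pool(session=None, **context):
    name = 'cluster_pool_{}'.format(context['run_id'])
    session.query(Pool).filter(Pool.pool == name).delete()
    session.commit()


dag = DAG('cluster_computation', start_date=datetime(2018, 1, 1),
          schedule_interval=None)

# placeholders for the real cluster management tasks
create_cluster = DummyOperator(task_id='create_cluster', dag=dag)
delete_cluster = DummyOperator(task_id='delete_cluster', dag=dag)

create_pool = PythonOperator(task_id='create_pool', python_callable=create_run_pool,
                             provide_context=True, dag=dag)
delete_pool = PythonOperator(task_id='delete_pool', python_callable=delete_run_pool,
                             provide_context=True, dag=dag)

create_cluster >> create_pool
for i in range(1, 21):
    compute = DummyOperator(
        task_id='compute_{}'.format(i),
        # this is the part that does not work: the pool would have to be
        # resolved per run, e.g. pool='cluster_pool_{{ run_id }}'
        dag=dag,
    )
    create_pool >> compute >> delete_pool
delete_pool >> delete_cluster

--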
Thanks,
David
> On Sep 21, 2018, at 7:23 PM, Chris Palmer <[email protected]> wrote:
>
> What would cause multiple computation tasks to run on the cluster at the
> same time? Are you worried about concurrent DagRuns? Does setting dag
> concurrency and/or task concurrency appropriately solve your problem?
>
> Chris
>
> On Thu, Sep 20, 2018 at 8:31 PM David Szakallas <[email protected]>
> wrote:
>
>> What I am doing is very similar. However I am including the DagRun's id in
>> the pool name to make it unique, as I need to make sure every run gets its
>> own pool. I am getting that from the context object, which is only
>> available within execute methods or templates. How do you make sure each
>> run has its own pool?
>>
>>
>> Thanks,
>>
>> Dávid Szakállas
>> Software Engineer | Whitepages Data Services
>>
>> ________________________________
>> From: Taylor Edmiston <[email protected]>
>> Sent: Thursday, September 20, 2018 6:17:05 PM
>> To: [email protected]
>> Subject: Re: Creating dynamic pool from task
>>
>> I've done something similar. I have a task at the front of the DAG that
>> ensures the connection pool exists and creates the pool if it doesn't.
>> I've pasted my code below. This runs in a for loop that creates one DAG
>> per iteration each with its own pool. Then I pass the pool name into the
>> sensors.
>>
>> Does this work for your use case?
>>
>> --
>>
>> redshift_pool = PythonOperator(
>>     task_id='redshift_pool',
>>     dag=dag,
>>     python_callable=ensure_redshift_pool,
>>     op_kwargs={
>>         'name': workflow.pool,
>>         'slots': REDSHIFT_POOL_SLOTS,
>>     },
>>     ...
>> )
>>
>> @provide_session
>> def ensure_redshift_pool(name, slots, session=None):
>>     pool = Pool(pool=name, slots=slots)
>>     pool_query = (
>>         session.query(Pool)
>>         .filter(Pool.pool == name)
>>     )
>>     pool_query_result = pool_query.one_or_none()
>>     if not pool_query_result:
>>         logger.info(f'redshift pool "{name}" does not exist - creating it')
>>         session.add(pool)
>>         session.commit()
>>         logger.info(f'created redshift pool "{name}"')
>>     else:
>>         logger.info(f'redshift pool "{name}" already exists')
>>
>> --
>>
>> *Taylor Edmiston*
>> Blog <https://blog.tedmiston.com/> | LinkedIn
>> <https://www.linkedin.com/in/tedmiston/> | Stack Overflow
>> <https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story
>> <https://stackoverflow.com/story/taylor>
>>
>>
>>
>> On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <
>> [email protected]>
>> wrote:
>>
>>> Hi all,
>>>
>>> I have a DAG that creates a cluster, starts computation tasks, and after
>>> they complete, tears down the cluster. I want to limit the concurrency of
>>> the computation tasks carried out on this cluster to a fixed number. So
>>> logically, I need a pool that is exclusive to the cluster created by a
>>> task. I don't want interference with other DAGs or different runs of the
>>> same DAG.
>>>
>>> I thought I could solve this problem by creating a pool dynamically from a
>>> task after the cluster is created and deleting it once the computation
>>> tasks are finished. I thought I could template the pool parameter of the
>>> computation tasks to make them use this dynamically created pool.
>>>
>>> However, this way the computation tasks are never triggered, so I think
>>> the pool parameter is saved in the task instance before being templated.
>>> I would like to hear your thoughts on how to achieve the desired behavior.
>>>
>>> Thanks,
>>>
>>> Dávid Szakállas
>>> Software Engineer | Whitepages Data Services
>>>