I've done something similar. I have a task at the front of the DAG that
ensures the pool exists and creates it if it doesn't. My code is pasted
below. It runs in a for loop that creates one DAG per iteration, each
with its own pool, and I then pass the pool name into the sensors.
Does this work for your use case?
--
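import logging

from airflow.models import Pool
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session

logger = logging.getLogger(__name__)


@provide_session
def ensure_redshift_pool(name, slots, session=None):
    # Look up the pool by name; one_or_none() returns None if it is missing.
    pool_query = (
        session.query(Pool)
        .filter(Pool.pool == name)
    )
    if pool_query.one_or_none() is None:
        logger.info(f'redshift pool "{name}" does not exist - creating it')
        session.add(Pool(pool=name, slots=slots))
        session.commit()
        logger.info(f'created redshift pool "{name}"')
    else:
        logger.info(f'redshift pool "{name}" already exists')


# Define the callable before the operator that references it.
# dag, workflow and REDSHIFT_POOL_SLOTS are defined elsewhere in the DAG file.
redshift_pool = PythonOperator(
    task_id='redshift_pool',
    dag=dag,
    python_callable=ensure_redshift_pool,
    op_kwargs={
        'name': workflow.pool,
        'slots': REDSHIFT_POOL_SLOTS,
    },
    ...
)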
--
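A stdlib-only sketch of the same create-if-missing idea, plus the
delete-when-done step from the original question, for trying the pattern
without an Airflow install. Assumptions, stated plainly: an in-memory
SQLite table called pools stands in for Airflow's pool table (its name and
columns are hypothetical, not Airflow's schema), and ensure_pool and
delete_pool are illustrative helpers, not Airflow APIs. It also swaps the
query-then-add check in the code above for SQLite's INSERT OR IGNORE, so
the existence check and the insert happen in a single statement.

```python
import sqlite3

# Hypothetical stand-in for Airflow's pool table, for illustration only.
SCHEMA = "CREATE TABLE IF NOT EXISTS pools (name TEXT PRIMARY KEY, slots INTEGER NOT NULL)"


def ensure_pool(conn: sqlite3.Connection, name: str, slots: int) -> bool:
    """Create the pool if it is missing; return True only if it was created."""
    # INSERT OR IGNORE skips the row when the primary key already exists,
    # so rowcount is 1 for a new pool and 0 for an existing one.
    cur = conn.execute(
        "INSERT OR IGNORE INTO pools (name, slots) VALUES (?, ?)", (name, slots)
    )
    conn.commit()
    return cur.rowcount == 1


def delete_pool(conn: sqlite3.Connection, name: str) -> bool:
    """Remove the pool after its tasks finish; return True if a row was deleted."""
    cur = conn.execute("DELETE FROM pools WHERE name = ?", (name,))
    conn.commit()
    return cur.rowcount == 1


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    # One pool per generated DAG, mirroring the for loop described above
    # (the workflow names here are made up).
    for workflow in ["orders", "customers"]:
        pool_name = f"redshift_{workflow}"
        print(pool_name, ensure_pool(conn, pool_name, slots=4))
    # Re-running the setup step is a no-op for a pool that already exists.
    print(ensure_pool(conn, "redshift_orders", slots=4))
    # Teardown once the computation tasks are done.
    print(delete_pool(conn, "redshift_orders"))
```

Calling ensure_pool a second time with the same name returns False and
leaves the table unchanged, which is what makes it safe to run the setup
task at the front of every DAG run. In a real DAG the delete step would sit
in a final task downstream of the computation tasks.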
*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | LinkedIn <https://www.linkedin.com/in/tedmiston/> |
Stack Overflow <https://stackoverflow.com/users/149428/taylor-edmiston> |
Developer Story <https://stackoverflow.com/story/taylor>
On Thu, Sep 20, 2018 at 10:08 AM David Szakallas wrote:
> Hi all,
>
> I have a DAG that creates a cluster, starts computation tasks, and, after
> they complete, tears down the cluster. I want to limit concurrency for the
> computation tasks carried out on this cluster to a fixed number. So,
> logically, I need a pool that is exclusive to the cluster created by a
> task. I don't want interference with other DAGs or with different runs of
> the same DAG.
>
> I thought I could solve this problem by creating a pool dynamically from a
> task after the cluster is created and deleting it once the computation
> tasks are finished. I thought I could template the pool parameter of the
> computation tasks to make them use this dynamically created pool.
>
> However, this way the computation tasks are never triggered. So I think
> the pool parameter is saved in the task instance before being templated. I
> would like to hear your thoughts on how to achieve the desired behavior.
>
> Thanks,
>
> Dávid Szakállas
> Software Engineer | Whitepages Data Services