What I am doing is very similar. However, I am including the DagRun's id in the
pool name to make it unique, as I need to make sure every run gets its own
pool. I am getting that id from the context object, which is only available within
execute methods or templates. How do you make sure each run has its own pool?
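A minimal sketch of building a per-run pool name from the task context. It assumes an Airflow 1.10-style context dict that carries the DAG object under `"dag"` and the run id under `"run_id"`. The helper names and the 50-character cap are hypothetical, so check the pool-name column length in your Airflow version. The DAG id is prefixed because run ids such as `scheduled__<date>` repeat across DAGs.

```python
import hashlib

# Hypothetical cap on pool-name length; verify against your Airflow schema.
MAX_POOL_NAME_LEN = 50


def per_run_pool_name(dag_id: str, run_id: str, max_len: int = MAX_POOL_NAME_LEN) -> str:
    """Build a pool name unique to one DagRun (run ids alone repeat across DAGs)."""
    name = f"{dag_id}__{run_id}"
    if len(name) <= max_len:
        return name
    # Too long: keep a readable prefix and append a short digest of the full name,
    # so different runs are very unlikely to collide after truncation.
    digest = hashlib.sha1(name.encode("utf-8")).hexdigest()[:10]
    return f"{name[: max_len - len(digest) - 2]}__{digest}"


def pool_name_from_context(context: dict) -> str:
    """Call from inside execute() (or a python_callable) where the context exists."""
    return per_run_pool_name(context["dag"].dag_id, context["run_id"])
```

This only produces the name; the downstream tasks still have to receive it somehow, which is exactly the open question in this thread.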


Dávid Szakállas
Software Engineer | Whitepages Data Services

From: Taylor Edmiston <tedmis...@gmail.com>
Sent: Thursday, September 20, 2018 6:17:05 PM
To: dev@airflow.incubator.apache.org
Subject: Re: Creating dynamic pool from task

I've done something similar. I have a task at the front of the DAG that
ensures the connection pool exists and creates the pool if it doesn't.
I've pasted my code below. This runs in a for loop that creates one DAG
per iteration, each with its own pool. Then I pass the pool name into the [...]
Does this work for your use case?


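import logging
from airflow.models import Pool
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session
logger = logging.getLogger(__name__)

@provide_session
def ensure_redshift_pool(name, slots, session=None):
    # Create the pool if it is missing; provide_session injects the DB session.
    pool_query = session.query(Pool).filter(Pool.pool == name)
    if pool_query.one_or_none() is None:
        logger.info(f'redshift pool "{name}" does not exist - creating it')
        session.add(Pool(pool=name, slots=slots))
        session.commit()
        logger.info(f'created redshift pool "{name}"')
    else:
        logger.info(f'redshift pool "{name}" already exists')

# dag, workflow and REDSHIFT_POOL_SLOTS come from the surrounding DAG-building loop.
redshift_pool = PythonOperator(
    task_id='redshift_pool',  # illustrative; the task_id was not in the excerpt
    python_callable=ensure_redshift_pool,
    op_kwargs={'name': workflow.pool, 'slots': REDSHIFT_POOL_SLOTS},
    dag=dag,
)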
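The query-then-insert above can race if two runs start at the same moment. Assuming the pool-name column has a unique constraint, one race-tolerant variant lets the constraint decide and treats a duplicate as "already exists". The sketch below uses an in-memory SQLite table as a stand-in for the metadata database; the table name `pool_registry` and the function names are hypothetical, not Airflow's schema or API.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """In-memory stand-in for the metadata DB with a hypothetical pool table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE pool_registry (pool TEXT UNIQUE NOT NULL, slots INTEGER NOT NULL)"
    )
    return conn


def ensure_pool(conn: sqlite3.Connection, name: str, slots: int) -> bool:
    """Create the pool if missing; return True only if this call created it."""
    try:
        with conn:  # commits on success, rolls back if the INSERT raises
            conn.execute(
                "INSERT INTO pool_registry (pool, slots) VALUES (?, ?)", (name, slots)
            )
        return True
    except sqlite3.IntegrityError:
        # The UNIQUE constraint fired: another caller created the pool first.
        return False
```

With SQLAlchemy the same idea means catching `sqlalchemy.exc.IntegrityError` on commit and rolling back the session.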


*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | LinkedIn <https://www.linkedin.com/in/tedmiston/> |
Stack Overflow <https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story

On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <dszakal...@whitepages.com> wrote:

> Hi all,
> I have a DAG that creates a cluster, starts computation tasks, and, after
> they complete, tears down the cluster. I want to limit concurrency for the
> computation tasks running on this cluster to a fixed number. So logically, I
> need a pool that is exclusive to the cluster created by a task. I don't
> want interference with other DAGs or with different runs of the same DAG.
> I thought I could solve this problem by creating a pool dynamically from a
> task after the cluster is created and deleting it once the computation tasks
> are finished. I thought I could template the pool parameter of the
> computation tasks to make them use this dynamically created pool.
> However, this way the computation tasks are never triggered. So I think
> the pool parameter is saved in the task instance before it is templated. I
> would like to hear your thoughts on how to achieve the desired behavior.
> Thanks,
> Dávid Szakállas
> Software Engineer | Whitepages Data Services
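A toy model of the behavior described above, in plain Python rather than Airflow code. It assumes, as the question suspects, that the scheduler checks a task's raw `pool` value against the existing pools before any template is rendered. The literal Jinja string then names no real pool, gets zero slots, and the task never starts. All names here (`ToyTask`, `schedulable`, `render`) are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ToyTask:
    task_id: str
    pool: str  # stored verbatim when the DAG is parsed; not rendered in this model


def schedulable(tasks: List[ToyTask], open_slots: Dict[str, int]) -> List[str]:
    """Return the ids of tasks that can start, given free slots per existing pool."""
    free = dict(open_slots)
    ready = []
    for task in tasks:
        # A pool name that matches no existing pool yields zero slots.
        if free.get(task.pool, 0) > 0:
            free[task.pool] -= 1
            ready.append(task.task_id)
    return ready


def render(template: str, context: Dict[str, str]) -> str:
    """Crude stand-in for Jinja: substitute '{{ key }}' placeholders."""
    for key, value in context.items():
        template = template.replace("{{ " + key + " }}", value)
    return template
```

In the model, tasks only become schedulable once the pool name is rendered before the slot check, and the pool's slot count then caps how many run at once.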
