Thanks Gerard,

Yea pools look really useful for limiting concurrent requests.

Where you mention the use of a hook would you simply raise an exception
from get_conn() should the adwords account be rate limited then just
configure a number of retries and appropriate delay / back off on the
operator doing work with the api?

I have come up with part of a solution using a key sensor and trigger dag
run. The idea would be that when my 'adwords_func' encounters a rate limit
error it sets a key in redis with an expiry matching the period in the rate
limit response then re-triggers the dag which will block on my sensor until
the key has expired.

The hard part is now getting this mechanism to work within a sub dag as I
have multiple api operations that need limiting.

def _adwords_rate_limited_dag(dag_id, adwords_func, max_dag_retries,
**kwargs):
    dag = DAG(dag_id, **kwargs)

    def count_retries(context, obj):
        retries = context['dag_run'].conf.get('dag_retries', 1)

        if retries > max_dag_retries:
            raise SystemError("Max retries reached for dag")

        obj.payload = {'dag_retries': retries + 1}

        return obj

    with dag:
        RedisNoKeySensor(
            task_id='check_for_rate_limit',
            key='rate_limited',
            redis_conn_id='redis_master',
            poke_interval=10
        ) >> PythonOperator(
            task_id=shift_callable.__name__,
            python_callable=adwords_callable,
        ) >> TriggerDagRunOperator(
            task_id='retry_dag_on_failure',
            trigger_dag_id=dag_id,
            trigger_rule=TriggerRule.ONE_FAILED,
            python_callable=count_retries
        )

    return dag

Thanks for your help,

Rob


On Thu, Aug 9, 2018 at 7:07 PM, Gerard Toonstra <gtoons...@gmail.com> wrote:

> Have you looked into pools?  Pools allow you to specify how many tasks at
> any given time should use a common resource.
> That way you could limit this to 1, 2, or 3 for example. Pools are not
> dynamic however, so it only allows you to upper limit how many
> number of clients are going to hit the API at any moment, not determine how
> many when the rate limit is in effect
> (unless.... you use code to reconfigure the pool on demand, but I'm not
> sure if I should recommend that, i.e. reconfigure the # of clients
> on the basis of hitting the rate limit.)  It sounds as if this logic is
> best introduced at the hook level, where it determines that it passes
> out an API interface only when the rate limit is not in place, where
> operators specify how many retries should occur.
>
> The Adwords API does allow increasing the rate limit threshold though and
> you're probably better off negotiating
> with Google to up that threshold, explaining your business case etc.?
>
> Gerard
>
>
>
> On Thu, Aug 9, 2018 at 10:43 AM r...@goshift.com <r...@goshift.com> wrote:
>
> > Hello,
> >
> > I am in the process of migrating a bespoke data pipe line built around
> > celery into airflow.
> >
> > We have a number of different tasks which interact with the Adwords API
> > which has a rate limiting policy. The policy isn't a fixed number of
> > requests its variable.
> >
> > In our celery code we have handled this by capturing a rate limit error
> > response and setting a key in redis to make sure that no tasks execute
> > against the API until it's expired. Any task that does get executed
> checks
> > for the presence of the key and if the key exists issues a retry for when
> > the rate limit is due to expire.
> >
> > Moving over to Airflow I can't find a way to go about scheduling a task
> to
> > retry in a specific amount of time. Doing some reading it seems a Sensor
> > could work to prevent other dags from executing whilst the rate limit is
> > present.
> >
> > I also can't seem to find an example of handling different exceptions
> from
> > a python task and adapting the retry logic accordingly.
> >
> > Any pointers would be much appreciated,
> >
> > Rob
> >
>

Reply via email to