Thanks Gerard,
Yes, pools look really useful for limiting concurrent requests.
Where you mention the use of a hook, would you simply raise an exception
from get_conn() when the Adwords account is rate limited, and then
configure a number of retries and an appropriate delay / back-off on the
operator doing work with the API?
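For concreteness, this is roughly what I'm picturing. AdwordsRateLimitError and
AdwordsHook are made-up names here (there is no such hook in Airflow), but
retries / retry_delay / retry_exponential_backoff are the standard BaseOperator
arguments:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator


class AdwordsRateLimitError(AirflowException):
    """Raised when the Adwords API reports a rate limit (illustrative name)."""


class AdwordsHook(BaseHook):
    """Hypothetical hook: only hands out an API client while no rate limit is active."""

    def __init__(self, adwords_conn_id='adwords_default'):
        self.adwords_conn_id = adwords_conn_id

    def _rate_limit_active(self):
        # Placeholder: in practice, check the redis key that gets set on RATE_EXCEEDED.
        return False

    def get_conn(self):
        if self._rate_limit_active():
            raise AdwordsRateLimitError('Adwords rate limit in effect')
        # Placeholder: in practice, build and return the googleads client here.
        return object()


def fetch_reports(**_):
    client = AdwordsHook().get_conn()  # raises while the rate limit is active
    # ... do the actual API work with `client` ...


dag = DAG('adwords_hook_example', start_date=datetime(2018, 8, 1),
          schedule_interval=None)

PythonOperator(
    task_id='fetch_adwords_reports',
    python_callable=fetch_reports,
    retries=5,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,  # standard BaseOperator back-off arguments
    dag=dag,
)

i.e. the task simply fails when the hook refuses a connection, and Airflow's
normal retry / back-off handling does the waiting.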
I have come up with part of a solution using a key sensor and a trigger dag
run. The idea is that when my 'adwords_func' encounters a rate limit
error, it sets a key in redis with an expiry matching the period in the rate
limit response and then re-triggers the dag, which will block on my sensor
until the key has expired.
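For reference, the RedisNoKeySensor used in the factory below is just a small
custom sensor that succeeds once the key is gone; a minimal sketch, assuming
the contrib RedisHook that ships with 1.10 (a plain redis client would work
just as well):

from airflow.contrib.hooks.redis_hook import RedisHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class RedisNoKeySensor(BaseSensorOperator):
    """Waits until the given key no longer exists in redis (i.e. its TTL ran out)."""

    @apply_defaults
    def __init__(self, key, redis_conn_id='redis_default', *args, **kwargs):
        super(RedisNoKeySensor, self).__init__(*args, **kwargs)
        self.key = key
        self.redis_conn_id = redis_conn_id

    def poke(self, context):
        # Succeed only once the rate-limit key is absent from redis.
        return not RedisHook(redis_conn_id=self.redis_conn_id).key_exists(self.key)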
The hard part now is getting this mechanism to work within a sub dag, as I
have multiple API operations that need limiting (rough sketch after the
factory below).
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule


def _adwords_rate_limited_dag(dag_id, adwords_func, max_dag_retries,
                              **kwargs):
    dag = DAG(dag_id, **kwargs)

    def count_retries(context, dag_run_obj):
        # conf is None on the first (scheduled) run, so default to 1 retry so far.
        conf = context['dag_run'].conf or {}
        retries = conf.get('dag_retries', 1)
        if retries > max_dag_retries:
            raise SystemError("Max retries reached for dag")
        dag_run_obj.payload = {'dag_retries': retries + 1}
        return dag_run_obj

    with dag:
        RedisNoKeySensor(                       # custom sensor, sketched above
            task_id='check_for_rate_limit',
            key='rate_limited',
            redis_conn_id='redis_master',
            poke_interval=10
        ) >> PythonOperator(
            task_id=adwords_func.__name__,
            python_callable=adwords_func,
        ) >> TriggerDagRunOperator(
            task_id='retry_dag_on_failure',
            trigger_dag_id=dag_id,
            trigger_rule=TriggerRule.ONE_FAILED,
            python_callable=count_retries
        )
    return dag
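And roughly how I'd hoped to reuse the factory per API operation inside a
parent dag via SubDagOperator (untested; fetch_campaigns / fetch_keywords are
placeholders, and the subdag's dag_id has to be '<parent_dag_id>.<task_id>').
The part I haven't worked out is whether the TriggerDagRunOperator re-trigger
behaves sensibly when the dag being triggered is itself a sub dag:

from datetime import datetime

from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator


def fetch_campaigns(**_):
    pass  # placeholder for the real campaigns API call


def fetch_keywords(**_):
    pass  # placeholder for the real keywords API call


default_args = {'start_date': datetime(2018, 8, 1)}

parent = DAG('adwords_parent', default_args=default_args,
             schedule_interval='@daily')

for func in (fetch_campaigns, fetch_keywords):
    SubDagOperator(
        task_id=func.__name__,
        subdag=_adwords_rate_limited_dag(
            'adwords_parent.{}'.format(func.__name__),  # required naming convention
            adwords_func=func,
            max_dag_retries=3,
            default_args=default_args,
            schedule_interval='@daily',
        ),
        dag=parent,
    )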
Thanks for your help,
Rob
On Thu, Aug 9, 2018 at 7:07 PM, Gerard Toonstra <[email protected]> wrote:
> Have you looked into pools? Pools allow you to specify how many tasks at
> any given time should use a common resource.
> That way you could limit this to 1, 2, or 3 for example. Pools are not
> dynamic however, so they only allow you to put an upper limit on how many
> clients are going to hit the API at any moment, not determine how many
> when the rate limit is in effect
> (unless.... you use code to reconfigure the pool on demand, but I'm not
> sure if I should recommend that, i.e. reconfigure the # of clients
> on the basis of hitting the rate limit.) It sounds as if this logic is
> best introduced at the hook level, which hands out an API interface only
> when the rate limit is not in place, while operators specify how many
> retries should occur.
>
> The Adwords API does allow increasing the rate limit threshold though and
> you're probably better off negotiating
> with Google to up that threshold, explaining your business case etc.?
>
> Gerard
>
>
>
> On Thu, Aug 9, 2018 at 10:43 AM [email protected] <[email protected]> wrote:
>
> > Hello,
> >
> > I am in the process of migrating a bespoke data pipeline built around
> > Celery into Airflow.
> >
> > We have a number of different tasks which interact with the Adwords API,
> > which has a rate limiting policy. The policy isn't a fixed number of
> > requests; it's variable.
> >
> > In our celery code we have handled this by capturing a rate limit error
> > response and setting a key in redis to make sure that no tasks execute
> > against the API until it's expired. Any task that does get executed checks
> > for the presence of the key and if the key exists issues a retry for when
> > the rate limit is due to expire.
> >
> > Moving over to Airflow I can't find a way to go about scheduling a task to
> > retry in a specific amount of time. Doing some reading it seems a Sensor
> > could work to prevent other dags from executing whilst the rate limit is
> > present.
> >
> > I also can't seem to find an example of handling different exceptions from
> > a python task and adapting the retry logic accordingly.
> >
> > Any pointers would be much appreciated,
> >
> > Rob
> >
>