Thanks Gerard, that's really helpful it would have taken me some time to
pinpoint that race condition.

I will go with your suggestion and implement a hook and manage the logic
within the operator its self,

Rob

On Sun, Aug 12, 2018 at 9:28 AM, Gerard Toonstra <gtoons...@gmail.com>
wrote:

> This is worth a design discussion in its own right, but here's my input.
>
> You're using a DAG with sensor operators to determine if something needs to
> be triggered.
> There is a time between the sensor "ok-ing" the progression and the dag
> being triggered and the
> first task being spun up. This interval can easily lead to a race
> conditions where another sensor elsewhere still
> sees a non-rate limited condition and may also initiate the dag. It's
> likely a rate limit will result from that.
>
> Second, should there still be a rate limit in effect, then the operators in
> the DAG won't respect the back-off
> period from there, because you passed that check already.
>
>
> For that reason I'd do this slightly differently in a more managed way. I
> don't have sufficient background with
> the business requirements and how many adwords related work there is in
> total, but here are three options to look into:
>
> - Make the adwords hook, which raises RateLimitException for example, then
> let the operator respond to that and
>   manage redis. FIrst check with redis when it starts, then call adwords
> and in case of failure, update redis and probably
>   go into the retry loop. You can set a low interval here, because it will
> check with redis anyway, that way you can support
>   back off periods of any resolution.
>
> - Just use a pool with the number of required simultaneous processes and
> play with the variables and rates to avoid the
>   rate limit in the first place. That way, you can maximize the API usage
> without creating a stampeding herd that will probably
>   lead to failure anyway.
>
> - There's another approach thinkable where a dag "requests" the use of the
> API by inserting a record in a queue in redis,
>   where the main dag does the actual triggering (so that all scheduling is
> centralized), but that's like building a scheduler in a
>   scheduler and in the end, a pool would give you the same functionality
> without all the hassle.
>
> Rgds,
>
> Gerard
>
> On Fri, Aug 10, 2018 at 12:41 PM Robin Edwards <r...@goshift.com> wrote:
>
> > Thanks Gerard,
> >
> > Yea pools look really useful for limiting concurrent requests.
> >
> > Where you mention the use of a hook would you simply raise an exception
> > from get_conn() should the adwords account be rate limited then just
> > configure a number of retries and appropriate delay / back off on the
> > operator doing work with the api?
> >
> > I have come up with part of a solution using a key sensor and trigger dag
> > run. The idea would be that when my 'adwords_func' encounters a rate
> limit
> > error it sets a key in redis with an expiry matching the period in the
> rate
> > limit response then re-triggers the dag which will block on my sensor
> until
> > the key has expired.
> >
> > The hard part is now getting this mechanism to work within a sub dag as I
> > have multiple api operations that need limiting.
> >
> > def _adwords_rate_limited_dag(dag_id, adwords_func, max_dag_retries,
> > **kwargs):
> >     dag = DAG(dag_id, **kwargs)
> >
> >     def count_retries(context, obj):
> >         retries = context['dag_run'].conf.get('dag_retries', 1)
> >
> >         if retries > max_dag_retries:
> >             raise SystemError("Max retries reached for dag")
> >
> >         obj.payload = {'dag_retries': retries + 1}
> >
> >         return obj
> >
> >     with dag:
> >         RedisNoKeySensor(
> >             task_id='check_for_rate_limit',
> >             key='rate_limited',
> >             redis_conn_id='redis_master',
> >             poke_interval=10
> >         ) >> PythonOperator(
> >             task_id=shift_callable.__name__,
> >             python_callable=adwords_callable,
> >         ) >> TriggerDagRunOperator(
> >             task_id='retry_dag_on_failure',
> >             trigger_dag_id=dag_id,
> >             trigger_rule=TriggerRule.ONE_FAILED,
> >             python_callable=count_retries
> >         )
> >
> >     return dag
> >
> > Thanks for your help,
> >
> > Rob
> >
> >
> > On Thu, Aug 9, 2018 at 7:07 PM, Gerard Toonstra <gtoons...@gmail.com>
> > wrote:
> >
> > > Have you looked into pools?  Pools allow you to specify how many tasks
> at
> > > any given time should use a common resource.
> > > That way you could limit this to 1, 2, or 3 for example. Pools are not
> > > dynamic however, so it only allows you to upper limit how many
> > > number of clients are going to hit the API at any moment, not determine
> > how
> > > many when the rate limit is in effect
> > > (unless.... you use code to reconfigure the pool on demand, but I'm not
> > > sure if I should recommend that, i.e. reconfigure the # of clients
> > > on the basis of hitting the rate limit.)  It sounds as if this logic is
> > > best introduced at the hook level, where it determines that it passes
> > > out an API interface only when the rate limit is not in place, where
> > > operators specify how many retries should occur.
> > >
> > > The Adwords API does allow increasing the rate limit threshold though
> and
> > > you're probably better off negotiating
> > > with Google to up that threshold, explaining your business case etc.?
> > >
> > > Gerard
> > >
> > >
> > >
> > > On Thu, Aug 9, 2018 at 10:43 AM r...@goshift.com <r...@goshift.com>
> wrote:
> > >
> > > > Hello,
> > > >
> > > > I am in the process of migrating a bespoke data pipe line built
> around
> > > > celery into airflow.
> > > >
> > > > We have a number of different tasks which interact with the Adwords
> API
> > > > which has a rate limiting policy. The policy isn't a fixed number of
> > > > requests its variable.
> > > >
> > > > In our celery code we have handled this by capturing a rate limit
> error
> > > > response and setting a key in redis to make sure that no tasks
> execute
> > > > against the API until it's expired. Any task that does get executed
> > > checks
> > > > for the presence of the key and if the key exists issues a retry for
> > when
> > > > the rate limit is due to expire.
> > > >
> > > > Moving over to Airflow I can't find a way to go about scheduling a
> task
> > > to
> > > > retry in a specific amount of time. Doing some reading it seems a
> > Sensor
> > > > could work to prevent other dags from executing whilst the rate limit
> > is
> > > > present.
> > > >
> > > > I also can't seem to find an example of handling different exceptions
> > > from
> > > > a python task and adapting the retry logic accordingly.
> > > >
> > > > Any pointers would be much appreciated,
> > > >
> > > > Rob
> > > >
> > >
> >
>

Reply via email to