[ https://issues.apache.org/jira/browse/AIRFLOW-3293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Darren Weber updated AIRFLOW-3293:
----------------------------------
Description:
The TimeDeltaSensor has baked-in lookups for the schedule and schedule_interval
lurking in its implementation, so it's not a pure time delta. It would be ideal
to have a TimeDelta sensor that is purely relative to the time at which an
upstream task triggers it. If there is a way to do this, please note it here or
suggest an alternative implementation that could achieve this easily.
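To make the difference concrete, here is a minimal sketch in plain Python (no
Airflow imports) of the two ways to compute a target time.
schedule_anchored_target approximates what TimeDeltaSensor does in Airflow 1.10
(the end of the schedule period, i.e. the following schedule after
execution_date, plus delta), assuming a fixed timedelta schedule_interval.
upstream_anchored_target is a hypothetical helper for the "pure" delta requested
here. The dates are illustrative only.

{code:python}
from datetime import datetime, timedelta


def schedule_anchored_target(execution_date, schedule_interval, delta):
    """TimeDeltaSensor-style target: end of the schedule period plus delta.

    Assumes a fixed timedelta schedule_interval; a cron schedule would need
    cron-aware logic to find the following schedule instead.
    """
    following_schedule = execution_date + schedule_interval
    return following_schedule + delta


def upstream_anchored_target(upstream_end, delta):
    """Hypothetical 'pure' delta: relative to when the upstream task finished."""
    return upstream_end + delta


execution_date = datetime(2018, 11, 1)
upstream_end = datetime(2018, 11, 2, 0, 42)  # upstream sensor succeeded at 00:42
delta = timedelta(minutes=5)

# 2018-11-02 00:05:00, already in the past when the upstream sensor finishes,
# so a schedule-anchored sensor would succeed immediately with no delay.
print(schedule_anchored_target(execution_date, timedelta(days=1), delta))
# 2018-11-02 00:47:00, five minutes after the upstream task actually finished.
print(upstream_anchored_target(upstream_end, delta))
{code}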
The implementation below using a PythonOperator works, but it needlessly
occupies a worker for 5 minutes. It would be much better to have a TimeDelta
sensor that accepts the time when an upstream sensor triggers it and then waits
for a timedelta, with the base sensor's options for poke interval (and timeout).
With the reschedule option, such a sensor would not hold a worker for the whole
wait. Something like this could help add jitter to downstream tasks that would
otherwise hit an HTTP endpoint too hard all at once.
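{code:python}
import time

from airflow.operators.python_operator import PythonOperator


def wait5(*args, **kwargs):
    # Sleeps inside the task, so the worker slot is held for the full 5 minutes.
    time.sleep(5 * 60)
    return True


# a_dag and upstream_http_sensor are defined elsewhere in the DAG file.
wait5_task = PythonOperator(
    task_id="python_op_wait_5min",
    python_callable=wait5,
    dag=a_dag)

upstream_http_sensor >> wait5_task
{code}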
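A rough sketch of the requested behaviour, in plain Python so it runs without
Airflow. RelativeDelaySensor is a hypothetical class, not part of Airflow; it
only mirrors the poke / poke_interval / timeout contract of BaseSensorOperator.
The anchor is assumed to be the upstream task's end time; in a real DAG it would
have to come from a persisted value (for example the upstream TaskInstance's
end_date) so that a sensor in reschedule mode computes the same target on every
poke. The optional max_jitter addresses the HTTP-endpoint case by spreading
downstream tasks out.

{code:python}
import random
import time
from datetime import datetime, timedelta, timezone


class RelativeDelaySensor:
    """Hypothetical sensor (not an Airflow class): succeeds once `delta`, plus
    an optional per-task jitter, has elapsed since `anchor`, e.g. the time the
    upstream task finished. Mirrors the poke / poke_interval / timeout contract
    of Airflow's BaseSensorOperator.
    """

    def __init__(self, task_id, delta, max_jitter=timedelta(0),
                 poke_interval=60, timeout=60 * 60,
                 clock=lambda: datetime.now(timezone.utc), sleep=time.sleep):
        self.task_id = task_id
        self.delta = delta
        self.max_jitter = max_jitter
        self.poke_interval = poke_interval  # seconds between pokes
        self.timeout = timeout              # seconds before giving up
        self.clock = clock
        self.sleep = sleep

    def jitter(self):
        # Seeded from task_id, so every poke (even one running in a fresh
        # process after a reschedule) computes the same jitter for this task.
        rng = random.Random(self.task_id)
        return timedelta(seconds=rng.uniform(0, self.max_jitter.total_seconds()))

    def target(self, anchor):
        return anchor + self.delta + self.jitter()

    def poke(self, anchor):
        return self.clock() >= self.target(anchor)

    def run(self, anchor):
        # Poke mode in miniature: check, then sleep poke_interval, until the
        # target time has passed or the timeout is exceeded.
        started = self.clock()
        while not self.poke(anchor):
            if (self.clock() - started).total_seconds() > self.timeout:
                raise TimeoutError("%s timed out" % self.task_id)
            self.sleep(self.poke_interval)
        return True


class FakeClock:
    """Test clock: sleep() advances time instantly instead of blocking."""

    def __init__(self, now):
        self.now = now

    def __call__(self):
        return self.now

    def sleep(self, seconds):
        self.now += timedelta(seconds=seconds)


upstream_end = datetime(2018, 11, 2, 0, 42)  # illustrative upstream finish time
clock = FakeClock(upstream_end)
sensor = RelativeDelaySensor("call_http_endpoint", timedelta(minutes=5),
                             max_jitter=timedelta(minutes=2), poke_interval=30,
                             clock=clock, sleep=clock.sleep)
sensor.run(upstream_end)
print(clock.now >= sensor.target(upstream_end))  # True
{code}

Seeding the jitter from task_id is an assumption (any stable key would do); it
keeps each task's target fixed across reschedules while still giving different
downstream tasks different delays.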
> Rename TimeDeltaSensor to ScheduleTimeDeltaSensor
> -------------------------------------------------
>
> Key: AIRFLOW-3293
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3293
> Project: Apache Airflow
> Issue Type: Wish
> Reporter: Darren Weber
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)