malthe opened a new pull request #18112:
URL: https://github.com/apache/airflow/pull/18112
Currently, a `TaskDeferred` takes a `method_name` parameter such as
"execute_complete" and this method must then exist on the operator.
```python
class MyOperator(BaseOperator):
def execute_complete(self, context, event=None):
"""Callback for when the trigger fires."""
```
But in some cases, it makes sense to simply restart the operator –
essentially calling its "execute" method. There is however an inconvenience or
compatibility issue here in that the resume method receives an "event" keyword
argument with the trigger event payload – and "execute" takes no such argument.
To appreciate why this is a problem, consider the problem of having to delay
the execution of an existing operator component, for example deferring its
execution for 10 minutes. We would like to avoid having to extend the
implementation of this operator in order to accomplish this.
In #17576, support for `pre_execute` was added – a hook that runs before the
operator's `execute` method. The change proposal presented here builds upon
this functionality to allow an implementation such as:
```python
class PreExecuteDefer:
def __init__(self, timedelta):
self._timedelta = timedelta
def __call__(self, context):
moment = context["data_interval_end"] + self._timedelta
# We must verify the trigger condition since upon re-entry, this
will be called again.
if moment < timezone.utcnow():
return
trigger = DateTimeTrigger(moment=moment)
raise TaskDeferred(trigger=trigger, kwargs={})
```
This can be used as the `pre_execute` hook to the effect of delaying the
execution.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]