zerodarkzone commented on issue #53591:
URL: https://github.com/apache/airflow/issues/53591#issuecomment-3114806342
This is a simple example from the documentation showing how to implement a
deferrable operator:
``` python
import time
from datetime import timedelta
from typing import Any

from airflow.configuration import conf
from airflow.sdk import BaseSensorOperator
from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context


class WaitOneHourSensor(BaseSensorOperator):
    def __init__(
        self,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.deferrable = deferrable

    def execute(self, context: Context) -> None:
        if self.deferrable:
            self.defer(
                trigger=TimeDeltaTrigger(timedelta(hours=1)),
                method_name="execute_complete",
            )
        else:
            time.sleep(3600)

    def execute_complete(
        self,
        context: Context,
        event: dict[str, Any] | None = None,
    ) -> None:
        # We have no more work to do here. Mark as complete.
        return
```
In this simple example, the check on the `deferrable` attribute has an `else`
branch, so the operator behaves as a synchronous operator when the value is `False`.
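
To make the two code paths easier to see in isolation, here is a minimal sketch in plain Python that does not depend on Airflow. `Deferred`, `WaitSensorSketch` and `run` are hypothetical names made up for this illustration; the exception only imitates the idea of handing the wait over to a trigger and is not how Airflow implements `defer()` internally.

```python
import time
from datetime import timedelta


class Deferred(Exception):
    """Hypothetical stand-in for the signal that hands the wait to a trigger."""

    def __init__(self, delay: timedelta, method_name: str) -> None:
        super().__init__(f"deferred for {delay}, resume in {method_name}")
        self.delay = delay
        self.method_name = method_name


class WaitSensorSketch:
    """Toy model of the two code paths; this is not an Airflow operator."""

    def __init__(self, delay: timedelta, deferrable: bool = False) -> None:
        self.delay = delay
        self.deferrable = deferrable

    def execute(self) -> str:
        if self.deferrable:
            # Deferrable path: stop here and let something else do the waiting.
            raise Deferred(self.delay, method_name="execute_complete")
        # Synchronous path: block the caller for the whole delay.
        time.sleep(self.delay.total_seconds())
        return "done"


def run(sensor: WaitSensorSketch) -> str:
    """Report which path a sensor took."""
    try:
        return sensor.execute()
    except Deferred as signal:
        return f"deferred -> {signal.method_name}"


if __name__ == "__main__":
    short = timedelta(milliseconds=10)
    print(run(WaitSensorSketch(short, deferrable=False)))
    print(run(WaitSensorSketch(short, deferrable=True)))
```

With `deferrable=False` the call blocks for the full delay before returning, which is the behaviour the `else` branch gives the documentation example.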
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]