GitHub user Alexhans closed a discussion: Feedback on PeriodicTrigger (for custom sensors that want to support Deferred=True)
I'd like some feedback, before submitting a pull request, on adding a trigger that re-checks a callback condition on a recurring basis and fires once that condition is met. This would be very useful for anyone looking to replace custom sensors running in reschedule mode with something handled entirely by the trigger (ideal for long waits, such as polling for data completeness). I couldn't find an obvious pattern for this among the existing triggers, and adding deferrable support to a custom sensor currently requires writing a custom trigger as well, which seems like overkill for someone who has one or many `mode='reschedule'` sensors.

The code in question (fork): https://github.com/Alexhans/airflow/commit/57a0383d8a2e3a4332c79bcac3df100bde777472

This example uses a fixed (periodic) interval for simplicity, but supporting backoff rates should be possible. Whether that means two separate classes, or a `PeriodicTrigger` that is a special case of a `BackoffRateTrigger`, is open for discussion. Down the line, this could be built into the base sensor itself as `deferrable=True` or `mode=deferrable` support (which should be incompatible with `mode=poke|reschedule`).

The trigger follows the [no state](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html#triggering-deferral) rule and is serializable, but one of the main things I'd like feedback on is the serialization strategy for the callback object itself. I looked into the `airflow.jobs.triggerer_job_runner` code to see how trigger class instances are deserialized: `TriggerRunner.update_triggers` calls [self.get_trigger_by_classpath](https://github.com/apache/airflow/blob/c72dad7eaf045c74b66a38de5cf5d899c7c5f6d8/airflow/jobs/triggerer_job_runner.py#L751C9-L751C33), which uses the util function `import_string` (and caches the result). Any thoughts?

- Besides the two unit tests, I've quickly tested this in a dev environment by replacing my custom sensors, and it behaves as expected.
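To make the serialization question concrete, here is a minimal, Airflow-free sketch of the pattern described above. It is not the fork's code or Airflow's API: `PeriodicTriggerSketch`, `import_objpath`, `next_interval`, `backoff_rate` and `max_interval_seconds` are hypothetical names, `import_objpath` is a stdlib stand-in for Airflow's `import_string`, and `run()` yields plain dicts where a real trigger would yield `TriggerEvent` objects. It only mirrors the trigger contract (`serialize()` returning `(classpath, kwargs)` plus an async `run()` generator) to show that all state stays as plain kwargs and the callback is re-imported from its dotted path. A `backoff_rate` of 1.0 gives the fixed period, values above 1.0 give exponential backoff. Because a synchronous hook call inside `run()` would block the triggerer's event loop, the sketch offloads the callback with `asyncio.to_thread` (Python 3.9+).

```python
import asyncio
import importlib
from typing import Any, AsyncIterator, Dict, Optional, Tuple


def get_objpath(python_object: Any) -> str:
    # Same helper as in the usage example: "<module>.<qualname>".
    return f"{python_object.__module__}.{python_object.__qualname__}"


def import_objpath(objpath: str) -> Any:
    # Hypothetical stdlib stand-in for Airflow's import_string. Splits on the
    # last dot, so it only round-trips module-level functions and classes.
    module_path, _, attr = objpath.rpartition(".")
    return getattr(importlib.import_module(module_path), attr)


def next_interval(current: float, backoff_rate: float, max_interval: Optional[float]) -> float:
    # backoff_rate == 1.0 keeps a fixed period; > 1.0 grows it, optionally capped.
    interval = current * backoff_rate
    return interval if max_interval is None else min(interval, max_interval)


class PeriodicTriggerSketch:
    """Hypothetical, Airflow-free stand-in mirroring the trigger contract."""

    def __init__(
        self,
        interval_seconds: float,
        callback_objpath: str,
        callback_kwargs: Optional[Dict[str, Any]] = None,
        backoff_rate: float = 1.0,
        max_interval_seconds: Optional[float] = None,
    ) -> None:
        # Only plain, serializable values are stored; no live objects or state.
        self.interval_seconds = interval_seconds
        self.callback_objpath = callback_objpath
        self.callback_kwargs = callback_kwargs or {}
        self.backoff_rate = backoff_rate
        self.max_interval_seconds = max_interval_seconds

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        # Classpath derived from the class itself instead of a hard-coded string.
        return (
            get_objpath(type(self)),
            {
                "interval_seconds": self.interval_seconds,
                "callback_objpath": self.callback_objpath,
                "callback_kwargs": self.callback_kwargs,
                "backoff_rate": self.backoff_rate,
                "max_interval_seconds": self.max_interval_seconds,
            },
        )

    async def run(self) -> AsyncIterator[Dict[str, Any]]:
        # The callback is resolved from its dotted path on the "triggerer" side.
        callback = import_objpath(self.callback_objpath)
        interval = self.interval_seconds
        attempts = 0
        while True:
            attempts += 1
            # Run the (possibly blocking) check in a worker thread.
            if await asyncio.to_thread(callback, **self.callback_kwargs):
                yield {"status": "success", "attempts": attempts}
                return
            await asyncio.sleep(interval)
            interval = next_interval(interval, self.backoff_rate, self.max_interval_seconds)
```

One consequence of this design: since the callback is re-imported by path, it has to be a module-level function importable on the triggerer, and every value in `callback_kwargs` has to be serializable.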
Usage would look something like this:

```python
from airflow.sensors.base import BaseSensorOperator

# PeriodicTrigger is the class proposed in the fork commit linked above; it is
# not part of upstream Airflow. Myhook is a placeholder for your own hook.


def some_checking_func(**kwargs) -> bool:
    hook = Myhook()
    return hook.some_checking_func(
        payload=kwargs["payload"],
        other_param=kwargs["other_param"],
    )


def get_objpath(python_object):
    # Dotted import path, e.g. "my_package.my_module.some_checking_func".
    return f"{python_object.__module__}.{python_object.__qualname__}"


class MySensor(BaseSensorOperator):
    [...]

    def execute(self, context):
        # Hand the wait over to the triggerer; execute_complete is called
        # once the trigger fires.
        self.defer(
            trigger=PeriodicTrigger(
                interval_seconds=sensing_interval.total_seconds(),
                callback_objpath=get_objpath(some_checking_func),
                callback_kwargs={
                    "payload": self.payload,
                    "other_param": self.other_param,
                },
            ),
            method_name="execute_complete",
        )
```

- Using a helper function like `get_objpath` is tidier and should probably be the standard, so that renaming a class doesn't mean hunting down hard-coded paths. `get_objpath(self.__class__)` could easily be used instead of the static classpath strings in `temporal.py`.
- Should this go in `temporal.py`? Are the names right? Any concerns with the de/serialization of the callback? Where should it happen instead?

There seem to be no related open issues. I did read [AIP-40: Deferrable ("Async") Operators](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177050929) for context on the design. The closest match I found was the closed issue #41355, which talks about async and sensors.

GitHub link: https://github.com/apache/airflow/discussions/42645
