GitHub user Alexhans closed a discussion: Feedback on PeriodicTrigger (for
custom sensors that want to support deferrable=True)

I'm seeking feedback, before submitting a pull request, on the potential
addition of a trigger that re-checks a callback condition on a recurring basis
and fires once that condition is met.

This would be fantastic for those looking to replace their custom sensors in
reschedule mode with something that is fully handled by the trigger (ideal for
cases where the waits are long, such as polling for data completeness).

I can't find an obvious pattern among the existing triggers at the moment, and
adding deferrable support to a custom sensor requires a custom trigger as
well, which seems like overkill for someone who has one or many
`mode='reschedule'` sensors.

The code in question (fork): 
https://github.com/Alexhans/airflow/commit/57a0383d8a2e3a4332c79bcac3df100bde777472
 

This example is periodic (fixed interval) for simplicity, but supporting
backoff rates should be possible. Whether to make these 2 different classes or
to make `PeriodicTrigger` a specific version of a `BackoffRateTrigger` is up
for discussion. Down the line, this could be implemented so that the base
sensor itself supports `deferrable=True` or `mode=deferrable` (which should be
incompatible with `mode=poke|reschedule`).
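To make the periodic-vs-backoff question concrete, here is a minimal sketch of
the two wait schedules. The function `wait_schedule` and its parameters are
hypothetical (not part of the fork or of Airflow). With `backoff_factor=1.0`
it reduces to the fixed-interval case, which is one argument for treating
`PeriodicTrigger` as a special case of a backoff trigger.

```python
from itertools import islice
from typing import Iterator, Optional


def wait_schedule(
    interval_seconds: float,
    backoff_factor: float = 1.0,
    max_interval_seconds: Optional[float] = None,
) -> Iterator[float]:
    """Yield the wait (in seconds) before each successive callback check.

    Hypothetical helper: backoff_factor=1.0 gives a fixed, periodic interval;
    a factor above 1.0 gives exponential backoff, optionally capped.
    """
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")
    if backoff_factor < 1.0:
        raise ValueError("backoff_factor must be >= 1.0")
    wait = float(interval_seconds)
    cap = float("inf") if max_interval_seconds is None else float(max_interval_seconds)
    while True:
        wait = min(wait, cap)  # never exceed the cap
        yield wait
        wait *= backoff_factor


# Periodic: the same wait every time.
print(list(islice(wait_schedule(60), 4)))  # [60.0, 60.0, 60.0, 60.0]
# Backoff: doubling from 60 s, capped at 300 s.
print(list(islice(wait_schedule(60, 2.0, 300), 5)))  # [60.0, 120.0, 240.0, 300.0, 300.0]
```

In a trigger's `run()` loop, each yielded value would be passed to
`asyncio.sleep()` between checks, so a single class could cover both cases.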

The trigger follows the rules of having [no
state](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html#triggering-deferral)
and being serializable, but one of the main things I'd like feedback on is the
serialization strategy for the callback object itself. I looked into the
`airflow.jobs.triggerer_job_runner` code to see how trigger class instances are
deserialized: `TriggerRunner.update_triggers` calls
[self.get_trigger_by_classpath](https://github.com/apache/airflow/blob/c72dad7eaf045c74b66a38de5cf5d899c7c5f6d8/airflow/jobs/triggerer_job_runner.py#L751C9-L751C33),
which uses the util function `import_string` (with caching).
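As far as I can tell, `import_string` splits the dotted path on its last `.`,
imports the module part and then fetches the final attribute. The stdlib-only
sketch below mimics that behaviour (it is not Airflow's code; `resolve_objpath`
is a hypothetical name) to show what would happen to `callback_objpath`. It
also surfaces one caveat worth raising: a path built from `__qualname__` for a
method or nested function (e.g. `module.Class.method`) can't be resolved this
way, because the part before the last dot is not a module.

```python
import os.path
from importlib import import_module
from typing import Any


def resolve_objpath(dotted_path: str) -> Any:
    """Import 'package.module.attr' by splitting on the last dot."""
    try:
        module_path, attr_name = dotted_path.rsplit(".", 1)
    except ValueError:
        raise ImportError(f"{dotted_path!r} doesn't look like a module path") from None
    # Everything before the last dot must be an importable module.
    module = import_module(module_path)  # may raise ModuleNotFoundError
    try:
        return getattr(module, attr_name)
    except AttributeError:
        raise ImportError(f"{module_path!r} does not define {attr_name!r}") from None


# A module-level function resolves fine.
assert resolve_objpath("os.path.join") is os.path.join

# A method's __qualname__-based path does not: "string.Template" is not a module.
try:
    resolve_objpath("string.Template.substitute")
except ImportError as exc:  # ModuleNotFoundError is a subclass of ImportError
    print(f"cannot resolve: {exc}")
```

One option would be to require `callback_objpath` to point at a module-level
function; another would be to resolve the remaining `__qualname__` segments
with successive `getattr` calls.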

Any thoughts?

- Besides the 2 unit tests, I've quickly tested this in a dev environment by
replacing my custom sensors, and it behaves as expected. Usage would look
something like this:
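```python
# PeriodicTrigger is the class proposed in the fork (not in released Airflow);
# it is assumed here to live in airflow.triggers.temporal, which is still open.
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import PeriodicTrigger


def some_checking_func(**kwargs) -> bool:
    # Evaluated by the triggerer on every interval; returns True once done.
    hook = Myhook()  # your own hook class
    return hook.some_checking_func(
        payload=kwargs["payload"],
        other_param=kwargs["other_param"],
    )


def get_objpath(python_object):
    # Dotted import path the triggerer can use to re-import the object.
    return f"{python_object.__module__}.{python_object.__qualname__}"


class MySensor(BaseSensorOperator):
    [...]

    def execute(self, context):
        # sensing_interval is a datetime.timedelta defined elsewhere.
        self.defer(
            trigger=PeriodicTrigger(
                interval_seconds=sensing_interval.total_seconds(),
                callback_objpath=get_objpath(some_checking_func),
                callback_kwargs={
                    "payload": self.payload,
                    "other_param": self.other_param,
                },
            ),
            method_name="execute_complete",
        )
```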
- Using a helper function like `get_objpath` is tidier and should probably be
the standard, since it avoids hard-coded paths that break whenever a function
or class is renamed or moved. `get_objpath(self.__class__)` could easily be
used instead of the static classpath strings in `temporal.py`.
- Should this go in `temporal.py`? Are the names right? Are there concerns with
the de/serialization of the callback, and if so, where should it happen instead?
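To make the serialization questions concrete, here is a standalone sketch of
the trigger contract, written without importing Airflow so it runs anywhere.
It is not the fork's code: in Airflow the class would subclass `BaseTrigger`
and yield a `TriggerEvent` instead of a plain dict, and `PeriodicTriggerSketch`
and `first_event` are hypothetical names. It shows `serialize()` deriving its
classpath with `get_objpath(type(self))` instead of a static string, and
`run()` re-importing the callback from `callback_objpath`, so the trigger only
ever holds serializable arguments.

```python
import asyncio
import tempfile
from importlib import import_module
from typing import Any, AsyncIterator, Callable, Dict, Optional, Tuple


def get_objpath(python_object: Any) -> str:
    # Same helper as in the usage example above.
    return f"{python_object.__module__}.{python_object.__qualname__}"


def resolve_objpath(dotted_path: str) -> Callable[..., Any]:
    # Last-dot split: works for module-level objects only.
    module_path, attr_name = dotted_path.rsplit(".", 1)
    return getattr(import_module(module_path), attr_name)


class PeriodicTriggerSketch:
    """Hypothetical stand-in for a BaseTrigger subclass (no Airflow import)."""

    def __init__(
        self,
        interval_seconds: float,
        callback_objpath: str,
        callback_kwargs: Optional[Dict[str, Any]] = None,
    ) -> None:
        # Only plain, serializable values are stored: no live callback object.
        self.interval_seconds = interval_seconds
        self.callback_objpath = callback_objpath
        self.callback_kwargs = callback_kwargs or {}

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        # Classpath derived from the class itself instead of a static string.
        return get_objpath(type(self)), {
            "interval_seconds": self.interval_seconds,
            "callback_objpath": self.callback_objpath,
            "callback_kwargs": self.callback_kwargs,
        }

    async def run(self) -> AsyncIterator[Dict[str, Any]]:
        # Re-import the callback by path, then poll it until it returns True.
        callback = resolve_objpath(self.callback_objpath)
        while True:
            # A blocking callback would stall the event loop, so run it in a
            # worker thread (asyncio.to_thread needs Python 3.9+).
            if await asyncio.to_thread(callback, **self.callback_kwargs):
                yield {"status": "success"}
                return
            await asyncio.sleep(self.interval_seconds)


async def first_event(trigger: PeriodicTriggerSketch) -> Dict[str, Any]:
    # Take the first (and only) event, as the triggerer would.
    async for event in trigger.run():
        return event
    raise RuntimeError("trigger finished without an event")


trigger = PeriodicTriggerSketch(
    interval_seconds=0.01,
    callback_objpath="os.path.exists",  # stdlib stand-in for some_checking_func
    callback_kwargs={"path": tempfile.gettempdir()},
)
classpath, kwargs = trigger.serialize()
# The triggerer would import the class from `classpath`; rebuild it directly here.
rebuilt = PeriodicTriggerSketch(**kwargs)
print(asyncio.run(first_event(rebuilt)))  # {'status': 'success'}
```

Running the callback in a thread is a design choice of this sketch; whether the
real trigger should require async callbacks instead is part of the same
serialization discussion.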


There seem to be no related open issues.
I did read [AIP-40: Deferrable ("Async")
Operators](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177050929)
to get some context on the design.

The closed issue #41355 seemed to be the closest to something like this, as it
talks about async and sensors.

GitHub link: https://github.com/apache/airflow/discussions/42645
