Inspired by James, I tried this out...
For anyone else interested, here is a sample DAG to test it with:
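import pendulum
from airflow import DAG
# Import path as of Airflow 1.10; newer releases moved this operator.
from airflow.operators.dummy_operator import DummyOperator


class MyDAG(DAG):
    def following_schedule(self, dttm):
        # Truncate to the minute, then step forward.
        pen_dt = pendulum.instance(dttm).replace(second=0, microsecond=0)
        minutes = pen_dt.minute
        minutes_mod = minutes % 10
        if minutes_mod < 5:
            # First half of the 10-minute block: next run is one minute later.
            return pen_dt.add(minutes=1)
        else:
            # Second half: jump to the start of the next 10-minute block.
            return pen_dt.add(minutes=10 - minutes_mod)

    def previous_schedule(self, dttm):
        pen_dt = pendulum.instance(dttm).replace(second=0, microsecond=0)
        minutes = pen_dt.minute
        minutes_mod = minutes % 10
        if minutes_mod < 5:
            return pen_dt.add(minutes=-1)
        else:
            # Step back to minute 5 of the block; when minutes_mod == 5
            # this returns pen_dt unchanged.
            return pen_dt.add(minutes=-(minutes_mod - 5))


# default_args is assumed to be defined above (it needs at least a start_date).
dag = MyDAG(
    dag_id='test_schd',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=True,
    concurrency=1000,
    max_active_runs=10,
)

with dag:
    DummyOperator(task_id='test', task_concurrency=1000)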
What this does is trigger one run per minute while minutes (mod 10) is in
0-4, and schedule nothing while it is in 5-9 *(or something like that; I
did not scrutinize the edges carefully)*.
Anyway, it lets you confirm fairly quickly that the approach works, and
that's the point.
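To see where the edges actually fall, here is a small sketch that repeats the
following_schedule arithmetic with only the standard library (no Airflow or
pendulum) and walks it forward from minute 0. The names `following` and
`walk` are made up for this illustration.

```python
from datetime import datetime, timedelta


def following(dttm):
    """Same arithmetic as MyDAG.following_schedule, using stdlib datetime."""
    dt = dttm.replace(second=0, microsecond=0)
    minutes_mod = dt.minute % 10
    if minutes_mod < 5:
        return dt + timedelta(minutes=1)
    return dt + timedelta(minutes=10 - minutes_mod)


def walk(start, steps):
    """Apply following() repeatedly and collect the minute of each result."""
    visited = []
    dt = start
    for _ in range(steps):
        dt = following(dt)
        visited.append(dt.minute)
    return visited


minutes = walk(datetime(2020, 1, 1, 0, 0), 12)
print(minutes)  # [1, 2, 3, 4, 5, 10, 11, 12, 13, 14, 15, 20]
```

So, starting from the top of a block, the schedule points land on minutes
0 through 5 of each 10-minute block, and 6-9 are skipped.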
I have a use case for this, and I think I might use it rather than adding
branching logic. It's ugly, but both options are ugly.
*Question*
What do people think about a Schedule abstraction that takes over the
previous_schedule and following_schedule methods from DAG (perhaps renamed
to previous_execution and following_execution)?
Then I imagine we could do this:
* deprecate the `schedule_interval` param
* replace it with `schedule: Union[Schedule, str, timedelta]`, preserving
backward compatibility
* if a str or timedelta is given, instantiate a suitable Schedule object.
Perhaps there is a CronSchedule and a TimedeltaSchedule.
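A rough sketch of what that could look like, to make the idea concrete. None
of this exists in Airflow; `Schedule`, `TimedeltaSchedule`, and
`coerce_schedule` are hypothetical names, and the cron case is left as a stub
because it would need a cron parser.

```python
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Union


class Schedule(ABC):
    """Hypothetical base class owning the two methods now on DAG."""

    @abstractmethod
    def following_execution(self, dttm: datetime) -> datetime:
        ...

    @abstractmethod
    def previous_execution(self, dttm: datetime) -> datetime:
        ...


class TimedeltaSchedule(Schedule):
    """Fixed-interval schedule built from a timedelta."""

    def __init__(self, delta: timedelta):
        self.delta = delta

    def following_execution(self, dttm: datetime) -> datetime:
        return dttm + self.delta

    def previous_execution(self, dttm: datetime) -> datetime:
        return dttm - self.delta


def coerce_schedule(schedule: Union[Schedule, str, timedelta]) -> Schedule:
    """Turn whatever the user passed as `schedule` into a Schedule object."""
    if isinstance(schedule, Schedule):
        return schedule
    if isinstance(schedule, timedelta):
        return TimedeltaSchedule(schedule)
    if isinstance(schedule, str):
        # A CronSchedule would be built here; omitted in this sketch.
        raise NotImplementedError("cron strings are not handled in this sketch")
    raise TypeError("unsupported schedule type: %r" % type(schedule))


hourly = coerce_schedule(timedelta(hours=1))
print(hourly.following_execution(datetime(2020, 1, 1)))  # 2020-01-01 01:00:00
```

With something like this, the MyDAG hack above would become a small Schedule
subclass instead of a DAG subclass.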
Any interest?
Ash, you had mentioned some plans that conflict with the above hack. Could
you share a thought or two about what you were thinking?
*Another idea*
Maybe we could leave it to the `Schedule` class to decide the relationship
between run time and "execution_date". There is the "interval edge PR", but
there might be an elegant way to control that behavior in the Schedule class
itself, perhaps simply with a class constant or a param.
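As a sketch of the class-constant variant (again purely hypothetical; the
names `IntervalSchedule`, `EXECUTION_DATE_AT_INTERVAL_START`, and
`execution_date_for` are invented here), a schedule could declare whether a
run is stamped with the start of the interval it covers, as Airflow does
today, or with the end:

```python
from datetime import datetime, timedelta


class IntervalSchedule:
    # Hypothetical class constant. True: stamp the run with the start of the
    # interval it covers. False: stamp it with the end of the interval,
    # i.e. the moment the run fires.
    EXECUTION_DATE_AT_INTERVAL_START = True

    def __init__(self, delta):
        self.delta = delta

    def execution_date_for(self, run_time):
        """Map the time a run fires to the execution_date it is stamped with."""
        if self.EXECUTION_DATE_AT_INTERVAL_START:
            return run_time - self.delta
        return run_time


class EndStampedSchedule(IntervalSchedule):
    # Flipping the constant is all a subclass needs to do.
    EXECUTION_DATE_AT_INTERVAL_START = False


fires_at = datetime(2020, 1, 2)
daily = IntervalSchedule(timedelta(days=1))
end_stamped = EndStampedSchedule(timedelta(days=1))
print(daily.execution_date_for(fires_at))        # 2020-01-01 00:00:00
print(end_stamped.execution_date_for(fires_at))  # 2020-01-02 00:00:00
```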