dstandish commented on code in PR #59115:
URL: https://github.com/apache/airflow/pull/59115#discussion_r2700221962
##########
airflow-core/src/airflow/timetables/trigger.py:
##########
@@ -338,8 +347,175 @@ def _dagrun_info_sort_key_no_catchup(info: DagRunInfo | None, *, now: float) ->
order values by ``-logical_date`` if they are earlier than or at current
time, but ``+logical_date`` if later.
"""
- if info is None:
+ if info is None or info.logical_date is None:
return math.inf
if (ts := info.logical_date.timestamp()) <= now:
return -ts
return ts
+
+
+class CronPartitionTimetable(CronTriggerTimetable):
+ """
+ Timetable that triggers DAG runs according to a cron expression.
+
+ Creates runs for partition keys.
+
+    The cron expression determines the sequence of run dates. The partition
+    dates are derived from those run dates according to ``run_offset``.
+ The partition key is then formatted using the partition date.
+
+    A ``run_offset`` of 1 means the partition date will be one cron interval
+    after the run date; a ``run_offset`` of -1 means the partition date will be
+    one cron interval before the run date.
+
+ :param cron: cron string that defines when to run
+ :param timezone: Which timezone to use to interpret the cron string
+    :param run_offset: Integer offset that determines which partition date to run for.
+ The partition key will be derived from the partition date.
+    :param key_format: How to translate the partition date into a string partition key.
+
+    *run_immediately* controls when the first run of the DAG should be
+    scheduled if no *start_time* is given to the DAG. It has no effect if
+    runs already exist for this DAG.
+
+    * If *True*, always run the most recent possible DAG run immediately.
+ * If *False*, wait to run until the next scheduled time in the future.
+ * If passed a ``timedelta``, will run the most recent possible DAG run
+ if that run's ``data_interval_end`` is within timedelta of now.
+ * If *None*, the timedelta is calculated as 10% of the time between the
+ most recent past scheduled time and the next scheduled time. E.g. if
+ running every hour, this would run the previous time if less than 6
+        minutes had passed since the previous run time; otherwise it would wait
+ until the next hour.
+
+ # todo: AIP-76 talk about how we can have auto-reprocessing of partitions
+ # todo: AIP-76 we could allow a tuple of integer + time-based
+
+ """
+
+ def __init__(
+ self,
+ cron: str,
+ *,
+ timezone: str | Timezone | FixedTimezone,
+ run_offset: int | datetime.timedelta | relativedelta | None = None,
+ run_immediately: bool | datetime.timedelta = False,
+ key_format: str = "%Y-%m-%dT%H:%M:%S", # todo: AIP-76 we can't infer
partition date from this, so we need to store it separately
+ ) -> None:
+ super().__init__(cron, timezone=timezone,
run_immediately=run_immediately)
+ if not isinstance(run_offset, (int, NoneType)):
+ # todo: AIP-76 implement timedelta / relative delta?
+ raise ValueError("Run offset other than integer not supported
yet.")
+ self._run_offset = run_offset or 0
+ self._key_format = key_format
+
+ @classmethod
+ def deserialize(cls, data: dict[str, Any]) -> Timetable:
+ from airflow.serialization.decoders import decode_run_immediately
+
+ offset = data["run_offset"]
+ if not isinstance(offset, (int, NoneType)):
+ offset = None
+ log.warning(
+ "Unexpected offset type on deserialization. Only int supported
in this version.",
+ run_offset=offset,
+ )
+
+ return cls(
+ cron=data["expression"],
+ timezone=parse_timezone(data["timezone"]),
+ run_offset=offset,
+ run_immediately=decode_run_immediately(data.get("run_immediately",
False)),
+ key_format=data["key_format"],
+ )
+
+ def serialize(self) -> dict[str, Any]:
+ from airflow.serialization.encoders import encode_run_immediately,
encode_timezone
+
+ return {
+ "expression": self._expression,
+ "timezone": encode_timezone(self._timezone),
+ "run_immediately": encode_run_immediately(self._run_immediately),
+ "run_offset": self._run_offset,
+ "key_format": self._key_format,
+ }
+
+ def get_partition_date(self, *, run_date):
Review Comment:
updated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]