Lee-W commented on code in PR #59115:
URL: https://github.com/apache/airflow/pull/59115#discussion_r2697689941
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1765,24 +1767,48 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
         # as DagModel.dag_id and DagModel.next_dagrun
         # This list is used to verify if the DagRun already exist so that we don't attempt to create
         # duplicate DagRuns
-        existing_dagruns = (
-            session.execute(
-                select(DagRun.dag_id, DagRun.logical_date).where(
+        existing_dagrun_objects = (
+            session.scalars(
+                select(DagRun)
+                .where(
                     tuple_(DagRun.dag_id, DagRun.logical_date).in_(
                         (dm.dag_id, dm.next_dagrun) for dm in dag_models
-                    ),
+                    )
                 )
+                .options(load_only(DagRun.dag_id, DagRun.logical_date))
             )
             .unique()
             .all()
         )
+        existing_dagruns = {(x.dag_id, x.logical_date): x for x in existing_dagrun_objects}
Review Comment:
Why do we need it? I thought `unique()` already deduplicates Dag runs?
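For context, a minimal sketch (not the PR's code) of what the dict adds beyond deduplication: `.unique()` only removes duplicate rows, while the comprehension keys each run by `(dag_id, logical_date)` so callers can look the object up in O(1):

```python
from datetime import datetime


class FakeRun:
    """Stand-in for DagRun; attribute names follow the query above."""

    def __init__(self, dag_id: str, logical_date: datetime) -> None:
        self.dag_id = dag_id
        self.logical_date = logical_date


runs = [FakeRun("my_dag", datetime(2025, 1, 1))]
existing = {(r.dag_id, r.logical_date): r for r in runs}

# Membership test and object retrieval by key, e.g. before creating a run:
run = existing.get(("my_dag", datetime(2025, 1, 1)))
assert run is runs[0]
```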
##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -129,6 +130,39 @@ def _get_latest_runs_stmt(dag_id: str) -> Select:
)
+def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select:
+    """Build a select statement to retrieve the last automated run for each dag."""
Review Comment:
```suggestion
"""Build a select statement to retrieve the last partitioned Dag run for
each Dag."""
```
##########
airflow-core/src/airflow/api/common/mark_tasks.py:
##########
@@ -70,6 +70,9 @@ def set_state(
:param commit: Commit tasks to be altered to the database
:param session: database session
:return: list of tasks that have been created and updated
+
+    TODO: "past" and "future" params currently depend on logical date, which is not always populated.
Review Comment:
raise a warning and do nothing for cases without logical_date maybe?
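A minimal sketch of that suggestion (hypothetical helper; the real `set_state()` has more parameters and a different structure):

```python
import warnings
from datetime import datetime


def _resolve_past_future(
    past: bool, future: bool, logical_date: datetime | None
) -> tuple[bool, bool]:
    """Hypothetical guard: warn and disable 'past'/'future' when there is no logical_date."""
    if (past or future) and logical_date is None:
        warnings.warn(
            "'past' and 'future' depend on logical_date, which is not set; ignoring both.",
            UserWarning,
            stacklevel=2,
        )
        return False, False
    return past, future
```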
##########
airflow-core/src/airflow/timetables/trigger.py:
##########
@@ -338,8 +347,175 @@ def _dagrun_info_sort_key_no_catchup(info: DagRunInfo | None, *, now: float) ->
     order values by ``-logical_date`` if they are earlier than or at current
     time, but ``+logical_date`` if later.
     """
-    if info is None:
+    if info is None or info.logical_date is None:
         return math.inf
     if (ts := info.logical_date.timestamp()) <= now:
         return -ts
     return ts
+
+
+class CronPartitionTimetable(CronTriggerTimetable):
+ """
+ Timetable that triggers DAG runs according to a cron expression.
Review Comment:
```suggestion
Timetable that triggers Dag runs according to a cron expression.
```
##########
task-sdk/src/airflow/sdk/bases/timetable.py:
##########
@@ -47,6 +47,8 @@ class BaseTimetable:
asset_condition: BaseAsset | None = None
+    # TODO: AIP-76 just add partition-driven field here to differentiate the behavior
Review Comment:
* partitioned
* is_partitioned
* partition_driven

But even if we add such a field, we probably still need a base class, so we don't need to set it to `True` every time? Then, probably using the class itself to check is not a bad idea.
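A rough sketch of the two options being weighed, with all names hypothetical:

```python
class BaseTimetable:
    # Option 1: a flag on the base class; concrete timetables inherit the default.
    is_partitioned: bool = False


class PartitionDrivenTimetable(BaseTimetable):
    # A shared partition-driven base class sets the flag once, so subclasses
    # never have to repeat `is_partitioned = True`.
    is_partitioned = True


def uses_partitions(timetable: BaseTimetable) -> bool:
    # Option 2: check against the class itself instead of reading the flag.
    return isinstance(timetable, PartitionDrivenTimetable)
```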
##########
airflow-core/src/airflow/timetables/trigger.py:
##########
@@ -338,8 +347,175 @@ def _dagrun_info_sort_key_no_catchup(info: DagRunInfo | None, *, now: float) ->
     order values by ``-logical_date`` if they are earlier than or at current
     time, but ``+logical_date`` if later.
     """
-    if info is None:
+    if info is None or info.logical_date is None:
         return math.inf
     if (ts := info.logical_date.timestamp()) <= now:
         return -ts
     return ts
+
+
+class CronPartitionTimetable(CronTriggerTimetable):
+    """
+    Timetable that triggers DAG runs according to a cron expression.
+
+    Creates runs for partition keys.
+
+    The cron expression determines the sequence of run dates. And
+    the partition dates are derived from those according to the ``run_offset``.
+    The partition key is then formatted using the partition date.
+
+    A ``run_offset`` of 1 means the partition_date will be one cron interval
+    after the run date; negative means the partition date will be one cron
+    interval prior to the run date.
+
+    :param cron: cron string that defines when to run
+    :param timezone: Which timezone to use to interpret the cron string
+    :param run_offset: Integer offset that determines which partition date to run for.
+        The partition key will be derived from the partition date.
+    :param key_format: How to translate the partition date into a string partition key.
+
+    *run_immediately* controls, if no *start_time* is given to the DAG, when
+    the first run of the DAG should be scheduled. It has no effect if there
+    already exist runs for this DAG.
+
+    * If *True*, always run immediately the most recent possible DAG run.
+    * If *False*, wait to run until the next scheduled time in the future.
+    * If passed a ``timedelta``, will run the most recent possible DAG run
+      if that run's ``data_interval_end`` is within timedelta of now.
+    * If *None*, the timedelta is calculated as 10% of the time between the
+      most recent past scheduled time and the next scheduled time. E.g. if
+      running every hour, this would run the previous time if less than 6
+      minutes had past since the previous run time, otherwise it would wait
+      until the next hour.
+
+    # todo: AIP-76 talk about how we can have auto-reprocessing of partitions
+    # todo: AIP-76 we could allow a tuple of integer + time-based
+
+    """
+
+    def __init__(
+        self,
+        cron: str,
+        *,
+        timezone: str | Timezone | FixedTimezone,
+        run_offset: int | datetime.timedelta | relativedelta | None = None,
+        run_immediately: bool | datetime.timedelta = False,
+        key_format: str = "%Y-%m-%dT%H:%M:%S",  # todo: AIP-76 we can't infer partition date from this, so we need to store it separately
+    ) -> None:
+        super().__init__(cron, timezone=timezone, run_immediately=run_immediately)
+        if not isinstance(run_offset, (int, NoneType)):
+            # todo: AIP-76 implement timedelta / relative delta?
+            raise ValueError("Run offset other than integer not supported yet.")
+        self._run_offset = run_offset or 0
+        self._key_format = key_format
+
+    @classmethod
+    def deserialize(cls, data: dict[str, Any]) -> Timetable:
+        from airflow.serialization.decoders import decode_run_immediately
+
+        offset = data["run_offset"]
+        if not isinstance(offset, (int, NoneType)):
+            offset = None
+            log.warning(
+                "Unexpected offset type on deserialization. Only int supported in this version.",
+                run_offset=offset,
+            )
+
+        return cls(
+            cron=data["expression"],
+            timezone=parse_timezone(data["timezone"]),
+            run_offset=offset,
+            run_immediately=decode_run_immediately(data.get("run_immediately", False)),
+            key_format=data["key_format"],
+        )
+
+    def serialize(self) -> dict[str, Any]:
+        from airflow.serialization.encoders import encode_run_immediately, encode_timezone
+
+        return {
+            "expression": self._expression,
+            "timezone": encode_timezone(self._timezone),
+            "run_immediately": encode_run_immediately(self._run_immediately),
+            "run_offset": self._run_offset,
+            "key_format": self._key_format,
+        }
+
+    def get_partition_date(self, *, run_date):
+        if self._run_offset == 0:
+            return run_date
+        # we will need to apply offset to determine run date
+        partition_date = timezone.coerce_datetime(run_date)
+        log.info(
+            "applying offset to partition date", partition_date=partition_date, run_offset=self._run_offset
+        )
+        iter_func = self._get_next if self._run_offset > 0 else self._get_prev
+        for _ in range(abs(self._run_offset)):
+            partition_date = iter_func(partition_date)
+            log.info("new partition date", partition_date=partition_date)
+        return partition_date
+
+    def next_dagrun_info_v2(
+        self,
+        *,
+        last_dagrun_info: DagRunInfo | None,
+        restriction: TimeRestriction,
+    ) -> DagRunInfo | None:
+        # todo: AIP-76 add test for this logic
+        # todo: AIP-76 we will have to ensure that the start / end times apply to the partition date ideally,
+        # rather than just the run after
+
+        if restriction.catchup:
+            if last_dagrun_info is not None:
+                next_start_time = self._get_next(last_dagrun_info.run_after)
+            elif restriction.earliest is None:
+                next_start_time = self._calc_first_run()
+            else:
+                next_start_time = self._align_to_next(restriction.earliest)
+        else:
+            prev_candidate = self._align_to_prev(coerce_datetime(utcnow()))
+            start_time_candidates = [prev_candidate]
+            if last_dagrun_info is not None:
+                next_candidate = self._get_next(last_dagrun_info.run_after)
+                start_time_candidates.append(next_candidate)
+            elif restriction.earliest is None:
+                # Run immediately has no effect if there is restriction on earliest
+                first_run = self._calc_first_run()
+                start_time_candidates.append(first_run)
+            if restriction.earliest is not None:
+                earliest = self._align_to_next(restriction.earliest)
+                start_time_candidates.append(earliest)
+            next_start_time = max(start_time_candidates)
+        if restriction.latest is not None and restriction.latest < next_start_time:
+            return None
+
+        partition_date, partition_key = self.get_partition_info(run_date=next_start_time)
+        return DagRunInfo(
+            run_after=next_start_time,
+            partition_date=partition_date,
+            partition_key=partition_key,
+            data_interval=None,
+        )
+
+ def get_partition_info(self, run_date):
Review Comment:
```suggestion
def get_partition_info(self, run_date: datetime) -> tuple[datetime, str]:
```
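To make the `run_offset` semantics quoted above concrete, here is a toy model where one cron interval is simulated as one day (the real timetable instead steps its cron iterator `|run_offset|` times via `_get_next`/`_get_prev`):

```python
from datetime import datetime, timedelta


def toy_partition_date(run_date: datetime, run_offset: int) -> datetime:
    # One cron interval simulated as one day, i.e. a daily schedule.
    return run_date + run_offset * timedelta(days=1)


run = datetime(2025, 1, 10)
assert toy_partition_date(run, -1) == datetime(2025, 1, 9)   # one interval before the run date
assert toy_partition_date(run, 0) == run                     # partition date equals run date
assert toy_partition_date(run, 1) == datetime(2025, 1, 11)   # one interval after the run date
```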
##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -129,6 +130,39 @@ def _get_latest_runs_stmt(dag_id: str) -> Select:
)
+def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select:
Review Comment:
```suggestion
def _get_latest_partitioned_runs_stmt(dag_id: str) -> Select:
```
##########
airflow-core/src/airflow/timetables/trigger.py:
##########
@@ -338,8 +347,175 @@ def _dagrun_info_sort_key_no_catchup(info: DagRunInfo | None, *, now: float) ->
     order values by ``-logical_date`` if they are earlier than or at current
     time, but ``+logical_date`` if later.
     """
-    if info is None:
+    if info is None or info.logical_date is None:
         return math.inf
     if (ts := info.logical_date.timestamp()) <= now:
         return -ts
     return ts
+
+
+class CronPartitionTimetable(CronTriggerTimetable):
+    """
+    Timetable that triggers DAG runs according to a cron expression.
+
+    Creates runs for partition keys.
+
+    The cron expression determines the sequence of run dates. And
+    the partition dates are derived from those according to the ``run_offset``.
+    The partition key is then formatted using the partition date.
+
+    A ``run_offset`` of 1 means the partition_date will be one cron interval
+    after the run date; negative means the partition date will be one cron
+    interval prior to the run date.
+
+    :param cron: cron string that defines when to run
+    :param timezone: Which timezone to use to interpret the cron string
+    :param run_offset: Integer offset that determines which partition date to run for.
+        The partition key will be derived from the partition date.
+    :param key_format: How to translate the partition date into a string partition key.
+
+    *run_immediately* controls, if no *start_time* is given to the DAG, when
+    the first run of the DAG should be scheduled. It has no effect if there
+    already exist runs for this DAG.
+
+    * If *True*, always run immediately the most recent possible DAG run.
+    * If *False*, wait to run until the next scheduled time in the future.
+    * If passed a ``timedelta``, will run the most recent possible DAG run
+      if that run's ``data_interval_end`` is within timedelta of now.
+    * If *None*, the timedelta is calculated as 10% of the time between the
+      most recent past scheduled time and the next scheduled time. E.g. if
+      running every hour, this would run the previous time if less than 6
+      minutes had past since the previous run time, otherwise it would wait
+      until the next hour.
+
+    # todo: AIP-76 talk about how we can have auto-reprocessing of partitions
+    # todo: AIP-76 we could allow a tuple of integer + time-based
+
+    """
+
+    def __init__(
+        self,
+        cron: str,
+        *,
+        timezone: str | Timezone | FixedTimezone,
+        run_offset: int | datetime.timedelta | relativedelta | None = None,
+        run_immediately: bool | datetime.timedelta = False,
+        key_format: str = "%Y-%m-%dT%H:%M:%S",  # todo: AIP-76 we can't infer partition date from this, so we need to store it separately
+    ) -> None:
+        super().__init__(cron, timezone=timezone, run_immediately=run_immediately)
+        if not isinstance(run_offset, (int, NoneType)):
+            # todo: AIP-76 implement timedelta / relative delta?
+            raise ValueError("Run offset other than integer not supported yet.")
+        self._run_offset = run_offset or 0
+        self._key_format = key_format
+
+    @classmethod
+    def deserialize(cls, data: dict[str, Any]) -> Timetable:
+        from airflow.serialization.decoders import decode_run_immediately
+
+        offset = data["run_offset"]
+        if not isinstance(offset, (int, NoneType)):
+            offset = None
+            log.warning(
+                "Unexpected offset type on deserialization. Only int supported in this version.",
+                run_offset=offset,
+            )
+
+        return cls(
+            cron=data["expression"],
+            timezone=parse_timezone(data["timezone"]),
+            run_offset=offset,
+            run_immediately=decode_run_immediately(data.get("run_immediately", False)),
+            key_format=data["key_format"],
+        )
+
+    def serialize(self) -> dict[str, Any]:
+        from airflow.serialization.encoders import encode_run_immediately, encode_timezone
+
+ return {
+ "expression": self._expression,
+ "timezone": encode_timezone(self._timezone),
+ "run_immediately": encode_run_immediately(self._run_immediately),
+ "run_offset": self._run_offset,
+ "key_format": self._key_format,
+ }
+
+ def get_partition_date(self, *, run_date):
Review Comment:
```suggestion
def get_partition_date(self, *, run_date) -> datetime:
```
##########
airflow-core/src/airflow/timetables/base.py:
##########
@@ -315,3 +329,24 @@ def generate_run_id(
:param data_interval: The data interval of the DAG run.
"""
return run_type.generate_run_id(suffix=run_after.isoformat())
+
+    def next_dagrun_info_v2(self, *, last_dagrun_info: DagRunInfo | None, restriction: TimeRestriction):
+        """
+        Provide information to schedule the next DagRun.
+
+        The default implementation raises ``NotImplementedError``.
+
+        :param last_dagrun_info: The DagRunInfo object of the
+            Dag's last scheduled or backfilled run.
+        :param restriction: Restriction to apply when scheduling the DAG run.
+            See documentation of :class:`TimeRestriction` for details.
+
+        :return: Information on when the next DagRun can be scheduled. None
+            means a DagRun should not be created. This does not mean no more runs
+            will be scheduled ever again for this DAG; the timetable can return
Review Comment:
```suggestion
            will be scheduled ever again for this Dag; the timetable can return
```
##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -129,6 +130,39 @@ def _get_latest_runs_stmt(dag_id: str) -> Select:
)
+def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select:
+    """Build a select statement to retrieve the last automated run for each dag."""
+ # todo: AIP-76 we should add a partition date field
+ latest_run_id = (
+ select(DagRun.id)
+ .where(
+ DagRun.dag_id == dag_id,
+ DagRun.run_type.in_(
+ (
+ DagRunType.BACKFILL_JOB,
+ DagRunType.SCHEDULED,
+ )
+ ),
+ DagRun.partition_key.is_not(None),
+ )
+ .order_by(DagRun.id.desc())
Review Comment:
Looks like we're using `DagRun.id` to decide the latest run?
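For comparison, a hedged sketch of ordering by `run_after` instead of the surrogate id (whether that column is the right sort key here is exactly the open question; `partition_key` comes from this PR):

```python
from sqlalchemy import select

from airflow.models.dagrun import DagRun

# Sketch only: id order can diverge from scheduling order (e.g. backfill runs
# created after later scheduled runs), so order by run_after with id as tiebreaker.
latest_run_id_stmt = (
    select(DagRun.id)
    .where(DagRun.dag_id == "my_dag", DagRun.partition_key.is_not(None))
    .order_by(DagRun.run_after.desc(), DagRun.id.desc())
    .limit(1)
)
```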
##########
airflow-core/src/airflow/timetables/base.py:
##########
@@ -315,3 +329,24 @@ def generate_run_id(
:param data_interval: The data interval of the DAG run.
"""
return run_type.generate_run_id(suffix=run_after.isoformat())
+
+    def next_dagrun_info_v2(self, *, last_dagrun_info: DagRunInfo | None, restriction: TimeRestriction):
+ """
+ Provide information to schedule the next DagRun.
+
+ The default implementation raises ``NotImplementedError``.
+
+        :param last_dagrun_info: The DagRunInfo object of the
+            Dag's last scheduled or backfilled run.
+ :param restriction: Restriction to apply when scheduling the DAG run.
Review Comment:
```suggestion
:param restriction: Restriction to apply when scheduling the Dag run.
```
##########
airflow-core/src/airflow/models/dag.py:
##########
@@ -702,36 +711,64 @@ def dag_ready(dag_id: str, cond: SerializedAssetBase, statuses: dict[UKey, bool]
             triggered_date_by_dag,
         )
+    def _get_run_info(self, run: DagRun | None, timetable: Timetable) -> DagRunInfo | None:
+        run_info = None
+        interval = None
+        partition_date = None
+        if run:
+            run_after = timezone.coerce_datetime(run.run_after)
+            if not run.partition_key:
+                interval = get_run_data_interval(timetable, run)
+            if isinstance(timetable, CronPartitionTimetable):
+                # todo: AIP-76 store this on DagRun so we don't need to recalculate?
+                # todo: AIP-76 this needs to be public
+                partition_date = timetable.get_partition_date(run_date=run.run_after)
+            run_info = DagRunInfo(
+                run_after=run_after,
+                data_interval=interval,
+                partition_date=partition_date,
+                partition_key=run.partition_key,
+            )
+        return run_info
Review Comment:
```suggestion
    def _get_run_info(self, run: DagRun | None, timetable: Timetable) -> DagRunInfo | None:
        if not run:
            return None
        interval = None
        if not run.partition_key:
            interval = get_run_data_interval(timetable, run)
        partition_date = None
        if isinstance(timetable, CronPartitionTimetable):
            # todo: AIP-76 store this on DagRun so we don't need to recalculate?
            # todo: AIP-76 this needs to be public
            partition_date = timetable.get_partition_date(run_date=run.run_after)
        return DagRunInfo(
            run_after=timezone.coerce_datetime(run.run_after),
            data_interval=interval,
            partition_date=partition_date,
            partition_key=run.partition_key,
        )
```
##########
airflow-core/src/airflow/models/dag.py:
##########
@@ -140,6 +143,12 @@ def get_run_data_interval(timetable: Timetable, run: DagRun) -> DataInterval:
:meta private:
"""
+ if not run:
+ return run
+
+ if run.partition_key is not None:
+ return None
Review Comment:
```suggestion
if not run or run.partition_key is not None:
return None
```
##########
airflow-core/src/airflow/models/backfill.py:
##########
@@ -428,7 +433,11 @@ def _get_info_list(
) -> list[DagRunInfo]:
infos = dag.iter_dagrun_infos_between(from_date, to_date)
now = timezone.utcnow()
- dagrun_info_list = [x for x in infos if x.data_interval.end < now]
+ dagrun_info_list = []
+ for x in infos:
+ # todo: AIP-76 update for partitioned dags
+ if x.data_interval and x.data_interval.end < now:
+ dagrun_info_list.append(x)
Review Comment:
```suggestion
dagrun_info_list = [
x
for x in infos
# todo: AIP-76 update for partitioned dags
if x.data_interval and x.data_interval.end < now
]
```
comprehension still looks better IMO 🤔
##########
airflow-core/src/airflow/timetables/trigger.py:
##########
@@ -338,8 +347,175 @@ def _dagrun_info_sort_key_no_catchup(info: DagRunInfo | None, *, now: float) ->
     order values by ``-logical_date`` if they are earlier than or at current
     time, but ``+logical_date`` if later.
     """
-    if info is None:
+    if info is None or info.logical_date is None:
         return math.inf
     if (ts := info.logical_date.timestamp()) <= now:
         return -ts
     return ts
+
+
+class CronPartitionTimetable(CronTriggerTimetable):
+    """
+    Timetable that triggers DAG runs according to a cron expression.
+
+    Creates runs for partition keys.
+
+    The cron expression determines the sequence of run dates. And
+    the partition dates are derived from those according to the ``run_offset``.
+    The partition key is then formatted using the partition date.
+
+    A ``run_offset`` of 1 means the partition_date will be one cron interval
+    after the run date; negative means the partition date will be one cron
+    interval prior to the run date.
+
+    :param cron: cron string that defines when to run
+    :param timezone: Which timezone to use to interpret the cron string
+    :param run_offset: Integer offset that determines which partition date to run for.
+        The partition key will be derived from the partition date.
+    :param key_format: How to translate the partition date into a string partition key.
+
+ *run_immediately* controls, if no *start_time* is given to the DAG, when
Review Comment:
```suggestion
*run_immediately* controls, if no *start_time* is given to the Dag, when
```
##########
airflow-core/src/airflow/timetables/base.py:
##########
@@ -315,3 +329,24 @@ def generate_run_id(
:param data_interval: The data interval of the DAG run.
"""
return run_type.generate_run_id(suffix=run_after.isoformat())
+
+    def next_dagrun_info_v2(self, *, last_dagrun_info: DagRunInfo | None, restriction: TimeRestriction):
Review Comment:
```suggestion
    def next_dagrun_info_v2(self, *, last_dagrun_info: DagRunInfo | None, restriction: TimeRestriction) -> DagRunInfo | None:
```
##########
airflow-core/src/airflow/timetables/trigger.py:
##########
@@ -338,8 +347,175 @@ def _dagrun_info_sort_key_no_catchup(info: DagRunInfo | None, *, now: float) ->
     order values by ``-logical_date`` if they are earlier than or at current
     time, but ``+logical_date`` if later.
     """
-    if info is None:
+    if info is None or info.logical_date is None:
         return math.inf
     if (ts := info.logical_date.timestamp()) <= now:
         return -ts
     return ts
+
+
+class CronPartitionTimetable(CronTriggerTimetable):
+    """
+    Timetable that triggers DAG runs according to a cron expression.
+
+    Creates runs for partition keys.
+
+    The cron expression determines the sequence of run dates. And
+    the partition dates are derived from those according to the ``run_offset``.
+    The partition key is then formatted using the partition date.
+
+    A ``run_offset`` of 1 means the partition_date will be one cron interval
+    after the run date; negative means the partition date will be one cron
+    interval prior to the run date.
+
+    :param cron: cron string that defines when to run
+    :param timezone: Which timezone to use to interpret the cron string
+    :param run_offset: Integer offset that determines which partition date to run for.
+        The partition key will be derived from the partition date.
+    :param key_format: How to translate the partition date into a string partition key.
+
+ *run_immediately* controls, if no *start_time* is given to the DAG, when
+ the first run of the DAG should be scheduled. It has no effect if there
Review Comment:
```suggestion
the first run of the Dag should be scheduled. It has no effect if there
```
##########
airflow-core/src/airflow/timetables/trigger.py:
##########
@@ -338,8 +347,175 @@ def _dagrun_info_sort_key_no_catchup(info: DagRunInfo | None, *, now: float) ->
     order values by ``-logical_date`` if they are earlier than or at current
     time, but ``+logical_date`` if later.
     """
-    if info is None:
+    if info is None or info.logical_date is None:
         return math.inf
     if (ts := info.logical_date.timestamp()) <= now:
         return -ts
     return ts
+
+
+class CronPartitionTimetable(CronTriggerTimetable):
+    """
+    Timetable that triggers DAG runs according to a cron expression.
+
+    Creates runs for partition keys.
+
+    The cron expression determines the sequence of run dates. And
+    the partition dates are derived from those according to the ``run_offset``.
+    The partition key is then formatted using the partition date.
+
+    A ``run_offset`` of 1 means the partition_date will be one cron interval
+    after the run date; negative means the partition date will be one cron
+    interval prior to the run date.
+
+    :param cron: cron string that defines when to run
+    :param timezone: Which timezone to use to interpret the cron string
+    :param run_offset: Integer offset that determines which partition date to run for.
+        The partition key will be derived from the partition date.
+    :param key_format: How to translate the partition date into a string partition key.
+
+    *run_immediately* controls, if no *start_time* is given to the DAG, when
+    the first run of the DAG should be scheduled. It has no effect if there
+    already exist runs for this DAG.
+
+    * If *True*, always run immediately the most recent possible DAG run.
+    * If *False*, wait to run until the next scheduled time in the future.
+    * If passed a ``timedelta``, will run the most recent possible DAG run
+      if that run's ``data_interval_end`` is within timedelta of now.
+    * If *None*, the timedelta is calculated as 10% of the time between the
+      most recent past scheduled time and the next scheduled time. E.g. if
+      running every hour, this would run the previous time if less than 6
+      minutes had past since the previous run time, otherwise it would wait
+      until the next hour.
+
+    # todo: AIP-76 talk about how we can have auto-reprocessing of partitions
+    # todo: AIP-76 we could allow a tuple of integer + time-based
+
+    """
+
+    def __init__(
+        self,
+        cron: str,
+        *,
+        timezone: str | Timezone | FixedTimezone,
+        run_offset: int | datetime.timedelta | relativedelta | None = None,
+        run_immediately: bool | datetime.timedelta = False,
+        key_format: str = "%Y-%m-%dT%H:%M:%S",  # todo: AIP-76 we can't infer partition date from this, so we need to store it separately
+    ) -> None:
+        super().__init__(cron, timezone=timezone, run_immediately=run_immediately)
+        if not isinstance(run_offset, (int, NoneType)):
+            # todo: AIP-76 implement timedelta / relative delta?
+            raise ValueError("Run offset other than integer not supported yet.")
+        self._run_offset = run_offset or 0
+        self._key_format = key_format
+
+    @classmethod
+    def deserialize(cls, data: dict[str, Any]) -> Timetable:
+        from airflow.serialization.decoders import decode_run_immediately
+
+        offset = data["run_offset"]
+        if not isinstance(offset, (int, NoneType)):
+            offset = None
+            log.warning(
+                "Unexpected offset type on deserialization. Only int supported in this version.",
+                run_offset=offset,
+            )
+
+        return cls(
+            cron=data["expression"],
+            timezone=parse_timezone(data["timezone"]),
+            run_offset=offset,
+            run_immediately=decode_run_immediately(data.get("run_immediately", False)),
+            key_format=data["key_format"],
+        )
+
+    def serialize(self) -> dict[str, Any]:
+        from airflow.serialization.encoders import encode_run_immediately, encode_timezone
+
+        return {
+            "expression": self._expression,
+            "timezone": encode_timezone(self._timezone),
+            "run_immediately": encode_run_immediately(self._run_immediately),
+            "run_offset": self._run_offset,
+            "key_format": self._key_format,
+        }
+
+    def get_partition_date(self, *, run_date):
+        if self._run_offset == 0:
+            return run_date
+        # we will need to apply offset to determine run date
+        partition_date = timezone.coerce_datetime(run_date)
+        log.info(
+            "applying offset to partition date", partition_date=partition_date, run_offset=self._run_offset
+        )
+        iter_func = self._get_next if self._run_offset > 0 else self._get_prev
+        for _ in range(abs(self._run_offset)):
+            partition_date = iter_func(partition_date)
+            log.info("new partition date", partition_date=partition_date)
+        return partition_date
+
+    def next_dagrun_info_v2(
+        self,
+        *,
+        last_dagrun_info: DagRunInfo | None,
+        restriction: TimeRestriction,
+    ) -> DagRunInfo | None:
+        # todo: AIP-76 add test for this logic
+        # todo: AIP-76 we will have to ensure that the start / end times apply to the partition date ideally,
+        # rather than just the run after
+
+        if restriction.catchup:
+            if last_dagrun_info is not None:
+                next_start_time = self._get_next(last_dagrun_info.run_after)
+            elif restriction.earliest is None:
+                next_start_time = self._calc_first_run()
+            else:
+                next_start_time = self._align_to_next(restriction.earliest)
+        else:
+            prev_candidate = self._align_to_prev(coerce_datetime(utcnow()))
+            start_time_candidates = [prev_candidate]
+            if last_dagrun_info is not None:
+                next_candidate = self._get_next(last_dagrun_info.run_after)
+                start_time_candidates.append(next_candidate)
+            elif restriction.earliest is None:
+                # Run immediately has no effect if there is restriction on earliest
+                first_run = self._calc_first_run()
+                start_time_candidates.append(first_run)
+            if restriction.earliest is not None:
+                earliest = self._align_to_next(restriction.earliest)
+                start_time_candidates.append(earliest)
+            next_start_time = max(start_time_candidates)
+        if restriction.latest is not None and restriction.latest < next_start_time:
+            return None
+
+        partition_date, partition_key = self.get_partition_info(run_date=next_start_time)
+        return DagRunInfo(
+            run_after=next_start_time,
+            partition_date=partition_date,
+            partition_key=partition_key,
+            data_interval=None,
+        )
+
+ def get_partition_info(self, run_date):
+ partition_date = self.get_partition_date(run_date=run_date)
+ partition_key = self._format_key(partition_date)
+ return partition_date, partition_key
+
+ def _format_key(self, partition_date: DateTime):
+ return partition_date.strftime(self._key_format)
+
+ def generate_run_id(
+ self,
+ *,
+ run_type: DagRunType,
+ run_after: DateTime,
+ data_interval: DataInterval | None,
+ **extra,
+ ) -> str:
+ partition_key = extra.get("partition_key")
+ components = [
+ run_after.isoformat(),
+ partition_key,
+ get_random_string(),
+ ]
Review Comment:
```suggestion
components = [
run_after.isoformat(),
extra.get("partition_key"),
get_random_string(),
]
```
##########
airflow-core/src/airflow/timetables/trigger.py:
##########
@@ -338,8 +347,175 @@ def _dagrun_info_sort_key_no_catchup(info: DagRunInfo | None, *, now: float) ->
     order values by ``-logical_date`` if they are earlier than or at current
     time, but ``+logical_date`` if later.
     """
-    if info is None:
+    if info is None or info.logical_date is None:
         return math.inf
     if (ts := info.logical_date.timestamp()) <= now:
         return -ts
     return ts
+
+
+class CronPartitionTimetable(CronTriggerTimetable):
+    """
+    Timetable that triggers DAG runs according to a cron expression.
+
+    Creates runs for partition keys.
+
+    The cron expression determines the sequence of run dates. And
+    the partition dates are derived from those according to the ``run_offset``.
+    The partition key is then formatted using the partition date.
+
+    A ``run_offset`` of 1 means the partition_date will be one cron interval
+    after the run date; negative means the partition date will be one cron
+    interval prior to the run date.
+
+    :param cron: cron string that defines when to run
+    :param timezone: Which timezone to use to interpret the cron string
+    :param run_offset: Integer offset that determines which partition date to run for.
+        The partition key will be derived from the partition date.
+    :param key_format: How to translate the partition date into a string partition key.
+
+    *run_immediately* controls, if no *start_time* is given to the DAG, when
+    the first run of the DAG should be scheduled. It has no effect if there
+    already exist runs for this DAG.
+
+    * If *True*, always run immediately the most recent possible DAG run.
+    * If *False*, wait to run until the next scheduled time in the future.
+    * If passed a ``timedelta``, will run the most recent possible DAG run
+      if that run's ``data_interval_end`` is within timedelta of now.
+    * If *None*, the timedelta is calculated as 10% of the time between the
+      most recent past scheduled time and the next scheduled time. E.g. if
+      running every hour, this would run the previous time if less than 6
+      minutes had past since the previous run time, otherwise it would wait
+      until the next hour.
+
+    # todo: AIP-76 talk about how we can have auto-reprocessing of partitions
+    # todo: AIP-76 we could allow a tuple of integer + time-based
+
+    """
+
+    def __init__(
+        self,
+        cron: str,
+        *,
+        timezone: str | Timezone | FixedTimezone,
+        run_offset: int | datetime.timedelta | relativedelta | None = None,
+        run_immediately: bool | datetime.timedelta = False,
+        key_format: str = "%Y-%m-%dT%H:%M:%S",  # todo: AIP-76 we can't infer partition date from this, so we need to store it separately
+    ) -> None:
+        super().__init__(cron, timezone=timezone, run_immediately=run_immediately)
+        if not isinstance(run_offset, (int, NoneType)):
+            # todo: AIP-76 implement timedelta / relative delta?
+            raise ValueError("Run offset other than integer not supported yet.")
+        self._run_offset = run_offset or 0
+        self._key_format = key_format
+
+    @classmethod
+    def deserialize(cls, data: dict[str, Any]) -> Timetable:
+        from airflow.serialization.decoders import decode_run_immediately
+
+        offset = data["run_offset"]
+        if not isinstance(offset, (int, NoneType)):
+            offset = None
+            log.warning(
+                "Unexpected offset type on deserialization. Only int supported in this version.",
+                run_offset=offset,
+            )
+
+        return cls(
+            cron=data["expression"],
+            timezone=parse_timezone(data["timezone"]),
+            run_offset=offset,
+            run_immediately=decode_run_immediately(data.get("run_immediately", False)),
+            key_format=data["key_format"],
+        )
+
+    def serialize(self) -> dict[str, Any]:
+        from airflow.serialization.encoders import encode_run_immediately, encode_timezone
+
+        return {
+            "expression": self._expression,
+            "timezone": encode_timezone(self._timezone),
+            "run_immediately": encode_run_immediately(self._run_immediately),
+            "run_offset": self._run_offset,
+            "key_format": self._key_format,
+        }
+
+    def get_partition_date(self, *, run_date):
+        if self._run_offset == 0:
+            return run_date
+        # we will need to apply offset to determine run date
+        partition_date = timezone.coerce_datetime(run_date)
+        log.info(
+            "applying offset to partition date", partition_date=partition_date, run_offset=self._run_offset
+        )
+        iter_func = self._get_next if self._run_offset > 0 else self._get_prev
+        for _ in range(abs(self._run_offset)):
+            partition_date = iter_func(partition_date)
+            log.info("new partition date", partition_date=partition_date)
+        return partition_date
+
+    def next_dagrun_info_v2(
+        self,
+        *,
+        last_dagrun_info: DagRunInfo | None,
+        restriction: TimeRestriction,
+    ) -> DagRunInfo | None:
+        # todo: AIP-76 add test for this logic
+        # todo: AIP-76 we will have to ensure that the start / end times apply to the partition date ideally,
+        # rather than just the run after
+
+        if restriction.catchup:
+            if last_dagrun_info is not None:
+                next_start_time = self._get_next(last_dagrun_info.run_after)
+            elif restriction.earliest is None:
+                next_start_time = self._calc_first_run()
+            else:
+                next_start_time = self._align_to_next(restriction.earliest)
+        else:
+            prev_candidate = self._align_to_prev(coerce_datetime(utcnow()))
+            start_time_candidates = [prev_candidate]
+            if last_dagrun_info is not None:
+                next_candidate = self._get_next(last_dagrun_info.run_after)
+                start_time_candidates.append(next_candidate)
+            elif restriction.earliest is None:
+                # Run immediately has no effect if there is restriction on earliest
+                first_run = self._calc_first_run()
+                start_time_candidates.append(first_run)
+            if restriction.earliest is not None:
+                earliest = self._align_to_next(restriction.earliest)
+                start_time_candidates.append(earliest)
+            next_start_time = max(start_time_candidates)
+        if restriction.latest is not None and restriction.latest < next_start_time:
+            return None
+
+        partition_date, partition_key = self.get_partition_info(run_date=next_start_time)
+        return DagRunInfo(
+            run_after=next_start_time,
+            partition_date=partition_date,
+            partition_key=partition_key,
+            data_interval=None,
+        )
+
+ def get_partition_info(self, run_date):
+ partition_date = self.get_partition_date(run_date=run_date)
+ partition_key = self._format_key(partition_date)
+ return partition_date, partition_key
+
+ def _format_key(self, partition_date: DateTime):
Review Comment:
```suggestion
def _format_key(self, partition_date: DateTime) -> str:
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]