This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fdf9e199e68 feat(sdk): add CronPartitionTimetable to task-sdk (#61247)
fdf9e199e68 is described below
commit fdf9e199e688a5ebdacfa11a94d9d1ca93f9379d
Author: Wei Lee <[email protected]>
AuthorDate: Sat Jan 31 01:24:02 2026 +0800
feat(sdk): add CronPartitionTimetable to task-sdk (#61247)
---
airflow-core/src/airflow/serialization/encoders.py | 12 ++++
airflow-core/src/airflow/timetables/trigger.py | 4 +-
airflow-core/tests/unit/models/test_dag.py | 11 +++-
.../unit/timetables/test_trigger_timetable.py | 2 +-
task-sdk/docs/api.rst | 6 +-
task-sdk/src/airflow/sdk/__init__.py | 3 +
task-sdk/src/airflow/sdk/__init__.pyi | 2 +
.../airflow/sdk/definitions/timetables/trigger.py | 69 +++++++++++++++++++++-
8 files changed, 100 insertions(+), 9 deletions(-)
diff --git a/airflow-core/src/airflow/serialization/encoders.py b/airflow-core/src/airflow/serialization/encoders.py
index 404dca52659..d63327d8094 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -49,6 +49,7 @@ from airflow.sdk.definitions.timetables.assets import (
PartitionedAssetTimetable,
)
from airflow.sdk.definitions.timetables.simple import ContinuousTimetable, NullTimetable, OnceTimetable
+from airflow.sdk.definitions.timetables.trigger import CronPartitionTimetable
from airflow.serialization.decoders import decode_deadline_alert
from airflow.serialization.definitions.assets import (
SerializedAsset,
@@ -242,6 +243,7 @@ class _Serializer:
ContinuousTimetable: "airflow.timetables.simple.ContinuousTimetable",
CronDataIntervalTimetable: "airflow.timetables.interval.CronDataIntervalTimetable",
CronTriggerTimetable: "airflow.timetables.trigger.CronTriggerTimetable",
+ CronPartitionTimetable: "airflow.timetables.trigger.CronPartitionTimetable",
DeltaDataIntervalTimetable: "airflow.timetables.interval.DeltaDataIntervalTimetable",
DeltaTriggerTimetable: "airflow.timetables.trigger.DeltaTriggerTimetable",
EventsTimetable: "airflow.timetables.events.EventsTimetable",
@@ -305,6 +307,16 @@ class _Serializer:
"run_immediately": encode_run_immediately(timetable.run_immediately),
}
+ @serialize_timetable.register
+ def _(self, timetable: CronPartitionTimetable) -> dict[str, Any]:
+ return {
+ "expression": timetable.expression,
+ "timezone": encode_timezone(timetable.timezone),
+ "run_immediately": encode_run_immediately(timetable.run_immediately),
+ "run_offset": timetable.run_offset,
+ "key_format": timetable.key_format,
+ }
+
@serialize_timetable.register
def _(self, timetable: DeltaTriggerTimetable) -> dict[str, Any]:
return {
diff --git a/airflow-core/src/airflow/timetables/trigger.py b/airflow-core/src/airflow/timetables/trigger.py
index af4f855f0cd..854415e5722 100644
--- a/airflow-core/src/airflow/timetables/trigger.py
+++ b/airflow-core/src/airflow/timetables/trigger.py
@@ -242,10 +242,10 @@ class CronTriggerTimetable(CronMixin, _TriggerTimetable):
class MultipleCronTriggerTimetable(Timetable):
"""
- Timetable that triggers DAG runs according to multiple cron expressions.
+ Timetable that triggers Dag runs according to multiple cron expressions.
This combines multiple ``CronTriggerTimetable`` instances underneath, and
- triggers a DAG run whenever one of the timetables want to trigger a run.
+ triggers a Dag run whenever one of the timetables want to trigger a run.
Only at most one run is triggered for any given time, even if more than one
timetable fires at the same time.
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index e5366fc6127..2a45735847e 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -65,7 +65,15 @@ from airflow.models.taskinstance import TaskInstance as TI
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
-from airflow.sdk import DAG, BaseOperator, TaskGroup, setup, task as task_decorator, teardown
+from airflow.sdk import (
+ DAG,
+ BaseOperator,
+ CronPartitionTimetable,
+ TaskGroup,
+ setup,
+ task as task_decorator,
+ teardown,
+)
from airflow.sdk.definitions._internal.contextmanager import TaskGroupContext
from airflow.sdk.definitions._internal.templater import NativeEnvironment, SandboxedEnvironment
from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAll, AssetAny
@@ -82,7 +90,6 @@ from airflow.timetables.simple import (
NullTimetable,
OnceTimetable,
)
-from airflow.timetables.trigger import CronPartitionTimetable
from airflow.utils.file import list_py_file_paths
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
diff --git a/airflow-core/tests/unit/timetables/test_trigger_timetable.py b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
index a5774bb22c5..8cd895c7dc1 100644
--- a/airflow-core/tests/unit/timetables/test_trigger_timetable.py
+++ b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
@@ -28,9 +28,9 @@ from sqlalchemy import select
from airflow._shared.timezones.timezone import utc
from airflow.exceptions import AirflowTimetableInvalid
from airflow.models import DagModel
+from airflow.sdk import CronPartitionTimetable
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.trigger import (
- CronPartitionTimetable,
CronTriggerTimetable,
DeltaTriggerTimetable,
MultipleCronTriggerTimetable,
diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst
index 4bb0cd4d2ea..55d5b415253 100644
--- a/task-sdk/docs/api.rst
+++ b/task-sdk/docs/api.rst
@@ -175,18 +175,20 @@ Timetables
.. autoapiclass:: airflow.sdk.CronTriggerTimetable
+.. autoapiclass:: airflow.sdk.CronPartitionTimetable
+
.. autoapiclass:: airflow.sdk.DeltaDataIntervalTimetable
.. autoapiclass:: airflow.sdk.DeltaTriggerTimetable
.. autoapiclass:: airflow.sdk.EventsTimetable
+.. autoapiclass:: airflow.sdk.IdentityMapper
+
.. autoapiclass:: airflow.sdk.MultipleCronTriggerTimetable
.. autoapiclass:: airflow.sdk.PartitionMapper
-.. autoapiclass:: airflow.sdk.IdentityMapper
-
.. autoapiclass:: airflow.sdk.PartitionedAssetTimetable
diff --git a/task-sdk/src/airflow/sdk/__init__.py b/task-sdk/src/airflow/sdk/__init__.py
index a530dadafd6..c8067c9ea91 100644
--- a/task-sdk/src/airflow/sdk/__init__.py
+++ b/task-sdk/src/airflow/sdk/__init__.py
@@ -37,6 +37,7 @@ __all__ = [
"Context",
"CronDataIntervalTimetable",
"CronTriggerTimetable",
+ "CronPartitionTimetable",
"DAG",
"DagRunState",
"DeadlineAlert",
@@ -121,6 +122,7 @@ if TYPE_CHECKING:
DeltaDataIntervalTimetable,
)
from airflow.sdk.definitions.timetables.trigger import (
+ CronPartitionTimetable,
CronTriggerTimetable,
DeltaTriggerTimetable,
MultipleCronTriggerTimetable,
@@ -151,6 +153,7 @@ __lazy_imports: dict[str, str] = {
"Context": ".definitions.context",
"CronDataIntervalTimetable": ".definitions.timetables.interval",
"CronTriggerTimetable": ".definitions.timetables.trigger",
+ "CronPartitionTimetable": ".definitions.timetables.trigger",
"DAG": ".definitions.dag",
"DagRunState": ".api.datamodels._generated",
"DeadlineAlert": ".definitions.deadline",
diff --git a/task-sdk/src/airflow/sdk/__init__.pyi b/task-sdk/src/airflow/sdk/__init__.pyi
index 21315fa231d..ed55a56a80a 100644
--- a/task-sdk/src/airflow/sdk/__init__.pyi
+++ b/task-sdk/src/airflow/sdk/__init__.pyi
@@ -70,6 +70,7 @@ from airflow.sdk.definitions.timetables.interval import (
DeltaDataIntervalTimetable,
)
from airflow.sdk.definitions.timetables.trigger import (
+ CronPartitionTimetable,
CronTriggerTimetable,
DeltaTriggerTimetable,
MultipleCronTriggerTimetable,
@@ -100,6 +101,7 @@ __all__ = [
"Context",
"CronDataIntervalTimetable",
"CronTriggerTimetable",
+ "CronPartitionTimetable",
"DAG",
"DagRunState",
"DeltaDataIntervalTimetable",
diff --git a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
index afde7e2cfd1..4451ab07461 100644
--- a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
+++ b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import datetime
+from types import NoneType
from typing import TYPE_CHECKING
import attrs
@@ -88,10 +89,10 @@ class CronTriggerTimetable(CronMixin, BaseTimetable):
@attrs.define(init=False)
class MultipleCronTriggerTimetable(BaseTimetable):
"""
- Timetable that triggers DAG runs according to multiple cron expressions.
+ Timetable that triggers Dag runs according to multiple cron expressions.
This combines multiple ``CronTriggerTimetable`` instances underneath, and
- triggers a DAG run whenever one of the timetables want to trigger a run.
+ triggers a Dag run whenever one of the timetables want to trigger a run.
Only at most one run is triggered for any given time, even if more than one
timetable fires at the same time.
@@ -114,3 +115,67 @@ class MultipleCronTriggerTimetable(BaseTimetable):
for cron in crons
],
)
+
+
+@attrs.define
+class CronPartitionTimetable(CronTriggerTimetable):
+ """
+ Timetable that triggers Dag runs according to a cron expression.
+
+ Creates runs for partition keys.
+
+ The cron expression determines the sequence of run dates. And
+ the partition dates are derived from those according to the ``run_offset``.
+ The partition key is then formatted using the partition date.
+
+ A ``run_offset`` of 1 means the partition_date will be one cron interval
+ after the run date; negative means the partition date will be one cron
+ interval prior to the run date.
+
+ :param cron: cron string that defines when to run
+ :param timezone: Which timezone to use to interpret the cron string
+ :param run_offset: Integer offset that determines which partition date to run for.
+ The partition key will be derived from the partition date.
+ :param key_format: How to translate the partition date into a string partition key.
+
+ *run_immediately* controls, if no *start_time* is given to the Dag, when
+ the first run of the Dag should be scheduled. It has no effect if there already exist runs for this Dag.
+
+ * If *True*, always run immediately the most recent possible Dag run.
+ * If *False*, wait to run until the next scheduled time in the future.
+ * If passed a ``timedelta``, will run the most recent possible Dag run
+ if that run's ``data_interval_end`` is within timedelta of now.
+ * If *None*, the timedelta is calculated as 10% of the time between the
+ most recent past scheduled time and the next scheduled time. E.g. if
+ running every hour, this would run the previous time if less than 6
+ minutes had past since the previous run time, otherwise it would wait
+ until the next hour.
+
+ # todo: AIP-76 talk about how we can have auto-reprocessing of partitions
+ # todo: AIP-76 we could allow a tuple of integer + time-based
+
+ """
+
+ run_offset: int | datetime.timedelta | relativedelta | None = None
+ key_format: str = "%Y-%m-%dT%H:%M:%S" # todo: AIP-76 we can't infer partition date from this, so we need to store it separately
+
+ def __init__(
+ self,
+ cron: str,
+ *,
+ timezone: str | Timezone | FixedTimezone,
+ run_offset: int | datetime.timedelta | relativedelta | None = None,
+ run_immediately: bool | datetime.timedelta = False,
+ key_format: str = "%Y-%m-%dT%H:%M:%S", # todo: AIP-76 we can't infer partition date from this, so we need to store it separately
+ ) -> None:
+ # super().__init__(cron, timezone=timezone, run_immediately=run_immediately)
+ if not isinstance(run_offset, (int, NoneType)):
+ # todo: AIP-76 implement timedelta / relative delta?
+ raise ValueError("Run offset other than integer not supported yet.")
+ self.__attrs_init__( # type: ignore[attr-defined]
+ cron,
+ timezone=timezone,
+ run_offset=run_offset,
+ run_immediately=run_immediately,
+ key_format=key_format,
+ )