This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fdf9e199e68 feat(sdk): add CronPartitionTimetable to task-sdk (#61247)
fdf9e199e68 is described below

commit fdf9e199e688a5ebdacfa11a94d9d1ca93f9379d
Author: Wei Lee <[email protected]>
AuthorDate: Sat Jan 31 01:24:02 2026 +0800

    feat(sdk): add CronPartitionTimetable to task-sdk (#61247)
---
 airflow-core/src/airflow/serialization/encoders.py | 12 ++++
 airflow-core/src/airflow/timetables/trigger.py     |  4 +-
 airflow-core/tests/unit/models/test_dag.py         | 11 +++-
 .../unit/timetables/test_trigger_timetable.py      |  2 +-
 task-sdk/docs/api.rst                              |  6 +-
 task-sdk/src/airflow/sdk/__init__.py               |  3 +
 task-sdk/src/airflow/sdk/__init__.pyi              |  2 +
 .../airflow/sdk/definitions/timetables/trigger.py  | 69 +++++++++++++++++++++-
 8 files changed, 100 insertions(+), 9 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/encoders.py b/airflow-core/src/airflow/serialization/encoders.py
index 404dca52659..d63327d8094 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -49,6 +49,7 @@ from airflow.sdk.definitions.timetables.assets import (
     PartitionedAssetTimetable,
 )
 from airflow.sdk.definitions.timetables.simple import ContinuousTimetable, NullTimetable, OnceTimetable
+from airflow.sdk.definitions.timetables.trigger import CronPartitionTimetable
 from airflow.serialization.decoders import decode_deadline_alert
 from airflow.serialization.definitions.assets import (
     SerializedAsset,
@@ -242,6 +243,7 @@ class _Serializer:
         ContinuousTimetable: "airflow.timetables.simple.ContinuousTimetable",
         CronDataIntervalTimetable: "airflow.timetables.interval.CronDataIntervalTimetable",
         CronTriggerTimetable: "airflow.timetables.trigger.CronTriggerTimetable",
+        CronPartitionTimetable: "airflow.timetables.trigger.CronPartitionTimetable",
         DeltaDataIntervalTimetable: "airflow.timetables.interval.DeltaDataIntervalTimetable",
         DeltaTriggerTimetable: "airflow.timetables.trigger.DeltaTriggerTimetable",
         EventsTimetable: "airflow.timetables.events.EventsTimetable",
@@ -305,6 +307,16 @@ class _Serializer:
             "run_immediately": encode_run_immediately(timetable.run_immediately),
         }
 
+    @serialize_timetable.register
+    def _(self, timetable: CronPartitionTimetable) -> dict[str, Any]:
+        return {
+            "expression": timetable.expression,
+            "timezone": encode_timezone(timetable.timezone),
+            "run_immediately": encode_run_immediately(timetable.run_immediately),
+            "run_offset": timetable.run_offset,
+            "key_format": timetable.key_format,
+        }
+
     @serialize_timetable.register
     def _(self, timetable: DeltaTriggerTimetable) -> dict[str, Any]:
         return {
diff --git a/airflow-core/src/airflow/timetables/trigger.py b/airflow-core/src/airflow/timetables/trigger.py
index af4f855f0cd..854415e5722 100644
--- a/airflow-core/src/airflow/timetables/trigger.py
+++ b/airflow-core/src/airflow/timetables/trigger.py
@@ -242,10 +242,10 @@ class CronTriggerTimetable(CronMixin, _TriggerTimetable):
 
 class MultipleCronTriggerTimetable(Timetable):
     """
-    Timetable that triggers DAG runs according to multiple cron expressions.
+    Timetable that triggers Dag runs according to multiple cron expressions.
 
     This combines multiple ``CronTriggerTimetable`` instances underneath, and
-    triggers a DAG run whenever one of the timetables want to trigger a run.
+    triggers a Dag run whenever one of the timetables want to trigger a run.
 
     Only at most one run is triggered for any given time, even if more than one
     timetable fires at the same time.
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index e5366fc6127..2a45735847e 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -65,7 +65,15 @@ from airflow.models.taskinstance import TaskInstance as TI
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.python import PythonOperator
-from airflow.sdk import DAG, BaseOperator, TaskGroup, setup, task as task_decorator, teardown
+from airflow.sdk import (
+    DAG,
+    BaseOperator,
+    CronPartitionTimetable,
+    TaskGroup,
+    setup,
+    task as task_decorator,
+    teardown,
+)
 from airflow.sdk.definitions._internal.contextmanager import TaskGroupContext
 from airflow.sdk.definitions._internal.templater import NativeEnvironment, SandboxedEnvironment
 from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAll, AssetAny
@@ -82,7 +90,6 @@ from airflow.timetables.simple import (
     NullTimetable,
     OnceTimetable,
 )
-from airflow.timetables.trigger import CronPartitionTimetable
 from airflow.utils.file import list_py_file_paths
 from airflow.utils.session import create_session
 from airflow.utils.state import DagRunState, State, TaskInstanceState
diff --git a/airflow-core/tests/unit/timetables/test_trigger_timetable.py b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
index a5774bb22c5..8cd895c7dc1 100644
--- a/airflow-core/tests/unit/timetables/test_trigger_timetable.py
+++ b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
@@ -28,9 +28,9 @@ from sqlalchemy import select
 from airflow._shared.timezones.timezone import utc
 from airflow.exceptions import AirflowTimetableInvalid
 from airflow.models import DagModel
+from airflow.sdk import CronPartitionTimetable
 from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
 from airflow.timetables.trigger import (
-    CronPartitionTimetable,
     CronTriggerTimetable,
     DeltaTriggerTimetable,
     MultipleCronTriggerTimetable,
diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst
index 4bb0cd4d2ea..55d5b415253 100644
--- a/task-sdk/docs/api.rst
+++ b/task-sdk/docs/api.rst
@@ -175,18 +175,20 @@ Timetables
 
 .. autoapiclass:: airflow.sdk.CronTriggerTimetable
 
+.. autoapiclass:: airflow.sdk.CronPartitionTimetable
+
 .. autoapiclass:: airflow.sdk.DeltaDataIntervalTimetable
 
 .. autoapiclass:: airflow.sdk.DeltaTriggerTimetable
 
 .. autoapiclass:: airflow.sdk.EventsTimetable
 
+.. autoapiclass:: airflow.sdk.IdentityMapper
+
 .. autoapiclass:: airflow.sdk.MultipleCronTriggerTimetable
 
 .. autoapiclass:: airflow.sdk.PartitionMapper
 
-.. autoapiclass:: airflow.sdk.IdentityMapper
-
 .. autoapiclass:: airflow.sdk.PartitionedAssetTimetable
 
 
diff --git a/task-sdk/src/airflow/sdk/__init__.py b/task-sdk/src/airflow/sdk/__init__.py
index a530dadafd6..c8067c9ea91 100644
--- a/task-sdk/src/airflow/sdk/__init__.py
+++ b/task-sdk/src/airflow/sdk/__init__.py
@@ -37,6 +37,7 @@ __all__ = [
     "Context",
     "CronDataIntervalTimetable",
     "CronTriggerTimetable",
+    "CronPartitionTimetable",
     "DAG",
     "DagRunState",
     "DeadlineAlert",
@@ -121,6 +122,7 @@ if TYPE_CHECKING:
         DeltaDataIntervalTimetable,
     )
     from airflow.sdk.definitions.timetables.trigger import (
+        CronPartitionTimetable,
         CronTriggerTimetable,
         DeltaTriggerTimetable,
         MultipleCronTriggerTimetable,
@@ -151,6 +153,7 @@ __lazy_imports: dict[str, str] = {
     "Context": ".definitions.context",
     "CronDataIntervalTimetable": ".definitions.timetables.interval",
     "CronTriggerTimetable": ".definitions.timetables.trigger",
+    "CronPartitionTimetable": ".definitions.timetables.trigger",
     "DAG": ".definitions.dag",
     "DagRunState": ".api.datamodels._generated",
     "DeadlineAlert": ".definitions.deadline",
diff --git a/task-sdk/src/airflow/sdk/__init__.pyi b/task-sdk/src/airflow/sdk/__init__.pyi
index 21315fa231d..ed55a56a80a 100644
--- a/task-sdk/src/airflow/sdk/__init__.pyi
+++ b/task-sdk/src/airflow/sdk/__init__.pyi
@@ -70,6 +70,7 @@ from airflow.sdk.definitions.timetables.interval import (
     DeltaDataIntervalTimetable,
 )
 from airflow.sdk.definitions.timetables.trigger import (
+    CronPartitionTimetable,
     CronTriggerTimetable,
     DeltaTriggerTimetable,
     MultipleCronTriggerTimetable,
@@ -100,6 +101,7 @@ __all__ = [
     "Context",
     "CronDataIntervalTimetable",
     "CronTriggerTimetable",
+    "CronPartitionTimetable",
     "DAG",
     "DagRunState",
     "DeltaDataIntervalTimetable",
diff --git a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
index afde7e2cfd1..4451ab07461 100644
--- a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
+++ b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import datetime
+from types import NoneType
 from typing import TYPE_CHECKING
 
 import attrs
@@ -88,10 +89,10 @@ class CronTriggerTimetable(CronMixin, BaseTimetable):
 @attrs.define(init=False)
 class MultipleCronTriggerTimetable(BaseTimetable):
     """
-    Timetable that triggers DAG runs according to multiple cron expressions.
+    Timetable that triggers Dag runs according to multiple cron expressions.
 
     This combines multiple ``CronTriggerTimetable`` instances underneath, and
-    triggers a DAG run whenever one of the timetables want to trigger a run.
+    triggers a Dag run whenever one of the timetables want to trigger a run.
 
     Only at most one run is triggered for any given time, even if more than one
     timetable fires at the same time.
@@ -114,3 +115,67 @@ class MultipleCronTriggerTimetable(BaseTimetable):
                 for cron in crons
             ],
         )
+
+
+@attrs.define
+class CronPartitionTimetable(CronTriggerTimetable):
+    """
+    Timetable that triggers Dag runs according to a cron expression.
+
+    Creates runs for partition keys.
+
+    The cron expression determines the sequence of run dates. And
+    the partition dates are derived from those according to the ``run_offset``.
+    The partition key is then formatted using the partition date.
+
+    A ``run_offset`` of 1 means the partition_date will be one cron interval
+    after the run date; negative means the partition date will be one cron
+    interval prior to the run date.
+
+    :param cron: cron string that defines when to run
+    :param timezone: Which timezone to use to interpret the cron string
+    :param run_offset: Integer offset that determines which partition date to run for.
+        The partition key will be derived from the partition date.
+    :param key_format: How to translate the partition date into a string partition key.
+
+    *run_immediately* controls, if no *start_time* is given to the Dag, when
+    the first run of the Dag should be scheduled. It has no effect if there already exist runs for this Dag.
+
+    * If *True*, always run immediately the most recent possible Dag run.
+    * If *False*, wait to run until the next scheduled time in the future.
+    * If passed a ``timedelta``, will run the most recent possible Dag run
+      if that run's ``data_interval_end`` is within timedelta of now.
+    * If *None*, the timedelta is calculated as 10% of the time between the
+      most recent past scheduled time and the next scheduled time. E.g. if
+      running every hour, this would run the previous time if less than 6
+      minutes had past since the previous run time, otherwise it would wait
+      until the next hour.
+
+    # todo: AIP-76 talk about how we can have auto-reprocessing of partitions
+    # todo: AIP-76 we could allow a tuple of integer + time-based
+
+    """
+
+    run_offset: int | datetime.timedelta | relativedelta | None = None
+    key_format: str = "%Y-%m-%dT%H:%M:%S"  # todo: AIP-76 we can't infer partition date from this, so we need to store it separately
+
+    def __init__(
+        self,
+        cron: str,
+        *,
+        timezone: str | Timezone | FixedTimezone,
+        run_offset: int | datetime.timedelta | relativedelta | None = None,
+        run_immediately: bool | datetime.timedelta = False,
+        key_format: str = "%Y-%m-%dT%H:%M:%S",  # todo: AIP-76 we can't infer partition date from this, so we need to store it separately
+    ) -> None:
+        # super().__init__(cron, timezone=timezone, run_immediately=run_immediately)
+        if not isinstance(run_offset, (int, NoneType)):
+            # todo: AIP-76 implement timedelta / relative delta?
+            raise ValueError("Run offset other than integer not supported yet.")
+        self.__attrs_init__(  # type: ignore[attr-defined]
+            cron,
+            timezone=timezone,
+            run_offset=run_offset,
+            run_immediately=run_immediately,
+            key_format=key_format,
+        )

Reply via email to