This is an automated email from the ASF dual-hosted git repository.
phanikumv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fb27898a77 Introduce `DatasetOrTimeSchedule` (#36710)
fb27898a77 is described below
commit fb27898a7706e2e38cf49aaea1a215afb4ca57f3
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Feb 1 21:39:18 2024 +0800
Introduce `DatasetOrTimeSchedule` (#36710)
* Introduce DatasetTimetable
This special timetable allows a DAG to be run against a time-based
schedule and dataset events at the same time. The logic is nothing
special---scheduled runs are created based on a time-based timetable,
and dataset-triggered runs are created when dataset events happen. The
two do not interact in any way.
Co-authored-by: Ankit Chaurasia <[email protected]>
Co-authored-by: Daniel Standish
<[email protected]>
---
airflow/example_dags/example_datasets.py | 17 ++
airflow/models/dag.py | 3 +
airflow/serialization/serialized_objects.py | 12 +-
airflow/timetables/datasets.py | 92 +++++++++++
airflow/timetables/simple.py | 4 +-
.../authoring-and-scheduling/datasets.rst | 9 +
.../authoring-and-scheduling/timetable.rst | 39 +++++
tests/models/test_taskinstance.py | 1 +
tests/timetables/test_datasets_timetable.py | 182 +++++++++++++++++++++
9 files changed, 353 insertions(+), 6 deletions(-)
diff --git a/airflow/example_dags/example_datasets.py
b/airflow/example_dags/example_datasets.py
index ba3ed927f1..9dfaaf0c34 100644
--- a/airflow/example_dags/example_datasets.py
+++ b/airflow/example_dags/example_datasets.py
@@ -43,6 +43,8 @@ import pendulum
from airflow.datasets import Dataset
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
+from airflow.timetables.datasets import DatasetOrTimeSchedule
+from airflow.timetables.trigger import CronTriggerTimetable
# [START dataset_def]
dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"})
@@ -128,3 +130,18 @@ with DAG(
outlets=[Dataset("s3://unrelated_task/dataset_other_unknown.txt")],
bash_command="sleep 5",
)
+
+with DAG(
+ dag_id="dataset_and_time_based_timetable",
+ catchup=False,
+ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+ schedule=DatasetOrTimeSchedule(
+ timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"),
datasets=[dag1_dataset]
+ ),
+ tags=["dataset-time-based-timetable"],
+) as dag7:
+ BashOperator(
+ outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")],
+ task_id="consuming_dataset_time_based",
+ bash_command="sleep 5",
+ )
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 92e1a8945f..988dfb25e4 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -109,6 +109,7 @@ from airflow.secrets.local_filesystem import
LocalFilesystemBackend
from airflow.security import permissions
from airflow.stats import Stats
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction,
Timetable
+from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.interval import CronDataIntervalTimetable,
DeltaDataIntervalTimetable
from airflow.timetables.simple import (
ContinuousTimetable,
@@ -595,6 +596,8 @@ class DAG(LoggingMixin):
self.timetable = DatasetTriggeredTimetable()
self.schedule_interval = self.timetable.summary
elif timetable:
+ if isinstance(timetable, DatasetOrTimeSchedule):
+ self.dataset_triggers = timetable.datasets
self.timetable = timetable
self.schedule_interval = self.timetable.summary
else:
diff --git a/airflow/serialization/serialized_objects.py
b/airflow/serialization/serialized_objects.py
index 87ee4d4a73..e77c43f5f5 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -197,12 +197,14 @@ class _TimetableNotRegistered(ValueError):
)
-def _encode_timetable(var: Timetable) -> dict[str, Any]:
+def encode_timetable(var: Timetable) -> dict[str, Any]:
"""
Encode a timetable instance.
This delegates most of the serialization work to the type, so the behavior
can be completely controlled by a custom subclass.
+
+ :meta private:
"""
timetable_class = type(var)
importable_string = qualname(timetable_class)
@@ -211,12 +213,14 @@ def _encode_timetable(var: Timetable) -> dict[str, Any]:
return {Encoding.TYPE: importable_string, Encoding.VAR: var.serialize()}
-def _decode_timetable(var: dict[str, Any]) -> Timetable:
+def decode_timetable(var: dict[str, Any]) -> Timetable:
"""
Decode a previously serialized timetable.
Most of the deserialization logic is delegated to the actual type, which
we import from string.
+
+ :meta private:
"""
importable_string = var[Encoding.TYPE]
timetable_class = _get_registered_timetable(importable_string)
@@ -401,7 +405,7 @@ class BaseSerialization:
elif key in decorated_fields:
serialized_object[key] = cls.serialize(value)
elif key == "timetable" and value is not None:
- serialized_object[key] = _encode_timetable(value)
+ serialized_object[key] = encode_timetable(value)
else:
value = cls.serialize(value)
if isinstance(value, dict) and Encoding.TYPE in value:
@@ -1368,7 +1372,7 @@ class SerializedDAG(DAG, BaseSerialization):
# Value structure matches exactly
pass
elif k == "timetable":
- v = _decode_timetable(v)
+ v = decode_timetable(v)
elif k in cls._decorated_fields:
v = cls.deserialize(v)
elif k == "params":
diff --git a/airflow/timetables/datasets.py b/airflow/timetables/datasets.py
new file mode 100644
index 0000000000..4904c64e9c
--- /dev/null
+++ b/airflow/timetables/datasets.py
@@ -0,0 +1,92 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import collections.abc
+import typing
+
+import attrs
+
+from airflow.datasets import Dataset
+from airflow.exceptions import AirflowTimetableInvalid
+from airflow.timetables.simple import DatasetTriggeredTimetable as
DatasetTriggeredSchedule
+from airflow.utils.types import DagRunType
+
+if typing.TYPE_CHECKING:
+ import pendulum
+
+ from airflow.timetables.base import DagRunInfo, DataInterval,
TimeRestriction, Timetable
+
+
+class DatasetOrTimeSchedule(DatasetTriggeredSchedule):
+ """Combine time-based scheduling with event-based scheduling."""
+
+ def __init__(self, timetable: Timetable, datasets:
collections.abc.Collection[Dataset]) -> None:
+ self.timetable = timetable
+ self.datasets = datasets
+
+ self.description = f"Triggered by datasets or {timetable.description}"
+ self.periodic = timetable.periodic
+ self._can_be_scheduled = timetable._can_be_scheduled
+
+ self.run_ordering = timetable.run_ordering
+ self.active_runs_limit = timetable.active_runs_limit
+
+ @classmethod
+ def deserialize(cls, data: dict[str, typing.Any]) -> Timetable:
+ from airflow.serialization.serialized_objects import decode_timetable
+
+ return cls(
+ timetable=decode_timetable(data["timetable"]),
datasets=[Dataset(**d) for d in data["datasets"]]
+ )
+
+ def serialize(self) -> dict[str, typing.Any]:
+ from airflow.serialization.serialized_objects import encode_timetable
+
+ return {
+ "timetable": encode_timetable(self.timetable),
+ "datasets": [attrs.asdict(e) for e in self.datasets],
+ }
+
+ def validate(self) -> None:
+ if isinstance(self.timetable, DatasetTriggeredSchedule):
+ raise AirflowTimetableInvalid("cannot nest dataset timetables")
+ if not isinstance(self.datasets, collections.abc.Collection) or not
all(
+ isinstance(d, Dataset) for d in self.datasets
+ ):
+ raise AirflowTimetableInvalid("all elements in 'event' must be
datasets")
+
+ @property
+ def summary(self) -> str:
+ return f"Dataset or {self.timetable.summary}"
+
+ def infer_manual_data_interval(self, *, run_after: pendulum.DateTime) ->
DataInterval:
+ return self.timetable.infer_manual_data_interval(run_after=run_after)
+
+ def next_dagrun_info(
+ self, *, last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction
+ ) -> DagRunInfo | None:
+ return self.timetable.next_dagrun_info(
+ last_automated_data_interval=last_automated_data_interval,
+ restriction=restriction,
+ )
+
+ def generate_run_id(self, *, run_type: DagRunType, **kwargs: typing.Any)
-> str:
+ if run_type != DagRunType.DATASET_TRIGGERED:
+ return self.timetable.generate_run_id(run_type=run_type, **kwargs)
+ return super().generate_run_id(run_type=run_type, **kwargs)
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index f8ee8474af..06b26c1375 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -17,7 +17,7 @@
from __future__ import annotations
import operator
-from typing import TYPE_CHECKING, Any, Collection
+from typing import TYPE_CHECKING, Any, Collection, Sequence
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from airflow.utils import timezone
@@ -35,7 +35,7 @@ class _TrivialTimetable(Timetable):
"""Some code reuse for "trivial" timetables that has nothing complex."""
periodic = False
- run_ordering = ("execution_date",)
+ run_ordering: Sequence[str] = ("execution_date",)
@classmethod
def deserialize(cls, data: dict[str, Any]) -> Timetable:
diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst
b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
index 2cc34d9c88..2456503649 100644
--- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
@@ -236,3 +236,12 @@ Example:
print_triggering_dataset_events()
Note that this example is using `(.values() | first | first)
<https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_
to fetch the first of one Dataset given to the DAG, and the first of one
DatasetEvent for that Dataset. An implementation may be quite complex if you
have multiple Datasets, potentially with multiple DatasetEvents.
+
+Combining Dataset and Time-Based Schedules
+------------------------------------------
+
+DatasetOrTimeSchedule Integration
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+With the introduction of ``DatasetOrTimeSchedule``, it is now possible to
schedule DAGs based on both dataset events and time-based schedules. This
feature offers flexibility for scenarios where a DAG needs to be triggered by
data updates as well as run periodically according to a fixed timetable.
+
+For more detailed information on ``DatasetOrTimeSchedule`` and its usage, refer
to the corresponding section in :ref:`DatasetOrTimeSchedule
<dataset-timetable-section>`.
diff --git a/docs/apache-airflow/authoring-and-scheduling/timetable.rst
b/docs/apache-airflow/authoring-and-scheduling/timetable.rst
index 6366e18d22..c33cd075ab 100644
--- a/docs/apache-airflow/authoring-and-scheduling/timetable.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/timetable.rst
@@ -177,6 +177,45 @@ first) event for the data interval, otherwise manual runs
will run with a ``data
def example_dag():
pass
+.. _dataset-timetable-section:
+
+DatasetOrTimeSchedule
+^^^^^^^^^^^^^^^^^^^^^
+
+The ``DatasetOrTimeSchedule`` is a specialized timetable allowing for the
scheduling of DAGs based on both time-based schedules and dataset events. It
facilitates the creation of scheduled runs (as per traditional timetables) and
dataset-triggered runs, which operate independently.
+
+This feature is particularly useful in scenarios where a DAG needs to run on
dataset updates and also at periodic intervals. It ensures that the workflow
remains responsive to data changes and consistently runs regular checks or
updates.
+
+Here's an example of a DAG using ``DatasetOrTimeSchedule``:
+
+.. code-block:: python
+
+    from airflow.timetables.datasets import DatasetOrTimeSchedule
+ from airflow.timetables.trigger import CronTriggerTimetable
+ from airflow.datasets import Dataset
+ from airflow.models import DAG
+ from airflow.operators.bash import BashOperator
+ import pendulum
+
+ with DAG(
+ dag_id="dataset_and_time_based_timetable",
+ catchup=False,
+ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        schedule=DatasetOrTimeSchedule(
+            timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"),
+            datasets=[dag1_dataset],
+        ),
+ tags=["dataset-time-based-timetable"],
+ ) as dag7:
+ BashOperator(
+
outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")],
+ task_id="consuming_dataset_time_based",
+ bash_command="sleep 5",
+ )
+
+In this example, the DAG is scheduled to run every Wednesday at 01:00 UTC
based on the ``CronTriggerTimetable``, and it is also triggered by updates to
``dag1_dataset``.
+
+Future Enhancements
+~~~~~~~~~~~~~~~~~~~
+Future iterations may introduce more complex combinations for scheduling
(e.g., dataset1 OR dataset2 OR timetable), further enhancing the flexibility
for scheduling DAGs in various scenarios.
+
Timetables comparisons
----------------------
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index a19cf30f8d..16ac978807 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2096,6 +2096,7 @@ class TestTaskInstance:
assert session.query(DatasetDagRunQueue.target_dag_id).filter_by(
dataset_id=event.dataset.id
).order_by(DatasetDagRunQueue.target_dag_id).all() == [
+ ("dataset_and_time_based_timetable",),
("dataset_consumes_1",),
("dataset_consumes_1_and_2",),
("dataset_consumes_1_never_scheduled",),
diff --git a/tests/timetables/test_datasets_timetable.py
b/tests/timetables/test_datasets_timetable.py
new file mode 100644
index 0000000000..8e293888ca
--- /dev/null
+++ b/tests/timetables/test_datasets_timetable.py
@@ -0,0 +1,182 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Any
+
+import pytest
+from pendulum import DateTime
+
+from airflow.datasets import Dataset
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction,
Timetable
+from airflow.timetables.datasets import DatasetOrTimeSchedule
+from airflow.utils.types import DagRunType
+
+
+class MockTimetable(Timetable):
+ """
+ A mock Timetable class for testing purposes in Apache Airflow.
+ """
+
+ __test__ = False
+
+ def __init__(self) -> None:
+ """
+ Initializes the MockTimetable with the current DateTime.
+ """
+ self._now = DateTime.now()
+
+ def next_dagrun_info(
+ self, last_automated_data_interval: DataInterval | None, restriction:
TimeRestriction
+ ) -> DagRunInfo | None:
+ """
+ Calculates the next DagRun information based on the provided interval
and restrictions.
+
+ :param last_automated_data_interval: The last automated data interval.
+ :param restriction: The time restriction to apply.
+ """
+ if last_automated_data_interval is None:
+ next_run_date = self._now
+ else:
+ next_run_date = last_automated_data_interval.end.add(days=1)
+
+ if restriction.earliest and next_run_date < restriction.earliest:
+ next_run_date = restriction.earliest
+
+ if restriction.latest and next_run_date > restriction.latest:
+ return None
+
+ return DagRunInfo.interval(start=next_run_date,
end=next_run_date.add(days=1))
+
+ def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
+ """
+ Infers the data interval for manual triggers.
+
+ :param run_after: The datetime after which the run is triggered.
+ """
+ return DataInterval.exact(run_after)
+
+
+def serialize_timetable(timetable: Timetable) -> str:
+ """
+ Mock serialization function for Timetable objects.
+
+ :param timetable: The Timetable object to serialize.
+ """
+ return "serialized_timetable"
+
+
+def deserialize_timetable(serialized: str) -> MockTimetable:
+ """
+ Mock deserialization function for Timetable objects.
+
+ :param serialized: The serialized data of the timetable.
+ """
+ return MockTimetable()
+
+
[email protected]
+def test_timetable() -> MockTimetable:
+ """Pytest fixture for creating a MockTimetable object."""
+ return MockTimetable()
+
+
[email protected]
+def test_datasets() -> list[Dataset]:
+ """Pytest fixture for creating a list of Dataset objects."""
+ return [Dataset("test_dataset")]
+
+
[email protected]
+def dataset_timetable(test_timetable: MockTimetable, test_datasets:
list[Dataset]) -> DatasetOrTimeSchedule:
+ """
+ Pytest fixture for creating a DatasetTimetable object.
+
+ :param test_timetable: The test timetable instance.
+ :param test_datasets: A list of Dataset instances.
+ """
+ return DatasetOrTimeSchedule(timetable=test_timetable,
datasets=test_datasets)
+
+
+def test_serialization(dataset_timetable: DatasetOrTimeSchedule, monkeypatch:
Any) -> None:
+ """
+ Tests the serialization method of DatasetTimetable.
+
+ :param dataset_timetable: The DatasetTimetable instance to test.
+ :param monkeypatch: The monkeypatch fixture from pytest.
+ """
+ monkeypatch.setattr(
+ "airflow.serialization.serialized_objects.encode_timetable", lambda x:
"mock_serialized_timetable"
+ )
+ serialized = dataset_timetable.serialize()
+ assert serialized == {
+ "timetable": "mock_serialized_timetable",
+ "datasets": [{"uri": "test_dataset", "extra": None}],
+ }
+
+
+def test_deserialization(monkeypatch: Any) -> None:
+ """
+ Tests the deserialization method of DatasetTimetable.
+
+ :param monkeypatch: The monkeypatch fixture from pytest.
+ """
+ monkeypatch.setattr(
+ "airflow.serialization.serialized_objects.decode_timetable", lambda x:
MockTimetable()
+ )
+ mock_serialized_data = {"timetable": "mock_serialized_timetable",
"datasets": [{"uri": "test_dataset"}]}
+ deserialized = DatasetOrTimeSchedule.deserialize(mock_serialized_data)
+ assert isinstance(deserialized, DatasetOrTimeSchedule)
+
+
+def test_infer_manual_data_interval(dataset_timetable: DatasetOrTimeSchedule)
-> None:
+ """
+ Tests the infer_manual_data_interval method of DatasetTimetable.
+
+ :param dataset_timetable: The DatasetTimetable instance to test.
+ """
+ run_after = DateTime.now()
+ result = dataset_timetable.infer_manual_data_interval(run_after=run_after)
+ assert isinstance(result, DataInterval)
+
+
+def test_next_dagrun_info(dataset_timetable: DatasetOrTimeSchedule) -> None:
+ """
+ Tests the next_dagrun_info method of DatasetTimetable.
+
+ :param dataset_timetable: The DatasetTimetable instance to test.
+ """
+ last_interval = DataInterval.exact(DateTime.now())
+ restriction = TimeRestriction(earliest=DateTime.now(), latest=None,
catchup=True)
+ result = dataset_timetable.next_dagrun_info(
+ last_automated_data_interval=last_interval, restriction=restriction
+ )
+ assert result is None or isinstance(result, DagRunInfo)
+
+
+def test_generate_run_id(dataset_timetable: DatasetOrTimeSchedule) -> None:
+ """
+ Tests the generate_run_id method of DatasetTimetable.
+
+ :param dataset_timetable: The DatasetTimetable instance to test.
+ """
+ run_id = dataset_timetable.generate_run_id(
+ run_type=DagRunType.MANUAL, extra_args="test",
logical_date=DateTime.now(), data_interval=None
+ )
+ assert isinstance(run_id, str)