This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 243b704  Add DateTimeSensor (#9697)
243b704 is described below

commit 243b704f47df00365e3421b55d8818fc9c71196f
Author: zikun <[email protected]>
AuthorDate: Fri Jul 24 00:53:10 2020 +0800

    Add DateTimeSensor (#9697)
    
    * Add DateTimeSensor
---
 airflow/sensors/date_time_sensor.py    | 77 ++++++++++++++++++++++++++++++++++
 docs/operators-and-hooks-ref.rst       |  3 ++
 tests/sensors/test_date_time_sensor.py | 72 +++++++++++++++++++++++++++++++
 3 files changed, 152 insertions(+)

diff --git a/airflow/sensors/date_time_sensor.py b/airflow/sensors/date_time_sensor.py
new file mode 100644
index 0000000..4af3bd2
--- /dev/null
+++ b/airflow/sensors/date_time_sensor.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+from typing import Dict, Union
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+
+
+class DateTimeSensor(BaseSensorOperator):
+    """
+    Waits until the specified datetime.
+
+    A major advantage of this sensor is idempotence for the ``target_time``.
+    It handles some cases for which ``TimeSensor`` and ``TimeDeltaSensor`` are not suited.
+
+    **Example** 1 :
+        If a task needs to wait for 11am on each ``execution_date``. Using
+        ``TimeSensor`` or ``TimeDeltaSensor``, all backfill tasks started at
+        1am have to wait for 10 hours. This is unnecessary, e.g. a backfill
+        task with ``{{ ds }} = '1970-01-01'`` does not need to wait because
+        ``1970-01-01T11:00:00`` has already passed.
+
+    **Example** 2 :
+        If a DAG is scheduled to run at 23:00 daily, but one of the tasks is
+        required to run at 01:00 next day, using ``TimeSensor`` will return
+        ``True`` immediately because 23:00 > 01:00. Instead, we can do this:
+
+        .. code-block:: python
+
+            DateTimeSensor(
+                task_id='wait_for_0100',
+                target_time='{{ next_execution_date.tomorrow().replace(hour=1) }}',
+            )
+
+    :param target_time: datetime after which the job succeeds. (templated)
+    :type target_time: str or datetime.datetime
+    """
+
+    template_fields = ("target_time",)
+
+    @apply_defaults
+    def __init__(
+        self, target_time: Union[str, datetime.datetime], *args, **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        if isinstance(target_time, datetime.datetime):
+            self.target_time = target_time.isoformat()
+        elif isinstance(target_time, str):
+            self.target_time = target_time
+        else:
+            raise TypeError(
+                "Expected str or datetime.datetime type for target_time. Got {}".format(
+                    type(target_time)
+                )
+            )
+
+    def poke(self, context: Dict) -> bool:
+        self.log.info("Checking if the time (%s) has come", self.target_time)
+        return timezone.utcnow() > timezone.parse(self.target_time)
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 6f5028f..17ed2be 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -98,6 +98,9 @@ Fundamentals
    * - :mod:`airflow.sensors.time_sensor`
      -
 
+   * - :mod:`airflow.sensors.date_time_sensor`
+     -
+
 
 .. _Apache:
 
diff --git a/tests/sensors/test_date_time_sensor.py b/tests/sensors/test_date_time_sensor.py
new file mode 100644
index 0000000..b8b81d1
--- /dev/null
+++ b/tests/sensors/test_date_time_sensor.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+from mock import patch
+from parameterized import parameterized
+
+from airflow.models.dag import DAG
+from airflow.sensors.date_time_sensor import DateTimeSensor
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2015, 1, 1)
+
+
+class TestDateTimeSensor:
+    @classmethod
+    def setup_class(cls):
+        args = {"owner": "airflow", "start_date": DEFAULT_DATE}
+        cls.dag = DAG("test_dag", default_args=args)
+
+    @parameterized.expand(
+        [
+            (
+                "valid_datetime",
+                timezone.datetime(2020, 7, 6, 13, tzinfo=timezone.utc),
+                "2020-07-06T13:00:00+00:00",
+            ),
+            ("valid_str", "20200706T210000+8", "20200706T210000+8"),
+        ]
+    )
+    def test_valid_input(self, task_id, target_time, expected):
+        op = DateTimeSensor(task_id=task_id, target_time=target_time, dag=self.dag,)
+        assert op.target_time == expected
+
+    def test_invalid_input(self):
+        with pytest.raises(TypeError):
+            DateTimeSensor(
+                task_id="test", target_time=timezone.utcnow().time(), dag=self.dag,
+            )
+
+    @parameterized.expand(
+        [
+            (
+                "poke_datetime",
+                timezone.datetime(2020, 1, 1, 22, 59, tzinfo=timezone.utc),
+                True,
+            ),
+            ("poke_str_extended", "2020-01-01T23:00:00.001+00:00", False),
+            ("poke_str_basic_with_tz", "20200102T065959+8", True),
+        ]
+    )
+    @patch(
+        "airflow.sensors.date_time_sensor.timezone.utcnow",
+        return_value=timezone.datetime(2020, 1, 1, 23, 0, tzinfo=timezone.utc),
+    )
+    def test_poke(self, task_id, target_time, expected, mock_utcnow):
+        op = DateTimeSensor(task_id=task_id, target_time=target_time, dag=self.dag)
+        assert op.poke(None) == expected

Reply via email to