This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a89514ec38 chore(core): stop deferring TimeDeltaSensorAsync task when
the target_dttm is in the past (#40719)
a89514ec38 is described below
commit a89514ec38d368efa9733c8376953024c8da9f1a
Author: Hussein Awala <[email protected]>
AuthorDate: Fri Jul 12 15:29:31 2024 +0200
chore(core): stop deferring TimeDeltaSensorAsync task when the target_dttm
is in the past (#40719)
* chore(core): stop deferring TimeDeltaSensorAsync task when the
target_dttm is in the past
* add a unit test
---
airflow/sensors/time_delta.py | 5 ++++-
tests/sensors/test_time_delta.py | 28 +++++++++++++++++++++++++++-
2 files changed, 31 insertions(+), 2 deletions(-)
diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py
index 82d16bbae6..226d520aa0 100644
--- a/airflow/sensors/time_delta.py
+++ b/airflow/sensors/time_delta.py
@@ -66,9 +66,12 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
"""
- def execute(self, context: Context) -> NoReturn:
+ def execute(self, context: Context) -> bool | NoReturn:
target_dttm = context["data_interval_end"]
target_dttm += self.delta
+ if timezone.utcnow() > target_dttm:
+ # If the target datetime is in the past, return immediately
+ return True
try:
trigger = DateTimeTrigger(moment=target_dttm)
except (TypeError, ValueError) as e:
diff --git a/tests/sensors/test_time_delta.py b/tests/sensors/test_time_delta.py
index b19af4a8a8..159cefab9c 100644
--- a/tests/sensors/test_time_delta.py
+++ b/tests/sensors/test_time_delta.py
@@ -18,12 +18,14 @@
from __future__ import annotations
from datetime import timedelta
+from unittest import mock
+import pendulum
import pytest
from airflow.models import DagBag
from airflow.models.dag import DAG
-from airflow.sensors.time_delta import TimeDeltaSensor
+from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync
from airflow.utils.timezone import datetime
pytestmark = pytest.mark.db_test
@@ -32,6 +34,7 @@ pytestmark = pytest.mark.db_test
DEFAULT_DATE = datetime(2015, 1, 1)
DEV_NULL = "/dev/null"
TEST_DAG_ID = "unit_tests"
+REFERENCE_TIME = pendulum.now("UTC").replace(microsecond=0, second=0, minute=0)
class TestTimedeltaSensor:
@@ -43,3 +46,26 @@ class TestTimedeltaSensor:
def test_timedelta_sensor(self):
op = TimeDeltaSensor(task_id="timedelta_sensor_check",
delta=timedelta(seconds=2), dag=self.dag)
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
+
+
+class TestTimeDeltaSensorAsync:
+ def setup_method(self):
+ self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
+ self.args = {"owner": "airflow", "start_date": DEFAULT_DATE}
+ self.dag = DAG(TEST_DAG_ID, default_args=self.args)
+
+ @pytest.mark.parametrize(
+ "data_interval_end, delta, should_deffer",
+ [
+ (REFERENCE_TIME.add(hours=-1), timedelta(hours=1), False),
+ (REFERENCE_TIME, timedelta(hours=1), True),
+ ],
+ )
+ @mock.patch("airflow.models.baseoperator.BaseOperator.defer")
+ def test_timedelta_sensor(self, defer_mock, data_interval_end, delta,
should_deffer):
+ op = TimeDeltaSensorAsync(task_id="timedelta_sensor_check",
delta=delta, dag=self.dag)
+ op.execute({"data_interval_end": data_interval_end})
+ if should_deffer:
+ defer_mock.assert_called_once()
+ else:
+ defer_mock.assert_not_called()