This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 090a7f0707 DateTimeTrigger typing and tests (#37694)
090a7f0707 is described below
commit 090a7f0707077320225e2cb4b1bc091c8fefee34
Author: Dylan Rajguru <[email protected]>
AuthorDate: Tue Feb 27 12:37:25 2024 -0800
DateTimeTrigger typing and tests (#37694)
---
airflow/triggers/temporal.py | 14 ++++---
scripts/cov/other_coverage.py | 84 +++++++++++++++++++++++++++++------------
tests/triggers/test_temporal.py | 44 +++++++++++++++++++++
3 files changed, 112 insertions(+), 30 deletions(-)
diff --git a/airflow/triggers/temporal.py b/airflow/triggers/temporal.py
index 18bdd80bff..03f0901db7 100644
--- a/airflow/triggers/temporal.py
+++ b/airflow/triggers/temporal.py
@@ -18,7 +18,9 @@ from __future__ import annotations
import asyncio
import datetime
-from typing import Any
+from typing import Any, AsyncIterator
+
+import pendulum
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils import timezone
@@ -42,12 +44,12 @@ class DateTimeTrigger(BaseTrigger):
elif moment.tzinfo is None:
raise ValueError("You cannot pass naive datetimes")
else:
- self.moment = timezone.convert_to_utc(moment)
+ self.moment: pendulum.DateTime = timezone.convert_to_utc(moment)
def serialize(self) -> tuple[str, dict[str, Any]]:
return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment})
- async def run(self):
+ async def run(self) -> AsyncIterator[TriggerEvent]:
"""
Loop until the relevant time is met.
@@ -59,13 +61,13 @@ class DateTimeTrigger(BaseTrigger):
# Sleep in successively smaller increments starting from 1 hour down to 10 seconds at a time
self.log.info("trigger starting")
for step in 3600, 60, 10:
- seconds_remaining = (self.moment - timezone.utcnow()).total_seconds()
+ seconds_remaining = (self.moment - pendulum.instance(timezone.utcnow())).total_seconds()
while seconds_remaining > 2 * step:
self.log.info(f"{int(seconds_remaining)} seconds remaining; sleeping {step} seconds")
await asyncio.sleep(step)
- seconds_remaining = (self.moment - timezone.utcnow()).total_seconds()
+ seconds_remaining = (self.moment - pendulum.instance(timezone.utcnow())).total_seconds()
# Sleep a second at a time otherwise
- while self.moment > timezone.utcnow():
+ while self.moment > pendulum.instance(timezone.utcnow()):
self.log.info("sleeping 1 second...")
await asyncio.sleep(1)
# Send our single event and then we're done
diff --git a/scripts/cov/other_coverage.py b/scripts/cov/other_coverage.py
index ff30725ab0..6543d2fc78 100644
--- a/scripts/cov/other_coverage.py
+++ b/scripts/cov/other_coverage.py
@@ -25,45 +25,81 @@ sys.path.insert(0, str(Path(__file__).parent.resolve()))
source_files = [
"airflow/dag_processing",
+ "airflow/triggers",
]
+"""
+Other potential source file packages to scan for coverage.
+You can also compare the stats against those on
+https://app.codecov.io/github/apache/airflow
+(as it combines the coverage from all tests and so may be a bit higher).
+
+ "airflow/auth",
+ "airflow/callbacks",
+ "airflow/config_templates",
+ "airflow/dag_processing",
+ "airflow/datasets",
+ "airflow/decorators",
+ "airflow/hooks",
+ "airflow/io",
+ "airflow/lineage",
+ "airflow/listeners",
+ "airflow/macros",
+ "airflow/notifications",
+ "airflow/secrets",
+ "airflow/security",
+ "airflow/sensors",
+ "airflow/task",
+ "airflow/template",
+ "airflow/timetables",
+ "airflow/triggers",
+"""
files_not_fully_covered = [
"airflow/dag_processing/manager.py",
"airflow/dag_processing/processor.py",
+ "airflow/triggers/base.py",
+ "airflow/triggers/external_task.py",
+ "airflow/triggers/file.py",
+ "airflow/triggers/testing.py",
]
other_tests = [
"tests/dag_processing",
+ "tests/jobs",
+ "tests/triggers",
]
"""
-These 'other' packages can be added to the above lists
-as necessary:
+Other tests to potentially run against the source_file packages:
-"tests/auth",
-"tests/callbacks",
-"tests/charts",
-"tests/cluster_policies",
-"tests/config_templates",
-"tests/datasets",
-"tests/decorators",
-"tests/hooks",
-"tests/io",
-"tests/lineage",
-"tests/listeners",
-"tests/macros",
-"tests/notifications",
-"tests/plugins",
-"tests/secrets",
-"tests/security",
-"tests/sensors",
-"tests/task",
-"tests/template",
-"tests/testconfig",
-"tests/timetables",
-"tests/triggers",
+ "tests/api_internal",
+ "tests/auth",
+ "tests/callbacks",
+ "tests/charts",
+ "tests/cluster_policies",
+ "tests/config_templates",
+ "tests/dag_processing",
+ "tests/datasets",
+ "tests/decorators",
+ "tests/hooks",
+ "tests/io",
+ "tests/jobs",
+ "tests/lineage",
+ "tests/listeners",
+ "tests/macros",
+ "tests/notifications",
+ "tests/plugins",
+ "tests/secrets",
+ "tests/security",
+ "tests/sensors",
+ "tests/task",
+ "tests/template",
+ "tests/testconfig",
+ "tests/timetables",
+ "tests/triggers",
"""
+
if __name__ == "__main__":
args = ["-qq"] + other_tests
run_tests(args, source_files, files_not_fully_covered)
diff --git a/tests/triggers/test_temporal.py b/tests/triggers/test_temporal.py
index 52cc2c64f6..6e8d32c467 100644
--- a/tests/triggers/test_temporal.py
+++ b/tests/triggers/test_temporal.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import asyncio
import datetime
+from unittest import mock
import pendulum
import pytest
@@ -25,6 +26,7 @@ import pytest
from airflow.triggers.base import TriggerEvent
from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.utils import timezone
+from airflow.utils.timezone import utcnow
def test_input_validation():
@@ -35,6 +37,16 @@ def test_input_validation():
DateTimeTrigger("2012-01-01T03:03:03+00:00")
+def test_input_validation_tz():
+ """
+ Tests that the DateTimeTrigger validates input to moment arg, it shouldn't accept naive datetime.
+ """
+
+ moment = datetime.datetime(2013, 3, 31, 0, 59, 59)
+ with pytest.raises(ValueError, match="You cannot pass naive datetimes"):
+ DateTimeTrigger(moment)
+
+
def test_datetime_trigger_serialization():
"""
Tests that the DateTimeTrigger correctly serializes its arguments
@@ -96,3 +108,35 @@ async def test_datetime_trigger_timing(tz):
result = trigger_task.result()
assert isinstance(result, TriggerEvent)
assert result.payload == past_moment
+
+
[email protected]("airflow.triggers.temporal.timezone.utcnow")
[email protected]("airflow.triggers.temporal.asyncio.sleep")
[email protected]
+async def test_datetime_trigger_mocked(mock_sleep, mock_utcnow):
+ """
+ Tests DateTimeTrigger with time and asyncio mocks
+ """
+ start_moment = utcnow()
+ trigger_moment = start_moment + datetime.timedelta(seconds=30)
+
+ # returns the mock 'current time'. The first 3 calls report the initial time
+ mock_utcnow.side_effect = [
+ start_moment,
+ start_moment,
+ start_moment,
+ start_moment + datetime.timedelta(seconds=20),
+ start_moment + datetime.timedelta(seconds=25),
+ start_moment + datetime.timedelta(seconds=30),
+ ]
+
+ trigger = DateTimeTrigger(trigger_moment)
+ gen = trigger.run()
+ trigger_task = asyncio.create_task(gen.__anext__())
+ await trigger_task
+ mock_sleep.assert_awaited()
+ assert mock_sleep.await_count == 2
+ assert trigger_task.done() is True
+ result = trigger_task.result()
+ assert isinstance(result, TriggerEvent)
+ assert result.payload == trigger_moment