This is an automated email from the ASF dual-hosted git repository. taragolis pushed a commit to branch pendulum-3 in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 840d301b3cef362c33ad1fec7ff49f6f0fe540b4 Author: Andrey Anshin <[email protected]> AuthorDate: Mon Dec 18 20:14:59 2023 +0400 Add support of Pendulum 3 --- airflow/serialization/serialized_objects.py | 7 ++-- airflow/serialization/serializers/datetime.py | 15 ++++---- airflow/serialization/serializers/timezone.py | 7 ++-- airflow/settings.py | 11 +++--- airflow/timetables/_cron.py | 10 +++--- airflow/utils/sqlalchemy.py | 5 +-- airflow/utils/timezone.py | 37 ++++++++++++++++---- generated/provider_dependencies.json | 2 +- setup.cfg | 4 +-- tests/models/test_dag.py | 9 ++--- tests/sensors/test_time_sensor.py | 17 +++++----- tests/serialization/test_serialized_objects.py | 2 +- tests/utils/test_timezone.py | 47 +++++++++++++++++++++++--- 13 files changed, 112 insertions(+), 61 deletions(-) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 48aa595933..71a4144edf 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -65,6 +65,7 @@ from airflow.utils.docs import get_docs_url from airflow.utils.module_loading import import_string, qualname from airflow.utils.operator_resources import Resources from airflow.utils.task_group import MappedTaskGroup, TaskGroup +from airflow.utils.timezone import parse_timezone from airflow.utils.types import NOTSET, ArgNotSet if TYPE_CHECKING: @@ -167,9 +168,9 @@ def encode_timezone(var: Timezone) -> str | int: ) -def decode_timezone(var: str | int) -> Timezone: +def decode_timezone(var: str | int): """Decode a previously serialized Pendulum Timezone.""" - return pendulum.tz.timezone(var) + return parse_timezone(var) def _get_registered_timetable(importable_string: str) -> type[Timetable] | None: @@ -607,7 +608,7 @@ class BaseSerialization: raise TypeError(f"Invalid type {type_!s} in deserialization.") _deserialize_datetime = pendulum.from_timestamp - _deserialize_timezone = pendulum.tz.timezone + _deserialize_timezone = 
parse_timezone @classmethod def _deserialize_timedelta(cls, seconds: int) -> datetime.timedelta: diff --git a/airflow/serialization/serializers/datetime.py b/airflow/serialization/serializers/datetime.py index 49f0899a59..ea030a8afc 100644 --- a/airflow/serialization/serializers/datetime.py +++ b/airflow/serialization/serializers/datetime.py @@ -24,7 +24,7 @@ from airflow.serialization.serializers.timezone import ( serialize as serialize_timezone, ) from airflow.utils.module_loading import qualname -from airflow.utils.timezone import convert_to_utc, is_naive +from airflow.utils.timezone import convert_to_utc, is_naive, parse_timezone if TYPE_CHECKING: import datetime @@ -65,23 +65,22 @@ def deserialize(classname: str, version: int, data: dict | str) -> datetime.date import datetime from pendulum import DateTime - from pendulum.tz import fixed_timezone, timezone tz: datetime.tzinfo | None = None if isinstance(data, dict) and TIMEZONE in data: if version == 1: # try to deserialize unsupported timezones timezone_mapping = { - "EDT": fixed_timezone(-4 * 3600), - "CDT": fixed_timezone(-5 * 3600), - "MDT": fixed_timezone(-6 * 3600), - "PDT": fixed_timezone(-7 * 3600), - "CEST": timezone("CET"), + "EDT": parse_timezone(-4 * 3600), + "CDT": parse_timezone(-5 * 3600), + "MDT": parse_timezone(-6 * 3600), + "PDT": parse_timezone(-7 * 3600), + "CEST": parse_timezone("CET"), } if data[TIMEZONE] in timezone_mapping: tz = timezone_mapping[data[TIMEZONE]] else: - tz = timezone(data[TIMEZONE]) + tz = parse_timezone(data[TIMEZONE]) else: tz = deserialize_timezone(data[TIMEZONE][1], data[TIMEZONE][2], data[TIMEZONE][0]) diff --git a/airflow/serialization/serializers/timezone.py b/airflow/serialization/serializers/timezone.py index 23901b9d44..0f580adef8 100644 --- a/airflow/serialization/serializers/timezone.py +++ b/airflow/serialization/serializers/timezone.py @@ -74,7 +74,7 @@ def serialize(o: object) -> tuple[U, str, int, bool]: def deserialize(classname: str, version: int, data: 
object) -> Any: - from pendulum.tz import fixed_timezone, timezone + from airflow.utils.timezone import parse_timezone if not isinstance(data, (str, int)): raise TypeError(f"{data} is not of type int or str but of {type(data)}") @@ -82,9 +82,6 @@ def deserialize(classname: str, version: int, data: object) -> Any: if version > __version__: raise TypeError(f"serialized {version} of {classname} > {__version__}") - if isinstance(data, int): - return fixed_timezone(data) - if "zoneinfo.ZoneInfo" in classname: try: from zoneinfo import ZoneInfo @@ -93,7 +90,7 @@ def deserialize(classname: str, version: int, data: object) -> Any: return ZoneInfo(data) - return timezone(data) + return parse_timezone(data) # ported from pendulum.tz.timezone._get_tzinfo_name diff --git a/airflow/settings.py b/airflow/settings.py index 1a38a59ed3..53c5cc6aa4 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -26,7 +26,6 @@ import sys import warnings from typing import TYPE_CHECKING, Any, Callable -import pendulum import pluggy import sqlalchemy from sqlalchemy import create_engine, exc, text @@ -40,6 +39,7 @@ from airflow.executors import executor_constants from airflow.logging_config import configure_logging from airflow.utils.orm_event_handlers import setup_event_handlers from airflow.utils.state import State +from airflow.utils.timezone import local_timezone, parse_timezone, utc if TYPE_CHECKING: from sqlalchemy.engine import Engine @@ -50,13 +50,12 @@ if TYPE_CHECKING: log = logging.getLogger(__name__) try: - tz = conf.get_mandatory_value("core", "default_timezone") - if tz == "system": - TIMEZONE = pendulum.tz.local_timezone() + if (tz := conf.get_mandatory_value("core", "default_timezone")) != "system": + TIMEZONE = parse_timezone(tz) else: - TIMEZONE = pendulum.tz.timezone(tz) + TIMEZONE = local_timezone() except Exception: - TIMEZONE = pendulum.tz.timezone("UTC") + TIMEZONE = utc log.info("Configured default timezone %s", TIMEZONE) diff --git a/airflow/timetables/_cron.py 
b/airflow/timetables/_cron.py index 45bfe3640f..15e4f820ea 100644 --- a/airflow/timetables/_cron.py +++ b/airflow/timetables/_cron.py @@ -19,17 +19,15 @@ from __future__ import annotations import datetime from typing import TYPE_CHECKING, Any -import pendulum from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException from croniter import CroniterBadCronError, CroniterBadDateError, croniter from airflow.exceptions import AirflowTimetableInvalid from airflow.utils.dates import cron_presets -from airflow.utils.timezone import convert_to_utc, make_aware, make_naive +from airflow.utils.timezone import convert_to_utc, make_aware, make_naive, parse_timezone if TYPE_CHECKING: - from pendulum import DateTime - from pendulum.tz.timezone import Timezone + from pendulum import DateTime, FixedTimezone, Timezone def _covers_every_hour(cron: croniter) -> bool: @@ -63,11 +61,11 @@ def _covers_every_hour(cron: croniter) -> bool: class CronMixin: """Mixin to provide interface to work with croniter.""" - def __init__(self, cron: str, timezone: str | Timezone) -> None: + def __init__(self, cron: str, timezone: str | Timezone | FixedTimezone) -> None: self._expression = cron_presets.get(cron, cron) if isinstance(timezone, str): - timezone = pendulum.tz.timezone(timezone) + timezone = parse_timezone(timezone) self._timezone = timezone try: diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py index a042d4e902..fb241f482f 100644 --- a/airflow/utils/sqlalchemy.py +++ b/airflow/utils/sqlalchemy.py @@ -24,7 +24,6 @@ import json import logging from typing import TYPE_CHECKING, Any, Generator, Iterable, overload -import pendulum from dateutil import relativedelta from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, true, tuple_ from sqlalchemy.dialects import mssql, mysql @@ -34,7 +33,7 @@ from sqlalchemy.types import JSON, Text, TypeDecorator, UnicodeText from airflow import settings from 
airflow.configuration import conf from airflow.serialization.enums import Encoding -from airflow.utils.timezone import make_naive +from airflow.utils.timezone import make_naive, utc if TYPE_CHECKING: from kubernetes.client.models.v1_pod import V1Pod @@ -46,8 +45,6 @@ if TYPE_CHECKING: log = logging.getLogger(__name__) -utc = pendulum.tz.timezone("UTC") - class UtcDateTime(TypeDecorator): """ diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py index 12c75bef59..fb32c093f8 100644 --- a/airflow/utils/timezone.py +++ b/airflow/utils/timezone.py @@ -23,9 +23,10 @@ from typing import overload import pendulum from dateutil.relativedelta import relativedelta from pendulum.datetime import DateTime +from pendulum.tz.timezone import FixedTimezone, Timezone # UTC time zone as a tzinfo instance. -utc = pendulum.tz.timezone("UTC") +utc = Timezone("UTC") def is_localized(value): @@ -135,12 +136,10 @@ def make_aware(value: dt.datetime | None, timezone: dt.tzinfo | None = None) -> # Check that we won't overwrite the timezone of an aware datetime. if is_localized(value): raise ValueError(f"make_aware expects a naive datetime, got {value}") - if hasattr(value, "fold"): - # In case of python 3.6 we want to do the same that pendulum does for python3.5 - # i.e in case we move clock back we want to schedule the run at the time of the second - # instance of the same clock time rather than the first one. - # Fold parameter has no impact in other cases so we can safely set it to 1 here - value = value.replace(fold=1) + # In case we move clock back we want to schedule the run at the time of the second + # instance of the same clock time rather than the first one. 
+ # Fold parameter has no impact in other cases, so we can safely set it to 1 here + value = value.replace(fold=1) localized = getattr(timezone, "localize", None) if localized is not None: # This method is available for pytz time zones @@ -273,3 +272,27 @@ def td_format(td_object: None | dt.timedelta | float | int) -> str | None: if not joined: return "<1s" return joined + + +def parse_timezone(name: str | int) -> FixedTimezone | Timezone: + """ + Parse timezone and return one of the pendulum Timezone. + + Provide the same interface as ``pendulum.timezone(name)`` + + :param name: Either IANA timezone or offset to UTC in seconds. + + :meta private: + """ + return pendulum.timezone(name) + + +def local_timezone() -> FixedTimezone | Timezone: + """ + Return local timezone. + + Provide the same interface as ``pendulum.tz.local_timezone()`` + + :meta private: + """ + return pendulum.tz.local_timezone() diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 780009c536..b755f6c839 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -418,7 +418,7 @@ "asgiref>=3.5.2", "gcloud-aio-auth>=4.0.0,<5.0.0", "gcloud-aio-bigquery>=6.1.2", - "gcloud-aio-storage", + "gcloud-aio-storage>=9.0.0", "gcsfs>=2023.10.0", "google-ads>=22.1.0", "google-api-core>=2.11.0", diff --git a/setup.cfg b/setup.cfg index 9077a02915..3c8208af01 100644 --- a/setup.cfg +++ b/setup.cfg @@ -129,9 +129,7 @@ install_requires = opentelemetry-exporter-otlp packaging>=14.0 pathspec>=0.9.0 - # When (if) pendulum 3 released it would introduce changes in module/objects imports, - # since we are tightly coupled with pendulum library internally it will breaks Airflow functionality. 
- pendulum>=2.0,<3.0 + pendulum>=3.0 pluggy>=1.0 psutil>=4.2.0 pydantic>=2.3.0 diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index f7bf1ad6d0..de7fa87d5a 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -37,6 +37,7 @@ import pendulum import pytest import time_machine from dateutil.relativedelta import relativedelta +from pendulum.tz.timezone import Timezone from sqlalchemy import inspect from airflow import settings @@ -676,8 +677,8 @@ class TestDag: """ Make sure DST transitions are properly observed """ - local_tz = pendulum.timezone("Europe/Zurich") - start = local_tz.convert(datetime.datetime(2018, 10, 28, 2, 55), dst_rule=pendulum.PRE_TRANSITION) + local_tz = Timezone("Europe/Zurich") + start = local_tz.convert(datetime.datetime(2018, 10, 28, 2, 55, fold=0)) assert start.isoformat() == "2018-10-28T02:55:00+02:00", "Pre-condition: start date is in DST" utc = timezone.convert_to_utc(start) @@ -706,7 +707,7 @@ class TestDag: Make sure DST transitions are properly observed """ local_tz = pendulum.timezone("Europe/Zurich") - start = local_tz.convert(datetime.datetime(2018, 10, 27, 3), dst_rule=pendulum.PRE_TRANSITION) + start = local_tz.convert(datetime.datetime(2018, 10, 27, 3, fold=0)) utc = timezone.convert_to_utc(start) @@ -735,7 +736,7 @@ class TestDag: Make sure DST transitions are properly observed """ local_tz = pendulum.timezone("Europe/Zurich") - start = local_tz.convert(datetime.datetime(2018, 3, 25, 2), dst_rule=pendulum.PRE_TRANSITION) + start = local_tz.convert(datetime.datetime(2018, 3, 25, 2, fold=0)) utc = timezone.convert_to_utc(start) diff --git a/tests/sensors/test_time_sensor.py b/tests/sensors/test_time_sensor.py index 935d1cb128..bc30e73144 100644 --- a/tests/sensors/test_time_sensor.py +++ b/tests/sensors/test_time_sensor.py @@ -18,12 +18,11 @@ from __future__ import annotations from datetime import datetime, time -from unittest.mock import patch import pendulum import pytest import time_machine 
-from pendulum.tz.timezone import UTC +from pendulum.tz.timezone import Timezone from airflow.exceptions import TaskDeferred from airflow.models.dag import DAG @@ -33,7 +32,7 @@ from airflow.utils import timezone DEFAULT_TIMEZONE = "Asia/Singapore" # UTC+08:00 DEFAULT_DATE_WO_TZ = datetime(2015, 1, 1) -DEFAULT_DATE_WITH_TZ = datetime(2015, 1, 1, tzinfo=pendulum.tz.timezone(DEFAULT_TIMEZONE)) +DEFAULT_DATE_WITH_TZ = datetime(2015, 1, 1, tzinfo=Timezone(DEFAULT_TIMEZONE)) class TestTimeSensor: @@ -46,11 +45,11 @@ class TestTimeSensor: ], ) @time_machine.travel(timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc)) - def test_timezone(self, default_timezone, start_date, expected): - with patch("airflow.settings.TIMEZONE", pendulum.timezone(default_timezone)): - dag = DAG("test", default_args={"start_date": start_date}) - op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag) - assert op.poke(None) == expected + def test_timezone(self, default_timezone, start_date, expected, monkeypatch): + monkeypatch.setattr("airflow.settings.TIMEZONE", Timezone(default_timezone)) + dag = DAG("test", default_args={"start_date": start_date}) + op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag) + assert op.poke(None) == expected class TestTimeSensorAsync: @@ -85,4 +84,4 @@ class TestTimeSensorAsync: ): op = TimeSensorAsync(task_id="test", target_time=pendulum.time(9, 0)) assert op.target_datetime.time() == pendulum.time(1, 0) - assert op.target_datetime.tzinfo == UTC + assert op.target_datetime.tzinfo == timezone.utc diff --git a/tests/serialization/test_serialized_objects.py b/tests/serialization/test_serialized_objects.py index c059a8d236..96b1963579 100644 --- a/tests/serialization/test_serialized_objects.py +++ b/tests/serialization/test_serialized_objects.py @@ -142,7 +142,7 @@ def equal_time(a: datetime, b: datetime) -> bool: (1, None, equals), (datetime.utcnow(), DAT.DATETIME, equal_time), (timedelta(minutes=2), DAT.TIMEDELTA, equals), - 
(pendulum.tz.timezone("UTC"), DAT.TIMEZONE, lambda a, b: a.name == b.name), + (pendulum.tz.Timezone("UTC"), DAT.TIMEZONE, lambda a, b: a.name == b.name), (relativedelta.relativedelta(hours=+1), DAT.RELATIVEDELTA, lambda a, b: a.hours == b.hours), ({"test": "dict", "test-1": 1}, None, equals), (["array_item", 2], None, equals), diff --git a/tests/utils/test_timezone.py b/tests/utils/test_timezone.py index ff5ad26f5a..df8af04604 100644 --- a/tests/utils/test_timezone.py +++ b/tests/utils/test_timezone.py @@ -21,13 +21,14 @@ import datetime import pendulum import pytest +from pendulum.tz.timezone import Timezone from airflow.utils import timezone -from airflow.utils.timezone import coerce_datetime +from airflow.utils.timezone import coerce_datetime, parse_timezone -CET = pendulum.tz.timezone("Europe/Paris") -EAT = pendulum.tz.timezone("Africa/Nairobi") # Africa/Nairobi -ICT = pendulum.tz.timezone("Asia/Bangkok") # Asia/Bangkok +CET = Timezone("Europe/Paris") +EAT = Timezone("Africa/Nairobi") # Africa/Nairobi +ICT = Timezone("Asia/Bangkok") # Asia/Bangkok UTC = timezone.utc @@ -117,3 +118,41 @@ class TestTimezone: ) def test_coerce_datetime(input_datetime, output_datetime): assert output_datetime == coerce_datetime(input_datetime) + + +@pytest.mark.parametrize( + "tz_name", + [ + pytest.param("Europe/Paris", id="CET"), + pytest.param("Africa/Nairobi", id="EAT"), + pytest.param("Asia/Bangkok", id="ICT"), + ], +) +def test_parse_timezone_iana(tz_name: str): + tz = parse_timezone(tz_name) + assert tz.name == tz_name + assert parse_timezone(tz_name) is tz + + +@pytest.mark.parametrize("tz_name", ["utc", "UTC", "uTc"]) +def test_parse_timezone_utc(tz_name): + tz = parse_timezone(tz_name) + assert tz.name == "UTC" + assert parse_timezone(tz_name) is tz + assert tz is timezone.utc, "Expected that UTC timezone is same object as `airflow.utils.timezone.utc`" + + +@pytest.mark.parametrize( + "tz_offset, expected_offset, expected_name", + [ + pytest.param(0, 0, "+00:00", id="zero-offset"), +
pytest.param(-3600, -3600, "-01:00", id="1-hour-behind"), + pytest.param(19800, 19800, "+05:30", id="5.5-hours-ahead"), + ], +) +def test_parse_timezone_offset(tz_offset: int, expected_offset, expected_name): + tz = parse_timezone(tz_offset) + assert hasattr(tz, "offset") + assert tz.offset == expected_offset + assert tz.name == expected_name + assert parse_timezone(tz_offset) is tz
