This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 83741051d5 Avoid `pendulum.from_timestamp` usage (#37160)
83741051d5 is described below
commit 83741051d5a774afca1430e6a7d86af5297fe6d0
Author: Andrey Anshin <[email protected]>
AuthorDate: Sun Feb 4 01:51:29 2024 +0400
Avoid `pendulum.from_timestamp` usage (#37160)
---
.../elasticsearch/log/es_json_formatter.py | 6 ++-
airflow/serialization/serialized_objects.py | 7 ++--
airflow/utils/log/timezone_aware.py | 4 +-
airflow/utils/timezone.py | 22 +++++++++++
pyproject.toml | 2 +
tests/utils/test_timezone.py | 46 +++++++++++++++++++++-
6 files changed, 79 insertions(+), 8 deletions(-)
diff --git a/airflow/providers/elasticsearch/log/es_json_formatter.py b/airflow/providers/elasticsearch/log/es_json_formatter.py
index cf77896a92..49e3f54a50 100644
--- a/airflow/providers/elasticsearch/log/es_json_formatter.py
+++ b/airflow/providers/elasticsearch/log/es_json_formatter.py
@@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations
+from datetime import datetime
+
import pendulum
from airflow.utils.log.json_formatter import JSONFormatter
@@ -30,7 +32,9 @@ class ElasticsearchJSONFormatter(JSONFormatter):
def formatTime(self, record, datefmt=None):
"""Return the creation time of the LogRecord in ISO 8601 date/time format in the local time zone."""
- dt = pendulum.from_timestamp(record.created, tz=pendulum.local_timezone())
+ # TODO: Use airflow.utils.timezone.from_timestamp(record.created, tz="local")
+ # as soon as min Airflow 2.9.0
+ dt = datetime.fromtimestamp(record.created, tz=pendulum.local_timezone())
s = dt.strftime(datefmt or self.default_time_format)
if self.default_msec_format:
s = self.default_msec_format % (s, record.msecs)
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index e77c43f5f5..5ec07e4f6c 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -30,7 +30,6 @@ from typing import TYPE_CHECKING, Any, Collection, Iterable, Mapping, NamedTuple
import attrs
import lazy_object_proxy
-import pendulum
from dateutil import relativedelta
from pendulum.tz.timezone import FixedTimezone, Timezone
@@ -65,7 +64,7 @@ from airflow.utils.docs import get_docs_url
from airflow.utils.module_loading import import_string, qualname
from airflow.utils.operator_resources import Resources
from airflow.utils.task_group import MappedTaskGroup, TaskGroup
-from airflow.utils.timezone import parse_timezone
+from airflow.utils.timezone import from_timestamp, parse_timezone
from airflow.utils.types import NOTSET, ArgNotSet
if TYPE_CHECKING:
@@ -567,7 +566,7 @@ class BaseSerialization:
elif type_ == DAT.OP:
return SerializedBaseOperator.deserialize_operator(var)
elif type_ == DAT.DATETIME:
- return pendulum.from_timestamp(var)
+ return from_timestamp(var)
elif type_ == DAT.POD:
if not _has_kubernetes():
raise RuntimeError("Cannot deserialize POD objects without kubernetes libraries installed!")
@@ -611,7 +610,7 @@ class BaseSerialization:
else:
raise TypeError(f"Invalid type {type_!s} in deserialization.")
- _deserialize_datetime = pendulum.from_timestamp
+ _deserialize_datetime = from_timestamp
_deserialize_timezone = parse_timezone
@classmethod
diff --git a/airflow/utils/log/timezone_aware.py b/airflow/utils/log/timezone_aware.py
index ae96a11116..4ca35480a1 100644
--- a/airflow/utils/log/timezone_aware.py
+++ b/airflow/utils/log/timezone_aware.py
@@ -18,7 +18,7 @@ from __future__ import annotations
import logging
-import pendulum
+from airflow.utils import timezone
class TimezoneAware(logging.Formatter):
@@ -39,7 +39,7 @@ class TimezoneAware(logging.Formatter):
This returns the creation time of the specified LogRecord in ISO 8601
date and time format in the local time zone.
"""
- dt = pendulum.from_timestamp(record.created, tz=pendulum.local_timezone())
+ dt = timezone.from_timestamp(record.created, tz="local")
s = dt.strftime(datefmt or self.default_time_format)
if self.default_msec_format:
s = self.default_msec_format % (s, record.msecs)
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index 152ef35954..966c4bbdc1 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -27,6 +27,8 @@ from pendulum.datetime import DateTime
if TYPE_CHECKING:
from pendulum.tz.timezone import FixedTimezone, Timezone
+ from airflow.typing_compat import Literal
+
_PENDULUM3 = pendulum.__version__.startswith("3")
# UTC Timezone as a tzinfo instance. Actual value depends on pendulum version:
# - Timezone("UTC") in pendulum 3
@@ -299,3 +301,23 @@ def local_timezone() -> FixedTimezone | Timezone:
:meta private:
"""
return pendulum.tz.local_timezone()
+
+
+def from_timestamp(
+    timestamp: int | float, tz: str | FixedTimezone | Timezone | Literal["local"] = utc
+) -> DateTime:
+    """
+    Parse an epoch timestamp and return a DateTime in the requested time zone.
+
+    :param timestamp: epoch time in seconds.
+    :param tz: Time zone of the resulting object (UTC when omitted).
+        Either a pendulum timezone, an IANA timezone name or the ``local`` literal (any letter case).
+
+    :meta private:
+    """
+    result = coerce_datetime(dt.datetime.fromtimestamp(timestamp, tz=utc))
+    if tz != utc and tz != "UTC":  # ``result`` is built in UTC, so convert only for other zones
+        if isinstance(tz, str) and tz.lower() == "local":
+            tz = local_timezone()
+        result = result.in_timezone(tz)
+    return result
diff --git a/pyproject.toml b/pyproject.toml
index 0e7e892f55..abc1ea0013 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1374,6 +1374,8 @@ combine-as-imports = true
"airflow.AirflowException".msg = "Use airflow.exceptions.AirflowException instead."
"airflow.Dataset".msg = "Use airflow.datasets.Dataset instead."
"airflow.models.baseoperator.BaseOperatorLink".msg = "Use airflow.models.baseoperatorlink.BaseOperatorLink"
+# Uses deprecated in Python 3.12 `datetime.datetime.utcfromtimestamp`
+"pendulum.from_timestamp".msg = "Use airflow.utils.timezone.from_timestamp"
[tool.ruff.flake8-tidy-imports]
# Ban certain modules from being imported at module level, instead requiring
diff --git a/tests/utils/test_timezone.py b/tests/utils/test_timezone.py
index df8af04604..c7e6d0c09a 100644
--- a/tests/utils/test_timezone.py
+++ b/tests/utils/test_timezone.py
@@ -21,7 +21,7 @@ import datetime
import pendulum
import pytest
-from pendulum.tz.timezone import Timezone
+from pendulum.tz.timezone import FixedTimezone, Timezone
from airflow.utils import timezone
from airflow.utils.timezone import coerce_datetime, parse_timezone
@@ -156,3 +156,47 @@ def test_parse_timezone_offset(tz_offset: int, expected_offset, expected_name):
assert tz.offset == expected_offset
assert tz.name == expected_name
assert parse_timezone(tz_offset) is tz
+
+
+@pytest.mark.parametrize(
+    "tz",
+    [
+        pytest.param(None, id="implicit"),  # None: call without ``tz`` so the default is used
+        pytest.param(timezone.utc, id="explicit"),
+        pytest.param("UTC", id="utc-literal"),
+    ],
+)
+def test_from_timestamp_utc(tz):
+    from_ts = timezone.from_timestamp(0) if tz is None else timezone.from_timestamp(0, tz=tz)
+    assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)
+    assert from_ts.tzinfo == timezone.utc
+
+
+@pytest.mark.parametrize("tz", ["local", "LOCAL"])  # both letter cases of the literal
+def test_from_timestamp_local(tz):
+    local_tz = timezone.local_timezone()
+    from_ts = timezone.from_timestamp(0, tz=tz)
+    assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)  # same instant as the epoch
+    assert from_ts.tzinfo == local_tz
+
+
+@pytest.mark.parametrize(
+    "tz, iana_timezone",
+    [
+        pytest.param(Timezone("Europe/Paris"), "Europe/Paris", id="pendulum-timezone"),
+        pytest.param("America/New_York", "America/New_York", id="IANA-timezone"),
+    ],
+)
+def test_from_timestamp_iana_timezones(tz, iana_timezone):
+    from_ts = timezone.from_timestamp(0, tz=tz)
+    assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)
+    # In pendulum 2 comparing tzinfo objects is unreliable (caching?), so we compare the names instead
+    assert from_ts.tzinfo.name == iana_timezone
+    assert isinstance(from_ts.tzinfo, Timezone)
+
+
+@pytest.mark.parametrize("utc_offset", [3600, -7200])  # offsets in seconds, east and west of UTC
+def test_from_timestamp_fixed_timezone(utc_offset):
+    from_ts = timezone.from_timestamp(0, tz=FixedTimezone(utc_offset))
+    assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)
+    assert from_ts.utcoffset() == datetime.timedelta(seconds=utc_offset)