This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 83741051d5 Avoid `pendulum.from_timestamp` usage (#37160)
83741051d5 is described below

commit 83741051d5a774afca1430e6a7d86af5297fe6d0
Author: Andrey Anshin <[email protected]>
AuthorDate: Sun Feb 4 01:51:29 2024 +0400

    Avoid `pendulum.from_timestamp` usage (#37160)
---
 .../elasticsearch/log/es_json_formatter.py         |  6 ++-
 airflow/serialization/serialized_objects.py        |  7 ++--
 airflow/utils/log/timezone_aware.py                |  4 +-
 airflow/utils/timezone.py                          | 22 +++++++++++
 pyproject.toml                                     |  2 +
 tests/utils/test_timezone.py                       | 46 +++++++++++++++++++++-
 6 files changed, 79 insertions(+), 8 deletions(-)

diff --git a/airflow/providers/elasticsearch/log/es_json_formatter.py 
b/airflow/providers/elasticsearch/log/es_json_formatter.py
index cf77896a92..49e3f54a50 100644
--- a/airflow/providers/elasticsearch/log/es_json_formatter.py
+++ b/airflow/providers/elasticsearch/log/es_json_formatter.py
@@ -16,6 +16,8 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime
+
 import pendulum
 
 from airflow.utils.log.json_formatter import JSONFormatter
@@ -30,7 +32,9 @@ class ElasticsearchJSONFormatter(JSONFormatter):
 
     def formatTime(self, record, datefmt=None):
         """Return the creation time of the LogRecord in ISO 8601 date/time 
format in the local time zone."""
-        dt = pendulum.from_timestamp(record.created, 
tz=pendulum.local_timezone())
+        # TODO: Use airflow.utils.timezone.from_timestamp(record.created, 
tz="local")
+        #  as soon as min Airflow 2.9.0
+        dt = datetime.fromtimestamp(record.created, 
tz=pendulum.local_timezone())
         s = dt.strftime(datefmt or self.default_time_format)
         if self.default_msec_format:
             s = self.default_msec_format % (s, record.msecs)
diff --git a/airflow/serialization/serialized_objects.py 
b/airflow/serialization/serialized_objects.py
index e77c43f5f5..5ec07e4f6c 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -30,7 +30,6 @@ from typing import TYPE_CHECKING, Any, Collection, Iterable, 
Mapping, NamedTuple
 
 import attrs
 import lazy_object_proxy
-import pendulum
 from dateutil import relativedelta
 from pendulum.tz.timezone import FixedTimezone, Timezone
 
@@ -65,7 +64,7 @@ from airflow.utils.docs import get_docs_url
 from airflow.utils.module_loading import import_string, qualname
 from airflow.utils.operator_resources import Resources
 from airflow.utils.task_group import MappedTaskGroup, TaskGroup
-from airflow.utils.timezone import parse_timezone
+from airflow.utils.timezone import from_timestamp, parse_timezone
 from airflow.utils.types import NOTSET, ArgNotSet
 
 if TYPE_CHECKING:
@@ -567,7 +566,7 @@ class BaseSerialization:
         elif type_ == DAT.OP:
             return SerializedBaseOperator.deserialize_operator(var)
         elif type_ == DAT.DATETIME:
-            return pendulum.from_timestamp(var)
+            return from_timestamp(var)
         elif type_ == DAT.POD:
             if not _has_kubernetes():
                 raise RuntimeError("Cannot deserialize POD objects without 
kubernetes libraries installed!")
@@ -611,7 +610,7 @@ class BaseSerialization:
         else:
             raise TypeError(f"Invalid type {type_!s} in deserialization.")
 
-    _deserialize_datetime = pendulum.from_timestamp
+    _deserialize_datetime = from_timestamp
     _deserialize_timezone = parse_timezone
 
     @classmethod
diff --git a/airflow/utils/log/timezone_aware.py 
b/airflow/utils/log/timezone_aware.py
index ae96a11116..4ca35480a1 100644
--- a/airflow/utils/log/timezone_aware.py
+++ b/airflow/utils/log/timezone_aware.py
@@ -18,7 +18,7 @@ from __future__ import annotations
 
 import logging
 
-import pendulum
+from airflow.utils import timezone
 
 
 class TimezoneAware(logging.Formatter):
@@ -39,7 +39,7 @@ class TimezoneAware(logging.Formatter):
         This returns the creation time of the specified LogRecord in ISO 8601
         date and time format in the local time zone.
         """
-        dt = pendulum.from_timestamp(record.created, 
tz=pendulum.local_timezone())
+        dt = timezone.from_timestamp(record.created, tz="local")
         s = dt.strftime(datefmt or self.default_time_format)
         if self.default_msec_format:
             s = self.default_msec_format % (s, record.msecs)
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index 152ef35954..966c4bbdc1 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -27,6 +27,8 @@ from pendulum.datetime import DateTime
 if TYPE_CHECKING:
     from pendulum.tz.timezone import FixedTimezone, Timezone
 
+    from airflow.typing_compat import Literal
+
 _PENDULUM3 = pendulum.__version__.startswith("3")
 # UTC Timezone as a tzinfo instance. Actual value depends on pendulum version:
 # - Timezone("UTC") in pendulum 3
@@ -299,3 +301,23 @@ def local_timezone() -> FixedTimezone | Timezone:
     :meta private:
     """
     return pendulum.tz.local_timezone()
+
+
+def from_timestamp(
+    timestamp: int | float, tz: str | FixedTimezone | Timezone | 
Literal["local"] = utc
+) -> DateTime:
+    """
+    Parse timestamp and return DateTime in a given time zone.
+
+    :param timestamp: epoch time in seconds.
+    :param tz: In which timezone should return a resulting object.
+        Could be either one of pendulum timezone, IANA timezone or `local` 
literal.
+
+    :meta private:
+    """
+    result = coerce_datetime(dt.datetime.fromtimestamp(timestamp, tz=utc))
+    if tz != utc or tz != "UTC":
+        if isinstance(tz, str) and tz.lower() == "local":
+            tz = local_timezone()
+        result = result.in_timezone(tz)
+    return result
diff --git a/pyproject.toml b/pyproject.toml
index 0e7e892f55..abc1ea0013 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1374,6 +1374,8 @@ combine-as-imports = true
 "airflow.AirflowException".msg = "Use airflow.exceptions.AirflowException 
instead."
 "airflow.Dataset".msg = "Use airflow.datasets.Dataset instead."
 "airflow.models.baseoperator.BaseOperatorLink".msg = "Use 
airflow.models.baseoperatorlink.BaseOperatorLink"
+# Uses deprecated in Python 3.12 `datetime.datetime.utcfromtimestamp`
+"pendulum.from_timestamp".msg = "Use airflow.utils.timezone.from_timestamp"
 
 [tool.ruff.flake8-tidy-imports]
 # Ban certain modules from being imported at module level, instead requiring
diff --git a/tests/utils/test_timezone.py b/tests/utils/test_timezone.py
index df8af04604..c7e6d0c09a 100644
--- a/tests/utils/test_timezone.py
+++ b/tests/utils/test_timezone.py
@@ -21,7 +21,7 @@ import datetime
 
 import pendulum
 import pytest
-from pendulum.tz.timezone import Timezone
+from pendulum.tz.timezone import FixedTimezone, Timezone
 
 from airflow.utils import timezone
 from airflow.utils.timezone import coerce_datetime, parse_timezone
@@ -156,3 +156,47 @@ def test_parse_timezone_offset(tz_offset: int, 
expected_offset, expected_name):
     assert tz.offset == expected_offset
     assert tz.name == expected_name
     assert parse_timezone(tz_offset) is tz
+
+
[email protected](
+    "tz",
+    [
+        pytest.param(None, id="implicit"),
+        pytest.param(timezone.utc, id="explicit"),
+        pytest.param("UTC", id="utc-literal"),
+    ],
+)
+def test_from_timestamp_utc(tz):
+    from_ts = timezone.from_timestamp(0) if tz is None else 
timezone.from_timestamp(0, tz=tz)
+    assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)
+    assert from_ts.tzinfo == timezone.utc
+
+
[email protected]("tz", ["local", "LOCAL"])
+def test_from_timestamp_local(tz):
+    local_tz = timezone.local_timezone()
+    from_ts = timezone.from_timestamp(0, tz=tz)
+    assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)
+    assert from_ts.tzinfo == local_tz
+
+
[email protected](
+    "tz, iana_timezone",
+    [
+        pytest.param(Timezone("Europe/Paris"), "Europe/Paris", 
id="pendulum-timezone"),
+        pytest.param("America/New_York", "America/New_York", 
id="IANA-timezone"),
+    ],
+)
+def test_from_timestamp_iana_timezones(tz, iana_timezone):
+    from_ts = timezone.from_timestamp(0, tz=tz)
+    assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)
+    # In pendulum 2 there is a problem with compare tzinfo object (caching?), 
so we check the name
+    assert from_ts.tzinfo.name == iana_timezone
+    assert isinstance(from_ts.tzinfo, Timezone)
+
+
[email protected]("utc_offset", [3600, -7200])
+def test_from_timestamp_fixed_timezone(utc_offset):
+    from_ts = timezone.from_timestamp(0, tz=FixedTimezone(utc_offset))
+    assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)
+    assert from_ts.utcoffset() == datetime.timedelta(seconds=utc_offset)

Reply via email to