This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b64ed723421 Refactor Elasticsearch log formatter to use timezone.from_timestamp (#67245)
b64ed723421 is described below

commit b64ed723421e7c124a38c576c869d87209facfd3
Author: Taehoon Kim <[email protected]>
AuthorDate: Sat May 23 01:04:57 2026 +0900

    Refactor Elasticsearch log formatter to use timezone.from_timestamp (#67245)
    
    * Add AIRFLOW_V_3_3_PLUS compatibility constant in Elasticsearch provider
    
    * Refactor Elasticsearch log formatter to use timezone.from_timestamp
---
 .../providers/elasticsearch/log/es_json_formatter.py | 20 +++++++++++++-------
 .../providers/elasticsearch/version_compat.py        |  8 +++++++-
 2 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_json_formatter.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_json_formatter.py
index 49e3f54a509..9cff71b18b3 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_json_formatter.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_json_formatter.py
@@ -16,10 +16,7 @@
 # under the License.
 from __future__ import annotations
 
-from datetime import datetime
-
-import pendulum
-
+from airflow.providers.elasticsearch.version_compat import AIRFLOW_V_3_3_PLUS
 from airflow.utils.log.json_formatter import JSONFormatter
 
 
@@ -32,9 +29,18 @@ class ElasticsearchJSONFormatter(JSONFormatter):
 
     def formatTime(self, record, datefmt=None):
         """Return the creation time of the LogRecord in ISO 8601 date/time format in the local time zone."""
-        # TODO: Use airflow.utils.timezone.from_timestamp(record.created, tz="local")
-        #  as soon as min Airflow 2.9.0
-        dt = datetime.fromtimestamp(record.created, tz=pendulum.local_timezone())
+        if AIRFLOW_V_3_3_PLUS:
+            from airflow.sdk import timezone
+
+            dt = timezone.from_timestamp(record.created, tz="local")
+        else:
+            # TODO: Remove this fallback when the minimum Airflow version is 3.3.0
+            from datetime import datetime
+
+            import pendulum
+
+            dt = datetime.fromtimestamp(record.created, tz=pendulum.local_timezone())
+
         s = dt.strftime(datefmt or self.default_time_format)
         if self.default_msec_format:
             s = self.default_msec_format % (s, record.msecs)
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py
index caa8c1a5197..806dbac18d2 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py
@@ -35,5 +35,11 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
 AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)
 AIRFLOW_V_3_2_PLUS: bool = get_base_airflow_version_tuple() >= (3, 2, 0)
+AIRFLOW_V_3_3_PLUS: bool = get_base_airflow_version_tuple() >= (3, 3, 0)
 
-__all__ = ["AIRFLOW_V_3_0_PLUS", "AIRFLOW_V_3_1_PLUS", "AIRFLOW_V_3_2_PLUS"]
+__all__ = [
+    "AIRFLOW_V_3_0_PLUS",
+    "AIRFLOW_V_3_1_PLUS",
+    "AIRFLOW_V_3_2_PLUS",
+    "AIRFLOW_V_3_3_PLUS",
+]

Reply via email to