This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 91843291f54 Refactor Opensearch log formatter to use 
timezone.from_timestamp (#66856)
91843291f54 is described below

commit 91843291f540aed5c3de70148d53b299b08e48f6
Author: Taehoon Kim <[email protected]>
AuthorDate: Sat May 23 01:05:42 2026 +0900

    Refactor Opensearch log formatter to use timezone.from_timestamp (#66856)
    
    * Refactor Opensearch log formatter to use timezone.from_timestamp
    
    * Fix: Restore version_compat.py and use version-based gating for 
Opensearch log formatter
    
    * Fix: Move airflow.sdk.timezone import to local scope to fix compatibility 
with older Airflow 3.x
    
    * Add TODO for minimum Airflow version in Opensearch JSON Formatter
---
 .../providers/opensearch/log/os_json_formatter.py    | 20 +++++++++++++-------
 .../airflow/providers/opensearch/version_compat.py   |  2 ++
 2 files changed, 15 insertions(+), 7 deletions(-)

diff --git 
a/providers/opensearch/src/airflow/providers/opensearch/log/os_json_formatter.py
 
b/providers/opensearch/src/airflow/providers/opensearch/log/os_json_formatter.py
index e41530ac660..e8ea47accc0 100644
--- 
a/providers/opensearch/src/airflow/providers/opensearch/log/os_json_formatter.py
+++ 
b/providers/opensearch/src/airflow/providers/opensearch/log/os_json_formatter.py
@@ -16,10 +16,7 @@
 # under the License.
 from __future__ import annotations
 
-from datetime import datetime
-
-import pendulum
-
+from airflow.providers.opensearch.version_compat import AIRFLOW_V_3_3_PLUS
 from airflow.utils.log.json_formatter import JSONFormatter
 
 
@@ -32,9 +29,18 @@ class OpensearchJSONFormatter(JSONFormatter):
 
     def formatTime(self, record, datefmt=None):
         """Return the creation time of the LogRecord in ISO 8601 date/time 
format in the local time zone."""
-        # TODO: Use airflow.utils.timezone.from_timestamp(record.created, 
tz="local")
-        #  as soon as min Airflow 2.9.0
-        dt = datetime.fromtimestamp(record.created, 
tz=pendulum.local_timezone())
+        if AIRFLOW_V_3_3_PLUS:
+            from airflow.sdk import timezone
+
+            dt = timezone.from_timestamp(record.created, tz="local")
+        else:
+            # TODO: Remove this fallback when the minimum Airflow version is 
3.3.0
+            from datetime import datetime
+
+            import pendulum
+
+            dt = datetime.fromtimestamp(record.created, 
tz=pendulum.local_timezone())
+
         s = dt.strftime(datefmt or self.default_time_format)
         if self.default_msec_format:
             s = self.default_msec_format % (s, record.msecs)
diff --git 
a/providers/opensearch/src/airflow/providers/opensearch/version_compat.py 
b/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
index e840569a4d8..eb02fb81909 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
+++ b/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
@@ -35,9 +35,11 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 AIRFLOW_V_3_0_PLUS: bool = get_base_airflow_version_tuple() >= (3, 0, 0)
 AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)
 AIRFLOW_V_3_2_PLUS: bool = get_base_airflow_version_tuple() >= (3, 2, 0)
+AIRFLOW_V_3_3_PLUS: bool = get_base_airflow_version_tuple() >= (3, 3, 0)
 
 __all__ = [
     "AIRFLOW_V_3_0_PLUS",
     "AIRFLOW_V_3_1_PLUS",
     "AIRFLOW_V_3_2_PLUS",
+    "AIRFLOW_V_3_3_PLUS",
 ]

Reply via email to