This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 91843291f54 Refactor Opensearch log formatter to use
timezone.from_timestamp (#66856)
91843291f54 is described below
commit 91843291f540aed5c3de70148d53b299b08e48f6
Author: Taehoon Kim <[email protected]>
AuthorDate: Sat May 23 01:05:42 2026 +0900
Refactor Opensearch log formatter to use timezone.from_timestamp (#66856)
* Refactor Opensearch log formatter to use timezone.from_timestamp
* Fix: Restore version_compat.py and use version-based gating for
Opensearch log formatter
* Fix: Move airflow.sdk.timezone import to local scope to fix compatibility
with older Airflow 3.x
* Add TODO for minimum Airflow version in Opensearch JSON Formatter
---
.../providers/opensearch/log/os_json_formatter.py | 20 +++++++++++++-------
.../airflow/providers/opensearch/version_compat.py | 2 ++
2 files changed, 15 insertions(+), 7 deletions(-)
diff --git
a/providers/opensearch/src/airflow/providers/opensearch/log/os_json_formatter.py
b/providers/opensearch/src/airflow/providers/opensearch/log/os_json_formatter.py
index e41530ac660..e8ea47accc0 100644
---
a/providers/opensearch/src/airflow/providers/opensearch/log/os_json_formatter.py
+++
b/providers/opensearch/src/airflow/providers/opensearch/log/os_json_formatter.py
@@ -16,10 +16,7 @@
# under the License.
from __future__ import annotations
-from datetime import datetime
-
-import pendulum
-
+from airflow.providers.opensearch.version_compat import AIRFLOW_V_3_3_PLUS
from airflow.utils.log.json_formatter import JSONFormatter
@@ -32,9 +29,18 @@ class OpensearchJSONFormatter(JSONFormatter):
def formatTime(self, record, datefmt=None):
"""Return the creation time of the LogRecord in ISO 8601 date/time
format in the local time zone."""
- # TODO: Use airflow.utils.timezone.from_timestamp(record.created,
tz="local")
- # as soon as min Airflow 2.9.0
- dt = datetime.fromtimestamp(record.created,
tz=pendulum.local_timezone())
+ if AIRFLOW_V_3_3_PLUS:
+ from airflow.sdk import timezone
+
+ dt = timezone.from_timestamp(record.created, tz="local")
+ else:
+ # TODO: Remove this fallback when the minimum Airflow version is
3.3.0
+ from datetime import datetime
+
+ import pendulum
+
+ dt = datetime.fromtimestamp(record.created,
tz=pendulum.local_timezone())
+
s = dt.strftime(datefmt or self.default_time_format)
if self.default_msec_format:
s = self.default_msec_format % (s, record.msecs)
diff --git
a/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
b/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
index e840569a4d8..eb02fb81909 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
+++ b/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
@@ -35,9 +35,11 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_0_PLUS: bool = get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)
AIRFLOW_V_3_2_PLUS: bool = get_base_airflow_version_tuple() >= (3, 2, 0)
+AIRFLOW_V_3_3_PLUS: bool = get_base_airflow_version_tuple() >= (3, 3, 0)
__all__ = [
"AIRFLOW_V_3_0_PLUS",
"AIRFLOW_V_3_1_PLUS",
"AIRFLOW_V_3_2_PLUS",
+ "AIRFLOW_V_3_3_PLUS",
]