This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b64ed723421 Refactor Elasticsearch log formatter to use timezone.from_timestamp (#67245)
b64ed723421 is described below
commit b64ed723421e7c124a38c576c869d87209facfd3
Author: Taehoon Kim <[email protected]>
AuthorDate: Sat May 23 01:04:57 2026 +0900
Refactor Elasticsearch log formatter to use timezone.from_timestamp (#67245)
* Add AIRFLOW_V_3_3_PLUS compatibility constant in Elasticsearch provider
* Refactor Elasticsearch log formatter to use timezone.from_timestamp
---
.../providers/elasticsearch/log/es_json_formatter.py | 20 +++++++++++++-------
.../providers/elasticsearch/version_compat.py | 8 +++++++-
2 files changed, 20 insertions(+), 8 deletions(-)
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_json_formatter.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_json_formatter.py
index 49e3f54a509..9cff71b18b3 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_json_formatter.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_json_formatter.py
@@ -16,10 +16,7 @@
# under the License.
from __future__ import annotations
-from datetime import datetime
-
-import pendulum
-
+from airflow.providers.elasticsearch.version_compat import AIRFLOW_V_3_3_PLUS
from airflow.utils.log.json_formatter import JSONFormatter
@@ -32,9 +29,18 @@ class ElasticsearchJSONFormatter(JSONFormatter):
def formatTime(self, record, datefmt=None):
"""Return the creation time of the LogRecord in ISO 8601 date/time format in the local time zone."""
- # TODO: Use airflow.utils.timezone.from_timestamp(record.created, tz="local")
- # as soon as min Airflow 2.9.0
- dt = datetime.fromtimestamp(record.created, tz=pendulum.local_timezone())
+ if AIRFLOW_V_3_3_PLUS:
+ from airflow.sdk import timezone
+
+ dt = timezone.from_timestamp(record.created, tz="local")
+ else:
+ # TODO: Remove this fallback when the minimum Airflow version is 3.3.0
+ from datetime import datetime
+
+ import pendulum
+
+ dt = datetime.fromtimestamp(record.created, tz=pendulum.local_timezone())
+
s = dt.strftime(datefmt or self.default_time_format)
if self.default_msec_format:
s = self.default_msec_format % (s, record.msecs)
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py
index caa8c1a5197..806dbac18d2 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py
@@ -35,5 +35,11 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)
AIRFLOW_V_3_2_PLUS: bool = get_base_airflow_version_tuple() >= (3, 2, 0)
+AIRFLOW_V_3_3_PLUS: bool = get_base_airflow_version_tuple() >= (3, 3, 0)
-__all__ = ["AIRFLOW_V_3_0_PLUS", "AIRFLOW_V_3_1_PLUS", "AIRFLOW_V_3_2_PLUS"]
+__all__ = [
+ "AIRFLOW_V_3_0_PLUS",
+ "AIRFLOW_V_3_1_PLUS",
+ "AIRFLOW_V_3_2_PLUS",
+ "AIRFLOW_V_3_3_PLUS",
+]