This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4f1faf61b0d Make elasticsearch compatible with remote_task_log (#62121)
4f1faf61b0d is described below

commit 4f1faf61b0d55de65631b3c4ff6e24af4cf8519e
Author: Owen Leung <[email protected]>
AuthorDate: Thu Mar 5 22:55:29 2026 +0800

    Make elasticsearch compatible with remote_task_log (#62121)
    
    * Make elasticsearch compatible with remote_task_log and add min version
    
    * Fix pyproject.toml to use elasticsearch>=6.5.0
    
    ---------
    
    Co-authored-by: vatsrahul1001 <[email protected]>
---
 airflow-core/newsfragments/62121.bugfix.rst        |  1 +
 .../config_templates/airflow_local_settings.py     | 38 +++++++++-------------
 pyproject.toml                                     |  4 +--
 scripts/ci/prek/update_airflow_pyproject_toml.py   |  1 +
 4 files changed, 20 insertions(+), 24 deletions(-)

diff --git a/airflow-core/newsfragments/62121.bugfix.rst 
b/airflow-core/newsfragments/62121.bugfix.rst
new file mode 100644
index 00000000000..a35d8ae3bb7
--- /dev/null
+++ b/airflow-core/newsfragments/62121.bugfix.rst
@@ -0,0 +1 @@
+Elasticsearch is now fully compatible with remote logging along side with 
apache-airflow-providers-elasticsearch>=6.5.0. Please review elasticsearch 
provider release notes for more information 
https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/6.5.0/changelog.html
diff --git 
a/airflow-core/src/airflow/config_templates/airflow_local_settings.py 
b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
index e0f94c77b2d..06639e0e855 100644
--- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
+++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
@@ -282,35 +282,29 @@ if REMOTE_LOGGING:
         )
         remote_task_handler_kwargs = {}
     elif ELASTICSEARCH_HOST:
-        ELASTICSEARCH_END_OF_LOG_MARK: str = 
conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
-        ELASTICSEARCH_FRONTEND: str = 
conf.get_mandatory_value("elasticsearch", "frontend")
+        from airflow.providers.elasticsearch.log.es_task_handler import 
ElasticsearchRemoteLogIO
+
         ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", 
"WRITE_STDOUT")
         ELASTICSEARCH_WRITE_TO_ES: bool = conf.getboolean("elasticsearch", 
"WRITE_TO_ES")
         ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", 
"JSON_FORMAT")
-        ELASTICSEARCH_JSON_FIELDS: str = 
conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
         ELASTICSEARCH_TARGET_INDEX: str = 
conf.get_mandatory_value("elasticsearch", "TARGET_INDEX")
         ELASTICSEARCH_HOST_FIELD: str = 
conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
         ELASTICSEARCH_OFFSET_FIELD: str = 
conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")
+        ELASTICSEARCH_LOG_ID_TEMPLATE: str = 
conf.get_mandatory_value("elasticsearch", "LOG_ID_TEMPLATE")
+
+        REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(
+            host=ELASTICSEARCH_HOST,
+            target_index=ELASTICSEARCH_TARGET_INDEX,
+            write_stdout=ELASTICSEARCH_WRITE_STDOUT,
+            write_to_es=ELASTICSEARCH_WRITE_TO_ES,
+            offset_field=ELASTICSEARCH_OFFSET_FIELD,
+            host_field=ELASTICSEARCH_HOST_FIELD,
+            base_log_folder=BASE_LOG_FOLDER,
+            delete_local_copy=delete_local_copy,
+            json_format=ELASTICSEARCH_JSON_FORMAT,
+            log_id_template=ELASTICSEARCH_LOG_ID_TEMPLATE,
+        )
 
-        ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
-            "task": {
-                "class": 
"airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
-                "formatter": "airflow",
-                "base_log_folder": BASE_LOG_FOLDER,
-                "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
-                "host": ELASTICSEARCH_HOST,
-                "frontend": ELASTICSEARCH_FRONTEND,
-                "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
-                "write_to_es": ELASTICSEARCH_WRITE_TO_ES,
-                "target_index": ELASTICSEARCH_TARGET_INDEX,
-                "json_format": ELASTICSEARCH_JSON_FORMAT,
-                "json_fields": ELASTICSEARCH_JSON_FIELDS,
-                "host_field": ELASTICSEARCH_HOST_FIELD,
-                "offset_field": ELASTICSEARCH_OFFSET_FIELD,
-            },
-        }
-
-        DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
     elif OPENSEARCH_HOST:
         OPENSEARCH_END_OF_LOG_MARK: str = 
conf.get_mandatory_value("opensearch", "END_OF_LOG_MARK")
         OPENSEARCH_PORT: str = conf.get_mandatory_value("opensearch", "PORT")
diff --git a/pyproject.toml b/pyproject.toml
index 91bf374862c..e4757bca658 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -208,7 +208,7 @@ packages = []
     "apache-airflow-providers-edge3>=1.0.0"
 ]
 "elasticsearch" = [
-    "apache-airflow-providers-elasticsearch>=5.5.2"
+    "apache-airflow-providers-elasticsearch>=6.5.0" # Set from 
MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
 ]
 "exasol" = [
     "apache-airflow-providers-exasol>=4.6.1"
@@ -428,7 +428,7 @@ packages = []
     "apache-airflow-providers-discord>=3.9.0",
     "apache-airflow-providers-docker>=3.14.1",
     "apache-airflow-providers-edge3>=1.0.0",
-    "apache-airflow-providers-elasticsearch>=5.5.2",
+    "apache-airflow-providers-elasticsearch>=6.5.0", # Set from 
MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
     "apache-airflow-providers-exasol>=4.6.1",
     "apache-airflow-providers-fab>=2.2.0; python_version !=\"3.13\"", # Set 
from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
     "apache-airflow-providers-facebook>=3.7.0",
diff --git a/scripts/ci/prek/update_airflow_pyproject_toml.py 
b/scripts/ci/prek/update_airflow_pyproject_toml.py
index 3c7a9acae57..924b183f2e9 100755
--- a/scripts/ci/prek/update_airflow_pyproject_toml.py
+++ b/scripts/ci/prek/update_airflow_pyproject_toml.py
@@ -79,6 +79,7 @@ MIN_VERSION_OVERRIDE: dict[str, Version] = {
     "openlineage": parse_version("2.3.0"),
     "git": parse_version("0.0.2"),
     "common.messaging": parse_version("2.0.0"),
+    "elasticsearch": parse_version("6.5.0"),
 }
 
 

Reply via email to