This is an automated email from the ASF dual-hosted git repository.

jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d0f981c2ffb Deprecate implicit REMOTE_TASK_LOG registration in ElasticsearchTaskHandler (#67105)
d0f981c2ffb is described below

commit d0f981c2ffb7cb0177096582ac4625f84fa1e6b9
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Sat May 23 10:28:47 2026 +0800

    Deprecate implicit REMOTE_TASK_LOG registration in ElasticsearchTaskHandler (#67105)
    
    ``ElasticsearchTaskHandler.__init__`` papered over a missing
    ``REMOTE_TASK_LOG`` by self-registering as the remote task-log reader during
    ``dictConfig``. That side-effect is now deprecated and will be removed in a
    future provider release.
    
    - Emit ``AirflowProviderDeprecationWarning`` (``stacklevel=1`` so module-based
      filters match through ``dictConfig``) when the handler falls back to the
      implicit registration path.
    - Add a logging guide under ``providers/elasticsearch/docs/logging/`` that
      documents the recommended override: set
      ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)`` at module scope in your
      ``[logging] logging_config_class`` module.
    - Add a provider-local ``conftest.py`` that suppresses the new deprecation
      warning during direct handler construction in unit tests (mirroring the
      recommended user pattern).
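The ``stacklevel=1`` reasoning above can be checked with a small, self-contained Python sketch that does not touch Airflow. The two in-memory modules are hypothetical: ``example_provider.log_handler`` stands in for the provider module, and ``example_stdlib.config`` stands in for a stdlib caller such as ``logging.config.dictConfig``. The helpers ``make_module`` and ``warnings_seen`` are also illustrative. The sketch relies on CPython's ``warnings`` behaviour: a warning is attributed to the ``__name__`` of the frame that ``stacklevel`` selects, and a ``module=`` filter is a regex matched against that name.

```python
import types
import warnings


def make_module(name: str, source: str) -> types.ModuleType:
    """Create an in-memory module so warnings see a distinct ``__name__``."""
    module = types.ModuleType(name)
    exec(compile(source, f"<{name}>", "exec"), module.__dict__)
    return module


# Hypothetical stand-in for the provider module that emits the warning.
provider = make_module(
    "example_provider.log_handler",
    """
import warnings

def register(stacklevel):
    warnings.warn("implicit registration is deprecated", DeprecationWarning, stacklevel=stacklevel)
""",
)

# Hypothetical stand-in for a stdlib caller such as logging.config.dictConfig.
configurator = make_module(
    "example_stdlib.config",
    """
def dict_config(factory, stacklevel):
    factory(stacklevel)
""",
)


def warnings_seen(stacklevel: int) -> int:
    """Count warnings that survive a filter scoped to the provider's module."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("ignore")  # drop everything by default...
        # ...except warnings attributed to the provider (inserted last, so checked first).
        warnings.filterwarnings("always", category=DeprecationWarning, module=r"example_provider")
        configurator.dict_config(provider.register, stacklevel)
    return len(caught)


print(warnings_seen(1))  # 1: attributed to example_provider.log_handler
print(warnings_seen(2))  # 0: attributed to example_stdlib.config, so the module filter misses
```

With ``stacklevel=2`` the warning is attributed to the caller's module, so a filter keyed on the provider package no longer matches and the catch-all ``ignore`` wins. That is the situation the commit avoids for a warning raised from inside ``dictConfig``.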
---
 providers/elasticsearch/docs/changelog.rst         |  8 ++
 providers/elasticsearch/docs/logging/index.rst     | 93 ++++++++++++++++++++++
 .../providers/elasticsearch/log/es_task_handler.py | 25 ++++++
 .../tests/unit/elasticsearch/log/conftest.py       | 52 ++++++++++++
 4 files changed, 178 insertions(+)

diff --git a/providers/elasticsearch/docs/changelog.rst b/providers/elasticsearch/docs/changelog.rst
index 01b919c3b09..6ee0bfccc12 100644
--- a/providers/elasticsearch/docs/changelog.rst
+++ b/providers/elasticsearch/docs/changelog.rst
@@ -27,6 +27,14 @@
 Changelog
 ---------
 
+``ElasticsearchTaskHandler`` no longer silently registers itself as the remote
+task-log reader during ``dictConfig``. The implicit registration still happens
+for one more release but now emits an ``AirflowProviderDeprecationWarning`` and
+will be removed in a future provider release. If you ship a custom
+``[logging] logging_config_class`` module that swaps in
+``ElasticsearchTaskHandler``, set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)``
+at module scope in that module.
+
 6.5.4
 .....
 
diff --git a/providers/elasticsearch/docs/logging/index.rst b/providers/elasticsearch/docs/logging/index.rst
index 5cfae8d6230..df2a6e6be71 100644
--- a/providers/elasticsearch/docs/logging/index.rst
+++ b/providers/elasticsearch/docs/logging/index.rst
@@ -64,6 +64,99 @@ To output task logs to ElasticSearch, the following config could be used: (set `
     write_to_es = True
     target_index = [name of the index to store logs]
 
+.. _elasticsearch-airflow-3-0-to-3-1-local-settings:
+
+Enabling the Elasticsearch task handler on Airflow 3.0.0 – 3.1.7
+''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''
+
+This section is **only about reading task logs back into the Airflow UI**. Tasks running
+on workers will write logs as usual (to local files, stdout, or — with appropriate log
+shipping — to Elasticsearch) regardless of the override below. Without the override on
+Airflow 3.0.0 – 3.1.7, logs reach Elasticsearch fine but the **UI cannot render them**
+because no handler is registered to fetch them back.
+
+The wiring that registers ``ElasticsearchTaskHandler`` inside the stock
+``airflow_local_settings.py`` (the file that builds ``DEFAULT_LOGGING_CONFIG``) landed in
+Airflow **3.2.0** (`apache/airflow#62121
+<https://github.com/apache/airflow/pull/62121>`_) and was backported to Airflow **3.1.8**
+(`apache/airflow#62940 <https://github.com/apache/airflow/pull/62940>`_). On Airflow
+**3.0.0 – 3.1.7** installing the provider is not enough: to make the UI's log viewer
+fetch logs from Elasticsearch you must ship a custom logging config that swaps the
+``task`` handler **and** sets ``REMOTE_TASK_LOG`` at module scope. The override requires
+``apache-airflow-providers-elasticsearch`` **6.5.0+** (`apache/airflow#53821
+<https://github.com/apache/airflow/pull/53821>`_), which is where
+``ElasticsearchRemoteLogIO`` was introduced.
+
+Create a module on the Python path — for example ``config/airflow_local_settings.py`` —
+and point Airflow at it via ``[logging] logging_config_class``:
+
+.. code-block:: python
+
+    from airflow.config_templates.airflow_local_settings import (
+        BASE_LOG_FOLDER,
+        DEFAULT_LOGGING_CONFIG,
+    )
+    from airflow.providers.common.compat.sdk import conf
+    from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO
+
+    ELASTICSEARCH_HOST = conf.get("elasticsearch", "host", fallback=None)
+
+    REMOTE_TASK_LOG = None
+    DEFAULT_REMOTE_CONN_ID = None
+
+    if ELASTICSEARCH_HOST:
+        DEFAULT_LOGGING_CONFIG["handlers"]["task"] = {
+            "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
+            "formatter": "airflow",
+            "base_log_folder": str(BASE_LOG_FOLDER),
+            "end_of_log_mark": conf.get("elasticsearch", "end_of_log_mark", fallback="end_of_log"),
+            "host": ELASTICSEARCH_HOST,
+            "frontend": conf.get("elasticsearch", "frontend", fallback=""),
+            "write_stdout": conf.getboolean("elasticsearch", "write_stdout"),
+            "write_to_es": conf.getboolean("elasticsearch", "write_to_es", fallback=False),
+            "json_format": conf.getboolean("elasticsearch", "json_format"),
+            "json_fields": conf.get("elasticsearch", "json_fields"),
+            "host_field": conf.get("elasticsearch", "host_field", fallback="host"),
+            "offset_field": conf.get("elasticsearch", "offset_field", fallback="offset"),
+        }
+        REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(
+            host=ELASTICSEARCH_HOST,
+            target_index=conf.get("elasticsearch", "target_index", fallback="airflow-logs"),
+            write_stdout=conf.getboolean("elasticsearch", "write_stdout"),
+            write_to_es=conf.getboolean("elasticsearch", "write_to_es", fallback=False),
+            offset_field=conf.get("elasticsearch", "offset_field", fallback="offset"),
+            host_field=conf.get("elasticsearch", "host_field", fallback="host"),
+            base_log_folder=str(BASE_LOG_FOLDER),
+            delete_local_copy=conf.getboolean("logging", "delete_local_logs"),
+            json_format=conf.getboolean("elasticsearch", "json_format"),
+            log_id_template=conf.get(
+                "elasticsearch",
+                "log_id_template",
+                fallback="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
+            ),
+        )
+
+Then, in ``airflow.cfg``:
+
+.. code-block:: ini
+
+    [logging]
+    remote_logging = True
+    logging_config_class = config.airflow_local_settings.DEFAULT_LOGGING_CONFIG
+
+.. note::
+
+   Earlier versions of this guide relied on ``ElasticsearchTaskHandler`` self-registering
+   ``REMOTE_TASK_LOG`` from inside ``__init__`` when ``dictConfig`` instantiated it.
+   That implicit registration is now deprecated (``AirflowProviderDeprecationWarning``)
+   and will be removed in a future provider release; define ``REMOTE_TASK_LOG`` at
+   module scope as shown above. See :ref:`write-logs-advanced` for the full
+   ``logging_config_class`` contract.
+
+On Airflow **3.1.8+** or **3.2.0+** this override is unnecessary — the stock
+``airflow_local_settings.py`` already contains an ``elif ELASTICSEARCH_HOST:`` branch, so
+configuring the ``[elasticsearch]`` section in ``airflow.cfg`` is sufficient.
+
 .. _write-logs-elasticsearch-tls:
 
 Writing logs to Elasticsearch over TLS
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
index 402d64e5bf1..8aad101b44d 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -25,6 +25,7 @@ import os
 import shutil
 import sys
 import time
+import warnings
 from collections import defaultdict
 from collections.abc import Callable, Iterable
 from operator import attrgetter
@@ -41,6 +42,7 @@ from elasticsearch import helpers
 from elasticsearch.exceptions import NotFoundError
 
 import airflow.logging_config as alc
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.models.dagrun import DagRun
 from airflow.providers.common.compat.sdk import conf
 from airflow.providers.elasticsearch._compat import apply_compat_with
@@ -318,8 +320,31 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
                 from airflow.logging_config import _ActiveLoggingConfig, get_remote_task_log
 
                 if get_remote_task_log() is None:
+                    # stacklevel=1 keeps the warning attributed to this airflow.providers
+                    # module so module-based deprecation filters still match; dictConfig
+                    # is in stdlib and would otherwise hide the warning at stacklevel=2.
+                    warnings.warn(
+                        "Implicit REMOTE_TASK_LOG registration by ElasticsearchTaskHandler "
+                        "during dictConfig is deprecated and will be removed in a future "
+                        "provider release. Set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)`` "
+                        "at module scope in your ``[logging] logging_config_class`` module. "
+                        "See the Elasticsearch provider logging documentation for the "
+                        "updated override example.",
+                        AirflowProviderDeprecationWarning,
+                        stacklevel=1,
+                    )
                     _ActiveLoggingConfig.set(self.io, None)
             elif alc.REMOTE_TASK_LOG is None:  # type: ignore[attr-defined]
+                warnings.warn(
+                    "Implicit REMOTE_TASK_LOG registration by ElasticsearchTaskHandler "
+                    "during dictConfig is deprecated and will be removed in a future "
+                    "provider release. Set ``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)`` "
+                    "at module scope in your ``[logging] logging_config_class`` module. "
+                    "See the Elasticsearch provider logging documentation for the "
+                    "updated override example.",
+                    AirflowProviderDeprecationWarning,
+                    stacklevel=1,
+                )
                 alc.REMOTE_TASK_LOG = self.io  # type: ignore[attr-defined]
 
     @staticmethod
diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py b/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py
new file mode 100644
index 00000000000..a1746818eed
--- /dev/null
+++ b/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
+
+
+@pytest.fixture(autouse=True)
+def _no_implicit_remote_task_log_warning(monkeypatch):
+    """
+    Suppress the deprecated implicit-registration warning during direct handler construction.
+
+    Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at module
+    scope in the ``[logging] logging_config_class`` module, so
+    ``ElasticsearchTaskHandler.__init__`` does not emit
+    ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin
+    treats as a test failure).
+
+    On Airflow 2.x the handler does not run the registration branch at all, so the
+    fixture is a no-op there (and ``_ActiveLoggingConfig`` does not exist).
+    On Airflow 3.0/3.1 the handler reads ``airflow.logging_config.REMOTE_TASK_LOG``.
+    On Airflow 3.2+ the handler reads ``_ActiveLoggingConfig.remote_task_log``.
+    """
+    if not AIRFLOW_V_3_0_PLUS:
+        return
+    if AIRFLOW_V_3_2_PLUS:
+        from airflow.logging_config import _ActiveLoggingConfig
+
+        # raising=False is required because remote_task_log is annotation-only at
+        # class scope and may not be initialized in isolated test runs.
+        monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False)
+        monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False)
+    else:
+        import airflow.logging_config
+
+        monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG", object(), raising=False)
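The fallback in ``ElasticsearchTaskHandler.__init__`` and the conftest fixture that pre-populates ``REMOTE_TASK_LOG`` can be modelled in isolation. The sketch below is hypothetical and mirrors only the Airflow 3.0/3.1 branch, where the registration lives in a module attribute on ``airflow.logging_config``. ``SketchTaskHandler``, ``ProviderDeprecationWarning``, ``construct`` and the ``SimpleNamespace`` standing in for ``airflow.logging_config`` are illustrative names, not provider APIs.

```python
import types
import warnings


class ProviderDeprecationWarning(DeprecationWarning):
    """Hypothetical stand-in for AirflowProviderDeprecationWarning."""


# Hypothetical stand-in for the airflow.logging_config module (Airflow 3.0/3.1 path).
logging_config = types.SimpleNamespace(REMOTE_TASK_LOG=None)


class SketchTaskHandler:
    """Hypothetical handler mimicking the deprecated fallback in __init__."""

    def __init__(self) -> None:
        self.io = object()  # stands in for the handler's ElasticsearchRemoteLogIO
        if logging_config.REMOTE_TASK_LOG is None:
            # Deprecated path: warn first, then self-register as the remote log reader.
            warnings.warn(
                "Implicit REMOTE_TASK_LOG registration is deprecated",
                ProviderDeprecationWarning,
                stacklevel=1,
            )
            logging_config.REMOTE_TASK_LOG = self.io


def construct(preset):
    """Preset REMOTE_TASK_LOG, build a handler, and report what happened."""
    logging_config.REMOTE_TASK_LOG = preset
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        handler = SketchTaskHandler()
    return [w.category for w in caught], logging_config.REMOTE_TASK_LOG, handler


# No module-scope REMOTE_TASK_LOG: the handler warns and registers itself.
categories, registered, handler = construct(None)
assert categories == [ProviderDeprecationWarning]
assert registered is handler.io

# REMOTE_TASK_LOG already set (the fixture uses object()): no warning, no overwrite.
sentinel = object()
categories, registered, _ = construct(sentinel)
assert categories == []
assert registered is sentinel
```

Presetting the attribute to any non-``None`` object, as the fixture does with ``object()``, is enough to skip the deprecated branch. This is the same effect the recommended module-scope ``REMOTE_TASK_LOG`` is meant to have for users.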
