This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d0f981c2ffb Deprecate implicit REMOTE_TASK_LOG registration in
ElasticsearchTaskHandler (#67105)
d0f981c2ffb is described below
commit d0f981c2ffb7cb0177096582ac4625f84fa1e6b9
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Sat May 23 10:28:47 2026 +0800
Deprecate implicit REMOTE_TASK_LOG registration in ElasticsearchTaskHandler
(#67105)
``ElasticsearchTaskHandler.__init__`` papered over a missing
``REMOTE_TASK_LOG`` by self-registering as the remote task-log reader during
``dictConfig``. That side-effect is now deprecated and will be removed in a
future provider release.
- Emit ``AirflowProviderDeprecationWarning`` (``stacklevel=1`` so
module-based
filters match through ``dictConfig``) when the handler falls back to the
implicit registration path.
- Extend the logging guide in ``providers/elasticsearch/docs/logging/index.rst``
to document the recommended override: set
``REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(...)`` at module scope in
your ``[logging] logging_config_class`` module.
- Add a provider-local ``conftest.py`` that pre-populates ``REMOTE_TASK_LOG``
so the new deprecation warning is not emitted during direct handler
construction in unit tests (mirroring the recommended user pattern).
---
providers/elasticsearch/docs/changelog.rst | 8 ++
providers/elasticsearch/docs/logging/index.rst | 93 ++++++++++++++++++++++
.../providers/elasticsearch/log/es_task_handler.py | 25 ++++++
.../tests/unit/elasticsearch/log/conftest.py | 52 ++++++++++++
4 files changed, 178 insertions(+)
diff --git a/providers/elasticsearch/docs/changelog.rst
b/providers/elasticsearch/docs/changelog.rst
index 01b919c3b09..6ee0bfccc12 100644
--- a/providers/elasticsearch/docs/changelog.rst
+++ b/providers/elasticsearch/docs/changelog.rst
@@ -27,6 +27,14 @@
Changelog
---------
+``ElasticsearchTaskHandler`` no longer silently registers itself as the remote
+task-log reader during ``dictConfig``. The implicit registration still happens
+for now, but it emits an ``AirflowProviderDeprecationWarning`` and
+will be removed in a future provider release. If you ship a custom
+``[logging] logging_config_class`` module that swaps in
+``ElasticsearchTaskHandler``, set ``REMOTE_TASK_LOG =
ElasticsearchRemoteLogIO(...)``
+at module scope in that module.
+
6.5.4
.....
diff --git a/providers/elasticsearch/docs/logging/index.rst
b/providers/elasticsearch/docs/logging/index.rst
index 5cfae8d6230..df2a6e6be71 100644
--- a/providers/elasticsearch/docs/logging/index.rst
+++ b/providers/elasticsearch/docs/logging/index.rst
@@ -64,6 +64,99 @@ To output task logs to ElasticSearch, the following config
could be used: (set `
write_to_es = True
target_index = [name of the index to store logs]
+.. _elasticsearch-airflow-3-0-to-3-1-local-settings:
+
+Enabling the Elasticsearch task handler on Airflow 3.0.0 – 3.1.7
+''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''
+
+This section is **only about reading task logs back into the Airflow UI**.
Tasks running
+on workers will write logs as usual (to local files, stdout, or — with
appropriate log
+shipping — to Elasticsearch) regardless of the override below. Without the
override on
+Airflow 3.0.0 – 3.1.7, logs can still reach Elasticsearch, but the **UI cannot
render them**
+because no handler is registered to fetch them back.
+
+The wiring that registers ``ElasticsearchTaskHandler`` inside the stock
+``airflow_local_settings.py`` (the file that builds
``DEFAULT_LOGGING_CONFIG``) landed in
+Airflow **3.2.0** (`apache/airflow#62121
+<https://github.com/apache/airflow/pull/62121>`_) and was backported to
Airflow **3.1.8**
+(`apache/airflow#62940 <https://github.com/apache/airflow/pull/62940>`_). On
Airflow
+**3.0.0 – 3.1.7** installing the provider is not enough: to make the UI's log
viewer
+fetch logs from Elasticsearch you must ship a custom logging config that swaps
the
+``task`` handler **and** sets ``REMOTE_TASK_LOG`` at module scope. The
override requires
+``apache-airflow-providers-elasticsearch`` **6.5.0+** (`apache/airflow#53821
+<https://github.com/apache/airflow/pull/53821>`_), which is where
+``ElasticsearchRemoteLogIO`` was introduced.
+
+Create a module on the Python path — for example
``config/airflow_local_settings.py`` —
+and point Airflow at it via ``[logging] logging_config_class``:
+
+.. code-block:: python
+
+ from airflow.config_templates.airflow_local_settings import (
+ BASE_LOG_FOLDER,
+ DEFAULT_LOGGING_CONFIG,
+ )
+ from airflow.providers.common.compat.sdk import conf
+ from airflow.providers.elasticsearch.log.es_task_handler import
ElasticsearchRemoteLogIO
+
+ ELASTICSEARCH_HOST = conf.get("elasticsearch", "host", fallback=None)
+
+ REMOTE_TASK_LOG = None
+ DEFAULT_REMOTE_CONN_ID = None
+
+ if ELASTICSEARCH_HOST:
+ DEFAULT_LOGGING_CONFIG["handlers"]["task"] = {
+ "class":
"airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
+ "formatter": "airflow",
+ "base_log_folder": str(BASE_LOG_FOLDER),
+ "end_of_log_mark": conf.get("elasticsearch", "end_of_log_mark",
fallback="end_of_log"),
+ "host": ELASTICSEARCH_HOST,
+ "frontend": conf.get("elasticsearch", "frontend", fallback=""),
+ "write_stdout": conf.getboolean("elasticsearch", "write_stdout"),
+ "write_to_es": conf.getboolean("elasticsearch", "write_to_es",
fallback=False),
+ "json_format": conf.getboolean("elasticsearch", "json_format"),
+ "json_fields": conf.get("elasticsearch", "json_fields"),
+ "host_field": conf.get("elasticsearch", "host_field",
fallback="host"),
+ "offset_field": conf.get("elasticsearch", "offset_field",
fallback="offset"),
+ }
+ REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(
+ host=ELASTICSEARCH_HOST,
+ target_index=conf.get("elasticsearch", "target_index",
fallback="airflow-logs"),
+ write_stdout=conf.getboolean("elasticsearch", "write_stdout"),
+ write_to_es=conf.getboolean("elasticsearch", "write_to_es",
fallback=False),
+ offset_field=conf.get("elasticsearch", "offset_field",
fallback="offset"),
+ host_field=conf.get("elasticsearch", "host_field",
fallback="host"),
+ base_log_folder=str(BASE_LOG_FOLDER),
+ delete_local_copy=conf.getboolean("logging", "delete_local_logs"),
+ json_format=conf.getboolean("elasticsearch", "json_format"),
+ log_id_template=conf.get(
+ "elasticsearch",
+ "log_id_template",
+
fallback="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
+ ),
+ )
+
+Then, in ``airflow.cfg``:
+
+.. code-block:: ini
+
+ [logging]
+ remote_logging = True
+ logging_config_class = config.airflow_local_settings.DEFAULT_LOGGING_CONFIG
+
+.. note::
+
+ Earlier versions of this guide relied on ``ElasticsearchTaskHandler``
self-registering
+ ``REMOTE_TASK_LOG`` from inside ``__init__`` when ``dictConfig``
instantiated it.
+ That implicit registration is now deprecated
(``AirflowProviderDeprecationWarning``)
+ and will be removed in a future provider release; define
``REMOTE_TASK_LOG`` at
+ module scope as shown above. See :ref:`write-logs-advanced` for the full
+ ``logging_config_class`` contract.
+
+On Airflow **3.1.8** and later this override is unnecessary — the stock
+``airflow_local_settings.py`` already contains an ``elif ELASTICSEARCH_HOST:``
branch, so
+configuring the ``[elasticsearch]`` section in ``airflow.cfg`` is sufficient.
+
.. _write-logs-elasticsearch-tls:
Writing logs to Elasticsearch over TLS
diff --git
a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
index 402d64e5bf1..8aad101b44d 100644
---
a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
+++
b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -25,6 +25,7 @@ import os
import shutil
import sys
import time
+import warnings
from collections import defaultdict
from collections.abc import Callable, Iterable
from operator import attrgetter
@@ -41,6 +42,7 @@ from elasticsearch import helpers
from elasticsearch.exceptions import NotFoundError
import airflow.logging_config as alc
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models.dagrun import DagRun
from airflow.providers.common.compat.sdk import conf
from airflow.providers.elasticsearch._compat import apply_compat_with
@@ -318,8 +320,31 @@ class ElasticsearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMix
from airflow.logging_config import _ActiveLoggingConfig,
get_remote_task_log
if get_remote_task_log() is None:
+ # stacklevel=1 keeps the warning attributed to this
airflow.providers
+ # module so module-based deprecation filters still match;
dictConfig
+ # is in stdlib and would otherwise hide the warning at
stacklevel=2.
+ warnings.warn(
+ "Implicit REMOTE_TASK_LOG registration by
ElasticsearchTaskHandler "
+ "during dictConfig is deprecated and will be removed
in a future "
+ "provider release. Set ``REMOTE_TASK_LOG =
ElasticsearchRemoteLogIO(...)`` "
+ "at module scope in your ``[logging]
logging_config_class`` module. "
+ "See the Elasticsearch provider logging documentation
for the "
+ "updated override example.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=1,
+ )
_ActiveLoggingConfig.set(self.io, None)
elif alc.REMOTE_TASK_LOG is None: # type: ignore[attr-defined]
+ warnings.warn(
+ "Implicit REMOTE_TASK_LOG registration by
ElasticsearchTaskHandler "
+ "during dictConfig is deprecated and will be removed in a
future "
+ "provider release. Set ``REMOTE_TASK_LOG =
ElasticsearchRemoteLogIO(...)`` "
+ "at module scope in your ``[logging]
logging_config_class`` module. "
+ "See the Elasticsearch provider logging documentation for
the "
+ "updated override example.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=1,
+ )
alc.REMOTE_TASK_LOG = self.io # type: ignore[attr-defined]
@staticmethod
diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py
b/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py
new file mode 100644
index 00000000000..a1746818eed
--- /dev/null
+++ b/providers/elasticsearch/tests/unit/elasticsearch/log/conftest.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_2_PLUS
+
+
[email protected](autouse=True)
+def _no_implicit_remote_task_log_warning(monkeypatch):
+ """
+ Suppress the deprecated implicit-registration warning during direct
handler construction.
+
+ Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at
module
+ scope in the ``[logging] logging_config_class`` module, so
+ ``ElasticsearchTaskHandler.__init__`` does not emit
+ ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin
+ treats as a test failure).
+
+ On Airflow 2.x the handler does not run the registration branch at all, so
the
+ fixture is a no-op there (and ``_ActiveLoggingConfig`` does not exist).
+ On Airflow 3.0/3.1 the handler reads
``airflow.logging_config.REMOTE_TASK_LOG``.
+ On Airflow 3.2+ the handler reads ``_ActiveLoggingConfig.remote_task_log``.
+ """
+ if not AIRFLOW_V_3_0_PLUS:
+ return
+ if AIRFLOW_V_3_2_PLUS:
+ from airflow.logging_config import _ActiveLoggingConfig
+
+ # raising=False is required because remote_task_log is annotation-only
at
+ # class scope and may not be initialized in isolated test runs.
+ monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded",
True, raising=False)
+ monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(),
raising=False)
+ else:
+ import airflow.logging_config
+
+ monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG",
object(), raising=False)