This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8388bb8f89e Deprecate implicit REMOTE_TASK_LOG registration in
OpensearchTaskHandler (#67106)
8388bb8f89e is described below
commit 8388bb8f89ee7cc4b7f4121e3cd0d8b1fb07d29e
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Fri May 22 16:44:48 2026 +0800
Deprecate implicit REMOTE_TASK_LOG registration in OpensearchTaskHandler
(#67106)
``OpensearchTaskHandler.__init__`` papered over a missing ``REMOTE_TASK_LOG``
by self-registering as the remote task-log reader during ``dictConfig``. That
side-effect is now deprecated and will be removed in a future provider release.
- Emit ``AirflowProviderDeprecationWarning`` (``stacklevel=1`` so module-based
filters match through ``dictConfig``) when the handler falls back to the
implicit registration path.
- Update the OpenSearch logging guide to set
``REMOTE_TASK_LOG = OpensearchRemoteLogIO(...)`` at module scope in the
``[logging] logging_config_class`` module.
- Add a provider-local ``conftest.py`` that suppresses the new deprecation
warning during direct handler construction in unit tests (mirroring the
recommended user pattern).
---
providers/opensearch/docs/changelog.rst | 8 ++++
providers/opensearch/docs/logging/index.rst | 36 +++++++++++++--
.../providers/opensearch/log/os_task_handler.py | 25 +++++++++++
.../tests/unit/opensearch/log/conftest.py | 52 ++++++++++++++++++++++
4 files changed, 118 insertions(+), 3 deletions(-)
diff --git a/providers/opensearch/docs/changelog.rst
b/providers/opensearch/docs/changelog.rst
index 7b19ace5548..c9cd7a28bc5 100644
--- a/providers/opensearch/docs/changelog.rst
+++ b/providers/opensearch/docs/changelog.rst
@@ -27,6 +27,14 @@
Changelog
---------
+``OpensearchTaskHandler`` no longer silently registers itself as the remote
+task-log reader during ``dictConfig``. The implicit registration still happens
+for one more release but now emits an ``AirflowProviderDeprecationWarning`` and
+will be removed in a future provider release. If you ship a custom
+``[logging] logging_config_class`` module that swaps in
``OpensearchTaskHandler``,
+set ``REMOTE_TASK_LOG = OpensearchRemoteLogIO(...)`` at module scope in that
+module. See the OpenSearch provider's logging guide for the updated override
+example.
1.9.2
.....
diff --git a/providers/opensearch/docs/logging/index.rst
b/providers/opensearch/docs/logging/index.rst
index 34f2dc359b9..43f84921711 100644
--- a/providers/opensearch/docs/logging/index.rst
+++ b/providers/opensearch/docs/logging/index.rst
@@ -88,9 +88,7 @@ The wiring that registers ``OpensearchTaskHandler`` inside
the stock
``airflow_local_settings.py`` (the file that builds
``DEFAULT_LOGGING_CONFIG``) only landed
in Airflow **3.2.1**. On Airflow **3.0.0 – 3.2.0** installing the provider is
not enough:
to make the UI's log viewer fetch logs from OpenSearch you must ship a custom
logging
-config that swaps the ``task`` handler. The handler self-registers as the
remote-log
-reader on construction (via ``REMOTE_TASK_LOG`` on 3.0/3.1 and
``_ActiveLoggingConfig``
-on 3.2), so swapping the handler class is the only change required.
+config that swaps the ``task`` handler **and** sets ``REMOTE_TASK_LOG`` at
module scope.
Create a module on the Python path — for example
``config/airflow_local_settings.py`` —
and point Airflow at it via ``[logging] logging_config_class``:
@@ -102,9 +100,13 @@ and point Airflow at it via ``[logging]
logging_config_class``:
DEFAULT_LOGGING_CONFIG,
)
from airflow.providers.common.compat.sdk import conf
+ from airflow.providers.opensearch.log.os_task_handler import
OpensearchRemoteLogIO
OPENSEARCH_HOST = conf.get("opensearch", "host", fallback=None)
+ REMOTE_TASK_LOG = None
+ DEFAULT_REMOTE_CONN_ID = None
+
if OPENSEARCH_HOST:
DEFAULT_LOGGING_CONFIG["handlers"]["task"] = {
"class":
"airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler",
@@ -123,6 +125,25 @@ and point Airflow at it via ``[logging]
logging_config_class``:
"write_to_opensearch": conf.getboolean("opensearch",
"write_to_os", fallback=False),
"target_index": conf.get("opensearch", "target_index",
fallback="airflow-logs"),
}
+ REMOTE_TASK_LOG = OpensearchRemoteLogIO(
+ host=OPENSEARCH_HOST,
+ port=conf.getint("opensearch", "port", fallback=9200),
+ username=conf.get("opensearch", "username"),
+ password=conf.get("opensearch", "password"),
+ target_index=conf.get("opensearch", "target_index",
fallback="airflow-logs"),
+ write_stdout=conf.getboolean("opensearch", "write_stdout"),
+ write_to_opensearch=conf.getboolean("opensearch", "write_to_os",
fallback=False),
+ offset_field=conf.get("opensearch", "offset_field",
fallback="offset"),
+ host_field=conf.get("opensearch", "host_field", fallback="host"),
+ base_log_folder=str(BASE_LOG_FOLDER),
+ delete_local_copy=conf.getboolean("logging", "delete_local_logs"),
+ json_format=conf.getboolean("opensearch", "json_format"),
+ log_id_template=conf.get(
+ "opensearch",
+ "log_id_template",
+
fallback="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
+ ),
+ )
Then, in ``airflow.cfg``:
@@ -132,6 +153,15 @@ Then, in ``airflow.cfg``:
remote_logging = True
logging_config_class = config.airflow_local_settings.DEFAULT_LOGGING_CONFIG
+.. note::
+
+ Earlier versions of this guide relied on ``OpensearchTaskHandler``
self-registering
+ ``REMOTE_TASK_LOG`` from inside ``__init__`` when ``dictConfig``
instantiated it.
+ That implicit registration is now deprecated
(``AirflowProviderDeprecationWarning``)
+ and will be removed in a future provider release; define
``REMOTE_TASK_LOG`` at
+ module scope as shown above. See :ref:`write-logs-advanced` for the full
+ ``logging_config_class`` contract.
+
On Airflow **3.2.1+** this override is unnecessary — the stock
``airflow_local_settings.py``
already contains an ``elif OPENSEARCH_HOST:`` branch, so configuring the
``[opensearch]``
section in ``airflow.cfg`` is sufficient.
diff --git
a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
index 45e1a3ef172..728c9c80986 100644
---
a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
+++
b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
@@ -23,6 +23,7 @@ import logging
import os
import sys
import time
+import warnings
from collections import defaultdict
from collections.abc import Callable
from datetime import datetime
@@ -38,6 +39,7 @@ from opensearchpy.exceptions import NotFoundError
from sqlalchemy import select
import airflow.logging_config as alc
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import DagRun
from airflow.providers.common.compat.module_loading import import_string
from airflow.providers.common.compat.sdk import AirflowException, conf
@@ -380,8 +382,31 @@ class OpensearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMixin)
from airflow.logging_config import _ActiveLoggingConfig,
get_remote_task_log
if get_remote_task_log() is None:
+ # stacklevel=1 keeps the warning attributed to this
airflow.providers
+ # module so module-based deprecation filters still match;
dictConfig
+ # is in stdlib and would otherwise hide the warning at
stacklevel=2.
+ warnings.warn(
+ "Implicit REMOTE_TASK_LOG registration by
OpensearchTaskHandler during "
+ "dictConfig is deprecated and will be removed in a
future provider "
+ "release. Set ``REMOTE_TASK_LOG =
OpensearchRemoteLogIO(...)`` at "
+ "module scope in your ``[logging]
logging_config_class`` module. See "
+ "the OpenSearch provider logging documentation for the
updated "
+ "override example.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=1,
+ )
_ActiveLoggingConfig.set(self.io, None)
elif alc.REMOTE_TASK_LOG is None: # type: ignore[attr-defined]
+ warnings.warn(
+ "Implicit REMOTE_TASK_LOG registration by
OpensearchTaskHandler during "
+ "dictConfig is deprecated and will be removed in a future
provider "
+ "release. Set ``REMOTE_TASK_LOG =
OpensearchRemoteLogIO(...)`` at "
+ "module scope in your ``[logging] logging_config_class``
module. See "
+ "the OpenSearch provider logging documentation for the
updated "
+ "override example.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=1,
+ )
alc.REMOTE_TASK_LOG = self.io # type: ignore[attr-defined]
def set_context(self, ti: TaskInstance, *, identifier: str | None = None)
-> None:
diff --git a/providers/opensearch/tests/unit/opensearch/log/conftest.py
b/providers/opensearch/tests/unit/opensearch/log/conftest.py
new file mode 100644
index 00000000000..5f68f22355e
--- /dev/null
+++ b/providers/opensearch/tests/unit/opensearch/log/conftest.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_2_PLUS
+
+
[email protected](autouse=True)
+def _no_implicit_remote_task_log_warning(monkeypatch):
+ """
+ Suppress the deprecated implicit-registration warning during direct
handler construction.
+
+ Mirrors the recommended user pattern of defining ``REMOTE_TASK_LOG`` at
module
+ scope in the ``[logging] logging_config_class`` module, so
+ ``OpensearchTaskHandler.__init__`` does not emit
+ ``AirflowProviderDeprecationWarning`` (which the forbidden-warnings plugin
+ treats as a test failure).
+
+ On Airflow 2.x the handler does not run the registration branch at all, so
the
+ fixture is a no-op there (and ``_ActiveLoggingConfig`` does not exist).
+ On Airflow 3.0/3.1 the handler reads
``airflow.logging_config.REMOTE_TASK_LOG``.
+ On Airflow 3.2+ the handler reads ``_ActiveLoggingConfig.remote_task_log``.
+ """
+ if not AIRFLOW_V_3_0_PLUS:
+ return
+ if AIRFLOW_V_3_2_PLUS:
+ from airflow.logging_config import _ActiveLoggingConfig
+
+ # raising=False is required because remote_task_log is annotation-only
at
+ # class scope and may not be initialized in isolated test runs.
+ monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded",
True, raising=False)
+ monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(),
raising=False)
+ else:
+ import airflow.logging_config
+
+ monkeypatch.setattr(airflow.logging_config, "REMOTE_TASK_LOG",
object(), raising=False)