This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fdbb9b01a4f fix: pin compatible-with at the transport layer to keep ES
8 servers working (#66065)
fdbb9b01a4f is described below
commit fdbb9b01a4fff447ae558e04355f4ebba5d3ef99
Author: Peter <[email protected]>
AuthorDate: Tue May 19 19:21:12 2026 +0900
fix: pin compatible-with at the transport layer to keep ES 8 servers
working (#66065)
* fix(elasticsearch): pin compatible-with at the transport layer to keep ES
8 servers working
Since #64070 the provider depends on elasticsearch>=8.10,<10. A default
install resolves to an elasticsearch>=9 Python client, which always
negotiates 'compatible-with=9' on every request. Elasticsearch 8.x
servers reject that with HTTP 400 media_type_header_exception, breaking
remote task log ingestion and both ElasticsearchSQLHook and
ElasticsearchPythonHook against ES 8 clusters.
Add a [elasticsearch] es_compat_with config option that, when set to a
major version string ('7'/'8'/'9'), wraps the client's transport
perform_request so every outbound request carries
'Accept: application/vnd.elasticsearch+json; compatible-with=<major>'
(and the matching '+x-ndjson' form for bulk so streaming bodies still
parse). The wrap is applied at every Elasticsearch client construction
site in the provider:
- ElasticsearchTaskHandler (log/es_task_handler.py)
- ElasticsearchRemoteLogIO (log/es_task_handler.py)
- ESConnection (hooks/elasticsearch.py)
- ElasticsearchPythonHook (hooks/elasticsearch.py)
When the option is unset, behavior is unchanged.
Tests assert against what the transport actually sends, not the in-memory
state of the client object. Setting client._headers (which is what
client.options(headers=...) does) is not enough because elasticsearch-py
re-applies its own per-API-method content-negotiation headers right
before the request is sent — only the transport layer sees the final
headers.
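To illustrate why the wrap sits at the transport layer, here is a minimal
sketch using toy stand-ins. `ToyTransport` and `ToyClient` are hypothetical
and are not elasticsearch-py classes; the sketch only assumes, as described
above, a client that re-applies per-method headers just before sending:

```python
class ToyTransport:
    """Stand-in for the real transport; records the headers it receives."""

    def __init__(self):
        self.sent = []

    def perform_request(self, method, target, *, headers=None):
        self.sent.append(dict(headers or {}))
        return "ok"


class ToyClient:
    """Stand-in client that overrides its default headers per API method."""

    def __init__(self, transport, default_headers=None):
        self.transport = transport
        self._headers = dict(default_headers or {})

    def search(self):
        headers = dict(self._headers)
        # Per-method negotiation runs last and wins over the client defaults.
        headers["accept"] = "application/vnd.elasticsearch+json; compatible-with=9"
        return self.transport.perform_request("POST", "/_search", headers=headers)


pinned = "application/vnd.elasticsearch+json; compatible-with=8"
transport = ToyTransport()
client = ToyClient(transport, default_headers={"accept": pinned})

client.search()
# The client-level default never reaches the wire.
client_default_survived = transport.sent[-1]["accept"] == pinned

# A wrapper around the transport sees, and can rewrite, the final headers.
original = transport.perform_request


def pinning_perform_request(*args, **kwargs):
    headers = dict(kwargs.get("headers") or {})
    if "accept" in headers:
        headers["accept"] = headers["accept"].replace("compatible-with=9", "compatible-with=8")
    kwargs["headers"] = headers
    return original(*args, **kwargs)


transport.perform_request = pinning_perform_request
client.search()
wire_accept_after_wrap = transport.sent[-1]["accept"]
```

In this toy model the header set on the client is overwritten before the
request leaves, while the wrapped transport delivers the pinned value.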
Closes: https://github.com/apache/airflow/issues/66063
Supersedes: https://github.com/apache/airflow/pull/66064
* fixup(elasticsearch): tighten apply_compat_with — idempotent guard,
simpler ct check, correct test assertion
- idempotent guard: skip second wrap by checking transport.__dict__ for an
existing instance attribute. Repeated apply_compat_with calls (e.g.
hook reuse paths) are now true no-ops.
- content-type check simplified: '"ndjson" in ct' already matches both
'ndjson' and '+x-ndjson' so the redundant 'x-ndjson' branch is dropped.
- unset-case test was using 'transport.perform_request is original', which
would fail even when nothing was wrapped, because looking up a method on
an instance creates a fresh bound-method object every time. Switched to
inspecting transport.__dict__ for the 'perform_request' key, which
precisely tracks whether the helper installed an instance override.
- new test_apply_compat_with_is_idempotent asserts the guard above.
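The identity pitfall and the `__dict__` guard described above can be
reproduced without Elasticsearch. This is a minimal sketch; `ToyTransport`
and `wrap_once` are hypothetical names, not the provider's code:

```python
class ToyTransport:
    def perform_request(self):
        return "original"


def wrap_once(transport):
    """Install an instance-level override unless one is already present."""
    if "perform_request" in vars(transport):
        return False  # already wrapped on this instance: no-op
    original = transport.perform_request

    def perform_request():
        return "wrapped:" + original()

    setattr(transport, "perform_request", perform_request)
    return True


transport = ToyTransport()

# Each lookup of a method on an instance builds a new bound-method object,
# so an identity check fails even though nothing was wrapped.
first = transport.perform_request
second = transport.perform_request
same_object = first is second
equal_methods = first == second

# The instance __dict__ only gains the key once an override is installed.
has_override_before = "perform_request" in vars(transport)
wrapped_first_time = wrap_once(transport)
wrapped_second_time = wrap_once(transport)
result = transport.perform_request()
```

Checking the instance `__dict__` answers "was an override installed?"
directly, which is the question both the guard and the test care about.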
* fixup(elasticsearch): mirror upstream selective mimetype rewrite
The previous wrapper unconditionally overwrote the entire `Accept` header
to `application/vnd.elasticsearch+json; compatible-with=<major>` whenever
one was present. That is too aggressive: elasticsearch-py emits
non-JSON `Accept` values for several APIs that still need to flow through
the same transport. Notably:
- `client.cat.help()` sends `Accept: text/plain`.
- All other `client.cat.*` endpoints send `Accept:
text/plain,application/json`.
- Search-MVT endpoints send `Accept: application/vnd.mapbox-vector-tile`.
After the previous wrap every one of those calls went on the wire as
plain `application/vnd.elasticsearch+json; compatible-with=<N>`, silently
turning cat responses into JSON for any operator using
`ElasticsearchPythonHook.get_conn()` to call cat APIs.
Mirror upstream's own `mimetype_header_to_compat` instead: only
`application/(json|x-ndjson|vnd.mapbox-vector-tile)` parts of the header
get the `compatible-with=<configured>` suffix, anything else is left
verbatim. The regex also matches the already-rewritten
`application/vnd.elasticsearch+<x>; compatible-with=<N>` form that
elasticsearch-py 9.x ships before the transport sees the request, so the
configured major actually replaces the client default major on the wire
(verified with a Transport spy against elasticsearch-py 9.3.0).
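A small sketch of the selective rewrite, reusing the pattern added to
`_compat.py` in this commit. The `rewrite` helper name is hypothetical and
only exists for the illustration:

```python
import re

# Pattern copied from ``_compat.py`` in this commit.
COMPAT_MIMETYPE_RE = re.compile(
    r"application/(?:vnd\.elasticsearch\+)?(json|x-ndjson|vnd\.mapbox-vector-tile)"
    r"(?:\s*;\s*compatible-with=\d+)?"
)


def rewrite(header_value: str, major: str) -> str:
    """Pin ``compatible-with`` on the matching mimetype parts only."""
    sub = rf"application/vnd.elasticsearch+\g<1>; compatible-with={major}"
    return COMPAT_MIMETYPE_RE.sub(sub, header_value)


# text/plain does not match the pattern and is left verbatim.
plain = rewrite("text/plain", "8")
# Only the JSON half of a mixed Accept value is rewritten.
mixed = rewrite("text/plain,application/json", "8")
# An already-rewritten value has its major replaced, not duplicated.
already = rewrite("application/vnd.elasticsearch+x-ndjson; compatible-with=9", "8")
```

Because the optional `compatible-with` suffix is part of the match, the
client default major is replaced rather than appended to.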
Two adjacent hardenings while we are in here:
- Strip whitespace from the config value and reject anything that is not a
positive integer string with `AirflowConfigException` at construction
time, so a typo like `es_compat_with = 'v8'` fails fast in the worker
startup log instead of returning a 400 storm per request.
- Walk header keys case-insensitively, so a future `elastic_transport`
that forwards PascalCase `Accept` / `Content-Type` keys cannot silently
bypass the rewrite.
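Both hardenings can be sketched in isolation. The names `parse_major` and
`pin_headers` are hypothetical, `ValueError` stands in for
`AirflowConfigException`, and the header rewrite is deliberately simplified
to an existing `compatible-with` parameter:

```python
import re

MAJOR_RE = re.compile(r"^\d+$")


def parse_major(raw):
    """Return the stripped major string, ``None`` when unset, or raise."""
    compat = (raw or "").strip()
    if not compat:
        return None
    if not MAJOR_RE.match(compat):
        # The provider raises AirflowConfigException; ValueError is a stand-in.
        raise ValueError(f"es_compat_with must be an integer major version; got {raw!r}")
    return compat


def pin_headers(headers, major):
    """Rewrite Accept / Content-Type regardless of the case of the key."""
    merged = dict(headers)
    for key in list(merged):
        if key.lower() in ("accept", "content-type") and merged[key]:
            # Simplified: only touches an existing compatible-with parameter.
            merged[key] = re.sub(r"compatible-with=\d+", f"compatible-with={major}", merged[key])
    return merged


major = parse_major(" 8 ")
pinned = pin_headers(
    {
        "Accept": "application/vnd.elasticsearch+json; compatible-with=9",
        "X-Other": "compatible-with=9",
    },
    major,
)
```

Validating once at construction time means a bad value surfaces as a single
exception instead of one rejected request after another.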
Tests: add wire-level cases for cat APIs (`text/plain` preserved,
`text/plain,application/...` partial rewrite), PascalCase headers,
whitespace stripping, non-numeric major rejection, and a direct
`conf.get -> None` branch (the existing parametrize folds into the
provider yaml default `""` via `conf_vars`).
* fixup(elasticsearch): import AirflowConfigException via common.compat.sdk
The rest of this module already routes airflow imports through the
`common.compat.sdk` shim (`conf` lives there), and the shim explicitly
exports `AirflowConfigException` so the same provider build can target
both Airflow 2 (`airflow.exceptions`) and Airflow 3
(`airflow.sdk.exceptions`).
Switch the new exception import to the same shim so we don't pin to
`airflow.exceptions` and silently break the Airflow 3 import path.
* fixup(elasticsearch): satisfy CI lint, mypy and project-structure tests
CI on the latest `main` merge surfaced four failures, all mechanical and
fixed in this commit:
1. **Static checks** (ruff / ruff-format / autogen):
- `_compat.py` — D205 docstring rule wants the summary line on its own
line, both for the module docstring and for `apply_compat_with`'s
docstring. Reformatted both.
- `hooks/elasticsearch.py` — collapsed the multi-line
`apply_compat_with(Elasticsearch(...))` call into a single line
(now under the line-length cap thanks to the basic_auth tuple sitting
inside the existing parens).
- `tests/.../test__compat.py` — collapsed two over-wrapped expressions
(`captured.append({...})` in the spy, and the
`assert wire_capture[-1][...] == "..."` in
`test_apply_compat_with_strips_whitespace_in_config`).
- `get_provider_info.py` — the autogenerated mirror of `provider.yaml`
was missing the new `es_compat_with` config option entry. Added it
with the same description / version_added / type / example / default
as the yaml.
2. **MyPy providers** (`Cannot assign to a method [method-assign]`):
- `transport.perform_request = perform_request` (instance-level
assignment) is rejected by mypy because elastic_transport's
`Transport.perform_request` is bound at the class. Switched to
`setattr(transport, "perform_request", perform_request)`, which
mypy accepts and which preserves the exact same runtime behaviour
(the idempotency guard at the top of the function still inspects
`transport.__dict__["perform_request"]`, so repeat calls remain
no-ops).
3. **Non-DB tests core** and **Low dep tests core**
(`test_project_structure.py::test_providers_modules_should_have_tests`):
- The structure check expects the test file for source `_compat.py`
(note the leading underscore) to be named `test__compat.py` (two
underscores: `test_` + `_compat`). Renamed the file from
`test_compat.py` → `test__compat.py` via `git mv` so the rest of
git history follows.
Re-validated locally:
- `ruff check` and `ruff format --check` pass on all four files.
- mypy on `_compat.py` no longer reports the `method-assign` error
(only an unrelated `airflow.__version__` attr-defined error from
running mypy outside a real Airflow install — Airflow CI runs against
an installed Airflow so this does not surface there).
- Wire-level regression matrix re-run with elasticsearch-py 9.3.0 and
the `setattr` variant: cat.help `text/plain` preserved, cat.indices
partial rewrite preserved, search/bulk Accept and Content-Type
rewritten to compat=8, idempotency guard still triggers, bad values
rejected. 7/7 PASS.
* Address review feedback: tolerant transport wrapper + clarified docs
- Refactor apply_compat_with to use functools.wraps + *args, **kwargs so
the wrapper survives future elastic_transport perform_request signature
changes (new keyword-only params, reordered positionals) and preserves
__name__/__doc__/__wrapped__ for introspection.
- Extend the es_compat_with docs entry with explicit valid-value rules
and a note that the fix is installed at the transport layer and
therefore overrides elasticsearch-py's per-API-method header
negotiation (constructor headers= does not work for this purpose).
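The tolerant wrapper from the first bullet can be sketched as follows.
`ToyTransport` and its `request_timeout` parameter are hypothetical
stand-ins for a transport whose signature the wrapper does not know:

```python
import functools


class ToyTransport:
    def perform_request(self, method, target, *, headers=None, request_timeout=None):
        """Send a request (toy version)."""
        return {
            "method": method,
            "target": target,
            "headers": headers,
            "request_timeout": request_timeout,
        }


transport = ToyTransport()
original = transport.perform_request


@functools.wraps(original)
def perform_request(*args, **kwargs):
    # Only ``headers`` is inspected; every other argument passes through
    # untouched, so new or reordered parameters do not break the wrapper.
    headers = kwargs.get("headers")
    if headers:
        kwargs["headers"] = {**headers, "x-seen-by-wrapper": "1"}
    return original(*args, **kwargs)


setattr(transport, "perform_request", perform_request)

response = transport.perform_request(
    "GET", "/", headers={"accept": "application/json"}, request_timeout=5
)
wrapper_name = transport.perform_request.__name__
wrapper_doc = transport.perform_request.__doc__
unwrapped = transport.perform_request.__wrapped__
```

The wrapper never names `request_timeout`, yet the value still reaches the
original method, and introspection keeps reporting the original name and
docstring.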
* Add 'misconfiguration' to spelling wordlist
Used in providers/elasticsearch/docs/logging/index.rst to describe the
fail-fast behavior when [elasticsearch] es_compat_with is set to an
invalid value. The wordlist already contained 'misconfigured' but the
noun form was missing, causing the --spellcheck-only docs build to fail.
---------
Co-authored-by: Peter Cheon <[email protected]>
---
docs/spelling_wordlist.txt | 1 +
providers/elasticsearch/docs/changelog.rst | 14 ++
providers/elasticsearch/docs/logging/index.rst | 42 ++++
providers/elasticsearch/provider.yaml | 14 ++
.../src/airflow/providers/elasticsearch/_compat.py | 119 ++++++++++
.../providers/elasticsearch/get_provider_info.py | 7 +
.../providers/elasticsearch/hooks/elasticsearch.py | 7 +-
.../providers/elasticsearch/log/es_task_handler.py | 5 +-
.../tests/unit/elasticsearch/test__compat.py | 240 +++++++++++++++++++++
9 files changed, 444 insertions(+), 5 deletions(-)
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index b4460b21e75..26662d523a3 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1031,6 +1031,7 @@ milli
millis
milton
minikube
+misconfiguration
misconfigured
Mixin
mixin
diff --git a/providers/elasticsearch/docs/changelog.rst b/providers/elasticsearch/docs/changelog.rst
index 2209b6ff95b..3da8690238c 100644
--- a/providers/elasticsearch/docs/changelog.rst
+++ b/providers/elasticsearch/docs/changelog.rst
@@ -28,6 +28,20 @@ Changelog
---------
+A new ``[elasticsearch] es_compat_with`` config option lets operators pin
+the ``compatible-with`` HTTP content-negotiation level used by the
+Elasticsearch client. Since 6.5.1 the provider depends on
+``elasticsearch>=8.10,<10``, and a default install resolves to an
+``elasticsearch>=9`` client which unconditionally negotiates
+``compatible-with=9`` on every request. Elasticsearch 8.x servers reject
+that with HTTP 400 ``media_type_header_exception`` (regression introduced
+by #64070), breaking remote task log ingestion and the SQL/Python hooks
+against ES 8 clusters. Setting ``es_compat_with = "8"`` rewrites the
+client transport so every outbound request carries
+``compatible-with=8`` (and the matching ``+x-ndjson`` form for bulk
+requests), restoring compatibility without dropping ES 9 support. When
+unset, behavior is unchanged.
+
6.5.3
.....
diff --git a/providers/elasticsearch/docs/logging/index.rst b/providers/elasticsearch/docs/logging/index.rst
index 3254bf5406c..5cfae8d6230 100644
--- a/providers/elasticsearch/docs/logging/index.rst
+++ b/providers/elasticsearch/docs/logging/index.rst
@@ -93,6 +93,48 @@ Additionally, in the ``elasticsearch_configs`` section, you can pass any paramet
     api_key = "SOMEAPIKEY"
     verify_certs = True
+Pinning the ``compatible-with`` content-negotiation level
+'''''''''''''''''''''''''''''''''''''''''''''''''''''''''
+
+Since provider 6.5.1, the Elasticsearch dependency is ``elasticsearch>=8.10,<10``,
+which means a default install resolves to an ``elasticsearch>=9`` Python client.
+That client unconditionally negotiates ``compatible-with=9`` on every request,
+which Elasticsearch 8.x servers reject with HTTP 400
+``media_type_header_exception``. Both the task log writer and the
+``ElasticsearchSQLHook`` / ``ElasticsearchPythonHook`` are affected.
+
+If you need to keep a single Airflow image compatible with an
+``elasticsearch<9`` server, set ``[elasticsearch] es_compat_with`` to the server
+major version. The provider then rewrites the client transport so every outbound
+request carries ``Accept`` / ``Content-Type:
+application/vnd.elasticsearch+json; compatible-with=<major>`` (and the matching
+``+x-ndjson`` form for bulk requests):
+
+.. code-block:: ini
+
+    [elasticsearch]
+    es_compat_with = 8
+
+Only a positive integer major version is accepted (``"7"``, ``"8"``, ``"9"``);
+any other value (e.g. ``"v8"``, ``"8.0"``) fails fast with an
+``AirflowConfigException`` at client construction time so the misconfiguration
+is obvious in the worker startup log instead of producing a per-request 400
+storm.
+
+.. note::
+
+    The fix is installed at the **transport layer** (a wrapper around
+    ``client.transport.perform_request``) and therefore overrides the
+    per-API-method ``Accept`` / ``Content-Type`` headers that elasticsearch-py
+    negotiates from its own client major. Constructor-level ``headers=`` on
+    ``Elasticsearch.__init__`` and the ``elasticsearch_configs`` section do
+    **not** work for this purpose — elasticsearch-py re-applies its own
+    ``compatible-with=<client_major>`` headers right before the request goes
+    out, after any constructor headers.
+
+When the option is unset the client behaves as before (negotiating its own
+major version).
+
.. _elasticsearch-document-schema:
Expected Elasticsearch document schema
diff --git a/providers/elasticsearch/provider.yaml b/providers/elasticsearch/provider.yaml
index 5fff75d12ef..822580e58c8 100644
--- a/providers/elasticsearch/provider.yaml
+++ b/providers/elasticsearch/provider.yaml
@@ -221,6 +221,20 @@ config:
         type: string
         example: ~
         default: "1000"
+      es_compat_with:
+        description: |
+          Pin the ``compatible-with`` HTTP content-negotiation level used by the
+          Elasticsearch client. Accepts a server major version string (e.g. ``"7"``,
+          ``"8"``, ``"9"``). When unset, the elasticsearch-py client negotiates its
+          own major version, which makes an ``elasticsearch>=9`` client (the default
+          for fresh installs) incompatible with Elasticsearch 8.x servers — every
+          request is rejected with HTTP 400 ``media_type_header_exception``.
+          Setting this option keeps a single Airflow image compatible with both
+          ``elasticsearch<9`` and ``elasticsearch>=9`` servers.
+        version_added: 6.5.4
+        type: string
+        example: "8"
+        default: ""
   elasticsearch_configs:
     description: ~
     options:
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/_compat.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/_compat.py
new file mode 100644
index 00000000000..ca888db8a6f
--- /dev/null
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/_compat.py
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Helpers shared between the Elasticsearch hooks and log handler.
+
+Currently this exposes a single helper, :func:`apply_compat_with`, that lets the
+provider keep working against an Elasticsearch server whose major version does
+not match the installed ``elasticsearch`` Python client major. See the helper
+docstring for the regression context.
+"""
+
+from __future__ import annotations
+
+import functools
+import re
+from typing import TYPE_CHECKING
+
+from airflow.providers.common.compat.sdk import AirflowConfigException, conf
+
+if TYPE_CHECKING:
+    import elasticsearch
+
+
+# Matches the JSON / NDJSON / mapbox vector-tile mimetypes the ``elasticsearch``
+# client negotiates, in either form: the raw ``application/json`` (what the
+# generated client code writes into ``__headers``) and the already-rewritten
+# ``application/vnd.elasticsearch+json; compatible-with=<N>`` (what
+# ``mimetype_header_to_compat`` in ``elasticsearch/_sync/client/_base.py``
+# emits before handing the request to the transport). Both forms must be
+# rewritten to ``compatible-with=<configured>``; anything else (notably
+# ``text/plain`` used by the cat APIs) is left intact, mirroring upstream's
+# selective substitution behaviour.
+_COMPAT_MIMETYPE_RE = re.compile(
+    r"application/(?:vnd\.elasticsearch\+)?(json|x-ndjson|vnd\.mapbox-vector-tile)"
+    r"(?:\s*;\s*compatible-with=\d+)?"
+)
+_COMPAT_MAJOR_RE = re.compile(r"^\d+$")
+
+
+def apply_compat_with(client: elasticsearch.Elasticsearch) -> elasticsearch.Elasticsearch:
+    """
+    Pin the ``compatible-with`` HTTP content-negotiation level for ``client``.
+
+    The ``elasticsearch`` Python client always negotiates ``compatible-with=<client_major>``
+    on every request; an Elasticsearch server with a different major version rejects
+    the request with HTTP 400 ``media_type_header_exception``. This is what happens
+    when an ``elasticsearch>=9`` client (the current default) talks to an
+    Elasticsearch 8.x server, which broke remote logging for ES 8 deployments
+    starting with provider 6.5.1.
+
+    When ``[elasticsearch] es_compat_with`` is set to a major version string
+    (e.g. ``"7"``, ``"8"``, ``"9"``) this helper wraps the client's transport
+    so every outbound request rewrites the ``compatible-with=<N>`` parameter on
+    the JSON / NDJSON / mapbox vector-tile parts of the ``Accept`` and
+    ``Content-Type`` headers. Non-JSON parts (notably ``text/plain`` used by
+    the cat APIs) are preserved verbatim, mirroring how elasticsearch-py's own
+    ``mimetype_header_to_compat`` handles content negotiation.
+
+    When the option is unset the client is returned unchanged and behaves
+    exactly as before.
+    """
+    raw = conf.get("elasticsearch", "es_compat_with", fallback=None)
+    compat = (raw or "").strip()
+    if not compat:
+        return client
+    if not _COMPAT_MAJOR_RE.match(compat):
+        raise AirflowConfigException(
+            "[elasticsearch] es_compat_with must be a positive integer major version "
+            f"(e.g. '7', '8', '9'); got {raw!r}."
+        )
+
+    transport = client.transport
+    if "perform_request" in transport.__dict__:
+        # Already wrapped on this transport instance — no-op so repeated calls
+        # to ``apply_compat_with`` (e.g. across hook reuse) stay idempotent.
+        return client
+
+    sub = rf"application/vnd.elasticsearch+\g<1>; compatible-with={compat}"
+    original_perform_request = transport.perform_request
+
+    # Accept ``*args, **kwargs`` so the wrapper survives future elastic_transport
+    # ``perform_request`` signature changes (new keyword-only params, reordered
+    # positionals). ``functools.wraps`` preserves the original ``__name__`` /
+    # ``__doc__`` / ``__wrapped__`` for tooling and introspection.
+    @functools.wraps(original_perform_request)
+    def perform_request(*args, **kwargs):  # type: ignore[no-untyped-def]
+        headers = kwargs.get("headers")
+        if not headers:
+            return original_perform_request(*args, **kwargs)
+        # Walk every key case-insensitively so a future elastic_transport that
+        # forwards PascalCase headers does not silently bypass the rewrite.
+        merged = dict(headers)
+        for key in list(merged):
+            if key.lower() in ("accept", "content-type") and merged[key]:
+                merged[key] = _COMPAT_MIMETYPE_RE.sub(sub, merged[key])
+        kwargs["headers"] = merged
+        return original_perform_request(*args, **kwargs)
+
+    # ``setattr`` instead of direct attribute assignment so mypy does not flag a
+    # ``method-assign`` error — we are *intentionally* shadowing the bound method
+    # at the instance level (the idempotency guard above checks the instance
+    # ``__dict__``).
+    setattr(transport, "perform_request", perform_request)
+    return client
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/get_provider_info.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/get_provider_info.py
index 2d357cecb19..b0853a98580 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/get_provider_info.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/get_provider_info.py
@@ -151,6 +151,13 @@ def get_provider_info():
                         "example": None,
                         "default": "1000",
                     },
+                    "es_compat_with": {
+                        "description": 'Pin the ``compatible-with`` HTTP content-negotiation level used by the\nElasticsearch client. Accepts a server major version string (e.g. ``"7"``,\n``"8"``, ``"9"``). When unset, the elasticsearch-py client negotiates its\nown major version, which makes an ``elasticsearch>=9`` client (the default\nfor fresh installs) incompatible with Elasticsearch 8.x servers — every\nrequest is rejected with HTTP 400 ``media_type_header_exception``.\nSetting this [...]
+                        "version_added": "6.5.4",
+                        "type": "string",
+                        "example": "8",
+                        "default": "",
+                    },
                 },
             },
             "elasticsearch_configs": {
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py
index 4330740fd96..fa4895b37e4 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py
@@ -28,6 +28,7 @@ from elasticsearch import Elasticsearch
 from airflow.providers.common.compat.sdk import BaseHook
 from airflow.providers.common.sql.hooks.sql import DbApiHook
+from airflow.providers.elasticsearch._compat import apply_compat_with
 if TYPE_CHECKING:
     from elastic_transport import ObjectApiResponse
@@ -171,9 +172,9 @@ class ESConnection:
         netloc = f"{host}:{port}"
         self.url = parse.urlunparse((scheme, netloc, "/", None, None, None))
         if user and password:
-            self.es = Elasticsearch(self.url, basic_auth=(user, password), **kwargs)
+            self.es = apply_compat_with(Elasticsearch(self.url, basic_auth=(user, password), **kwargs))
         else:
-            self.es = Elasticsearch(self.url, **kwargs)
+            self.es = apply_compat_with(Elasticsearch(self.url, **kwargs))
     def cursor(self) -> ElasticsearchSQLCursor:
         return ElasticsearchSQLCursor(self.es, **self.kwargs)
@@ -283,7 +284,7 @@ class ElasticsearchPythonHook(BaseHook):
     def _get_elastic_connection(self):
         """Return the Elasticsearch client."""
-        client = Elasticsearch(self.hosts, **self.es_conn_args)
+        client = apply_compat_with(Elasticsearch(self.hosts, **self.es_conn_args))
         return client
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
index eb531452df8..402d64e5bf1 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -43,6 +43,7 @@ from elasticsearch.exceptions import NotFoundError
 import airflow.logging_config as alc
 from airflow.models.dagrun import DagRun
 from airflow.providers.common.compat.sdk import conf
+from airflow.providers.elasticsearch._compat import apply_compat_with
 from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter
 from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit, resolve_nested
 from airflow.providers.elasticsearch.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
@@ -269,7 +270,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         )
         self.closed = False
-        self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
+        self.client = apply_compat_with(elasticsearch.Elasticsearch(self.host, **es_kwargs))
         # in airflow.cfg, host of elasticsearch has to be http://dockerhostXxxx:9200
         self.frontend = frontend
@@ -651,7 +652,7 @@ class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
     def __attrs_post_init__(self):
         es_kwargs = get_es_kwargs_from_config()
-        self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
+        self.client = apply_compat_with(elasticsearch.Elasticsearch(self.host, **es_kwargs))
         self.index_patterns_callable = conf.get("elasticsearch", "index_patterns_callable", fallback="")
         self.PAGE = 0
         self.MAX_LINE_PER_PAGE = conf.getint("elasticsearch", "max_lines_per_page", fallback=1000)
diff --git a/providers/elasticsearch/tests/unit/elasticsearch/test__compat.py b/providers/elasticsearch/tests/unit/elasticsearch/test__compat.py
new file mode 100644
index 00000000000..7f81f9b92f0
--- /dev/null
+++ b/providers/elasticsearch/tests/unit/elasticsearch/test__compat.py
@@ -0,0 +1,240 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Wire-level tests for ``airflow.providers.elasticsearch._compat.apply_compat_with``.
+
+These tests intercept ``elastic_transport.Transport.perform_request`` so they
+observe the exact ``Accept`` / ``Content-Type`` headers the helper produces.
+Asserting on ``client._headers`` (or any other in-memory state on the client)
+would not be enough: the elasticsearch-py client always re-applies its own
+per-API-method content-negotiation headers right before the request goes out,
+which is the very behaviour this helper has to override. Only what the
+``Transport`` sees on the wire matters.
+"""
+
+from __future__ import annotations
+
+import contextlib
+
+import pytest
+from elastic_transport import Transport
+from elasticsearch import Elasticsearch
+
+from airflow.providers.common.compat.sdk import AirflowConfigException
+from airflow.providers.elasticsearch._compat import apply_compat_with
+
+from tests_common.test_utils.config import conf_vars
+
+
+def _trigger_calls(client: Elasticsearch) -> None:
+    """Drive ``search``, ``info`` and ``bulk`` against the spy transport.
+
+    Each call hits the spy and raises; we swallow that and rely on the spy's
+    ``captured`` list to make assertions.
+    """
+    for action in (
+        lambda: client.search(index="airflow-logs", query={"match_all": {}}),
+        lambda: client.info(),
+        lambda: client.bulk(operations=[{"index": {"_index": "x"}}, {"hello": "world"}]),
+    ):
+        with contextlib.suppress(RuntimeError):
+            action()
+
+
+@pytest.fixture
+def wire_capture(monkeypatch):
+    """Replace ``Transport.perform_request`` with a recording spy.
+
+    The spy is installed at the *class* level (not on an instance) so it is
+    picked up by both the original transport and by the wrapper produced by
+    :func:`apply_compat_with`. The wrapper resolves the original
+    ``perform_request`` at wrap time, so the spy must already be in place when
+    ``apply_compat_with`` runs.
+    """
+    captured: list[dict] = []
+
+    def spy(self, method, target, *, body=None, headers=None, **kwargs):
+        captured.append({"method": method, "target": target, "headers": dict(headers or {})})
+        raise RuntimeError("captured")
+
+    monkeypatch.setattr(Transport, "perform_request", spy)
+    return captured
+
+
+@pytest.mark.parametrize("unset_value", ["", None])
+def test_apply_compat_with_unset_does_not_wrap_transport(unset_value):
+    """When the option is unset the helper returns the client untouched.
+
+    The wrap installs ``perform_request`` as an instance attribute on the
+    transport, so the most precise way to assert the helper is a no-op is to
+    check that no such instance attribute was set (i.e. lookup still resolves
+    to the class method). Identity (``is``) comparison on
+    ``transport.perform_request`` would not work even in the no-op case
+    because attribute access on a bound method produces a fresh wrapper each
+    time.
+
+    Note: ``conf_vars`` removes the override when the value is ``None``, so
+    both parametrized cases ultimately resolve to the provider yaml default
+    (``""``). The parametrize is kept to document that callers passing either
+    sentinel get the no-op path; the actual ``None`` branch in the helper is
+    covered by ``test_apply_compat_with_unset_via_missing_conf`` below.
+    """
+    client = Elasticsearch("http://localhost:9200")
+    assert "perform_request" not in client.transport.__dict__
+    with conf_vars({("elasticsearch", "es_compat_with"): unset_value}):
+        same = apply_compat_with(client)
+    assert same is client
+    assert "perform_request" not in client.transport.__dict__
+
+
+def test_apply_compat_with_unset_via_missing_conf(monkeypatch):
+    """Cover the ``conf.get`` returning ``None`` branch directly."""
+    from airflow.providers.elasticsearch import _compat
+
+    monkeypatch.setattr(_compat.conf, "get", lambda *args, **kwargs: None)
+    client = Elasticsearch("http://localhost:9200")
+    assert apply_compat_with(client) is client
+    assert "perform_request" not in client.transport.__dict__
+
+
+def test_apply_compat_with_pins_compatible_with_8(wire_capture):
+    """With ``es_compat_with = "8"`` every outbound call carries ``compatible-with=8``."""
+    with conf_vars({("elasticsearch", "es_compat_with"): "8"}):
+        client = apply_compat_with(Elasticsearch("http://localhost:9200"))
+
+    _trigger_calls(client)
+
+    assert {c["method"] for c in wire_capture} == {"POST", "GET", "PUT"}
+    expected_json = "application/vnd.elasticsearch+json; compatible-with=8"
+    expected_ndjson = "application/vnd.elasticsearch+x-ndjson; compatible-with=8"
+
+    by_method = {c["method"]: c for c in wire_capture}
+    assert by_method["POST"]["headers"]["accept"] == expected_json
+    assert by_method["POST"]["headers"]["content-type"] == expected_json
+    assert by_method["GET"]["headers"]["accept"] == expected_json
+    # ``info()`` does not send a body, so content-type is absent on the wire
+    assert by_method["GET"]["headers"].get("content-type") in (None, "")
+    assert by_method["PUT"]["headers"]["accept"] == expected_json
+    # bulk preserves the ndjson form so the server can stream the body
+    assert by_method["PUT"]["headers"]["content-type"] == expected_ndjson
+
+
+def test_apply_compat_with_pins_compatible_with_7(wire_capture):
+    """The helper accepts arbitrary major version strings, not just ``"8"``."""
+    with conf_vars({("elasticsearch", "es_compat_with"): "7"}):
+        client = apply_compat_with(Elasticsearch("http://localhost:9200"))
+
+    _trigger_calls(client)
+    assert wire_capture, "spy should have captured at least one call"
+    assert all(
+        c["headers"]["accept"] == "application/vnd.elasticsearch+json; compatible-with=7"
+        for c in wire_capture
+    )
+
+
+def test_apply_compat_with_preserves_text_plain_for_cat_apis(wire_capture):
+    """Cat APIs send ``Accept: text/plain[,application/json]``; the wrapper must
+    preserve the ``text/plain`` part. Earlier revisions of the helper unconditionally
+    rewrote ``accept`` to ``application/vnd.elasticsearch+json; compatible-with=N``,
+    which silently turned every ``cat.*`` response into JSON instead of plain text.
+
+    We mirror elasticsearch-py's own ``mimetype_header_to_compat`` (only
+    ``application/(json|x-ndjson|vnd.mapbox-vector-tile)`` parts get the
+    ``compatible-with`` suffix), so this test fails fast if anyone reverts to the
+    blanket overwrite.
+    """
+    with conf_vars({("elasticsearch", "es_compat_with"): "8"}):
+        client = apply_compat_with(Elasticsearch("http://localhost:9200"))
+
+    for action in (lambda: client.cat.help(), lambda: client.cat.indices()):
+        with contextlib.suppress(RuntimeError):
+            action()
+
+    accepts = [c["headers"].get("accept") for c in wire_capture]
+    # ``cat.help`` ships ``text/plain`` only; it must come through verbatim.
+    assert "text/plain" in accepts, accepts
+    # ``cat.indices`` ships ``text/plain,application/json``; the JSON half gets
+    # the ``compatible-with=8`` suffix, the text/plain half stays put.
+    assert any(
+        a and a.startswith("text/plain,application/vnd.elasticsearch+json; compatible-with=8")
+        for a in accepts
+    ), accepts
+
+
+def test_apply_compat_with_handles_pascal_case_headers(monkeypatch):
+    """Defensive: if ``elastic_transport`` ever forwards PascalCase header keys,
+    the rewrite must still apply (a lowercase-only ``dict.get`` would silently
+    no-op and let ``compatible-with=<client_major>`` ship to the server).
+    """
+    seen: dict = {}
+
+    def spy(self, method, target, *, body=None, headers=None, **kwargs):
+        seen["headers"] = dict(headers or {})
+        raise RuntimeError("captured")
+
+    monkeypatch.setattr(Transport, "perform_request", spy)
+
+    with conf_vars({("elasticsearch", "es_compat_with"): "8"}):
+        client = apply_compat_with(Elasticsearch("http://localhost:9200"))
+
+    # Drive the wrapper with PascalCase keys directly — bypassing the
+    # ``_BaseClient.perform_request`` normalization.
+    with contextlib.suppress(RuntimeError):
+        client.transport.perform_request(
+            "GET",
+            "/",
+            headers={"Accept": "application/json", "Content-Type": "application/json"},
+        )
+
+    assert seen["headers"]["Accept"] == "application/vnd.elasticsearch+json; compatible-with=8"
+    assert seen["headers"]["Content-Type"] == "application/vnd.elasticsearch+json; compatible-with=8"
+
+
+def test_apply_compat_with_strips_whitespace_in_config(wire_capture):
+    """Operators occasionally write ``es_compat_with = " 8"``; the helper must
+    strip whitespace before interpolating into the wire header, otherwise the
+    server returns 400 and the helper fails open in a confusing way.
+    """
+    with conf_vars({("elasticsearch", "es_compat_with"): " 8 "}):
+        client = apply_compat_with(Elasticsearch("http://localhost:9200"))
+
+    with contextlib.suppress(RuntimeError):
+        client.search(index="airflow-logs", query={"match_all": {}})
+
+    assert wire_capture[-1]["headers"]["accept"] == "application/vnd.elasticsearch+json; compatible-with=8"
+
+
+@pytest.mark.parametrize("bad_value", ["v8", "8.0", "abc", "8;9"])
+def test_apply_compat_with_rejects_non_numeric_major(bad_value):
+    """A non-numeric ``es_compat_with`` would otherwise produce malformed wire
+    headers (``compatible-with=v8``) and a per-request 400 storm. Fail fast at
+    construction time with a config exception so the misconfiguration is
+    obvious in the worker startup log.
+    """
+    with conf_vars({("elasticsearch", "es_compat_with"): bad_value}):
+        with pytest.raises(AirflowConfigException, match="es_compat_with"):
+            apply_compat_with(Elasticsearch("http://localhost:9200"))
+
+
+def test_apply_compat_with_is_idempotent():
+    """Calling ``apply_compat_with`` twice on the same client only wraps once."""
+    with conf_vars({("elasticsearch", "es_compat_with"): "8"}):
+        client = apply_compat_with(Elasticsearch("http://localhost:9200"))
+        first_wrapper = client.transport.__dict__["perform_request"]
+        apply_compat_with(client)
+        second_wrapper = client.transport.__dict__["perform_request"]
+    assert first_wrapper is second_wrapper