This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0dd171f09aa Implement fetchmany support for ElasticsearchSQLCursor
using an internal row buffer. (#66658)
0dd171f09aa is described below
commit 0dd171f09aac43e6c5b3ffd0f8487551c9e1048c
Author: SameerMesiah97 <[email protected]>
AuthorDate: Tue May 12 23:01:05 2026 +0100
Implement fetchmany support for ElasticsearchSQLCursor using an internal
row buffer. (#66658)
Refactor cursor pagination semantics to progressively consume rows across
SQL cursor pages.
Add unit tests covering fetchmany batching, row exhaustion, default fetch
size, and paginated fetchall behavior.
Co-authored-by: Sameer Mesiah <[email protected]>
---
.../providers/elasticsearch/hooks/elasticsearch.py | 58 +++++++++++---
.../unit/elasticsearch/hooks/test_elasticsearch.py | 92 ++++++++++++++++++++--
2 files changed, 131 insertions(+), 19 deletions(-)
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py
index e8f0cfad00c..4330740fd96 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+from collections import deque
from collections.abc import Iterable, Mapping
from copy import deepcopy
from functools import cached_property
@@ -56,6 +57,10 @@ class ElasticsearchSQLCursor:
}
self._response: ObjectApiResponse | None = None
+ # Internal mutable row buffer used to progressively consume
+ # paginated Elasticsearch SQL cursor results.
+ self._rows: deque[list[Any]] = deque()
+
@property
def response(self) -> ObjectApiResponse:
return self._response or {} # type: ignore
@@ -70,11 +75,11 @@ class ElasticsearchSQLCursor:
@property
def rows(self):
- return self.response.get("rows", [])
+ return self._rows
@property
def rowcount(self) -> int:
- return len(self.rows)
+ return len(self.response.get("rows", []))
@property
def description(self) -> list[tuple]:
@@ -83,26 +88,57 @@ class ElasticsearchSQLCursor:
def execute(
self, statement: str, params: Iterable | Mapping[str, Any] | None = None
) -> ObjectApiResponse:
- self.body["query"] = statement
- if params:
- self.body["params"] = params
- self.response = self.es.sql.query(**self.body)
+
+ if self.body.get("cursor"):
+ self.response = self.es.sql.query(cursor=self.body["cursor"])
+ else:
+ self.body["query"] = statement
+
+ if params:
+ self.body["params"] = params
+
+ self.response = self.es.sql.query(**self.body)
+
+ self._rows = deque(self.response.get("rows", []))
+
if self.cursor:
self.body["cursor"] = self.cursor
else:
self.body.pop("cursor", None)
+
return self.response
def fetchone(self):
- if self.rows:
- return self.rows[0]
- return None
+ while True:
+ if self._rows:
+ return self._rows.popleft()
+
+ if not self.cursor:
+ return None
+
+ self.execute(statement=self.body["query"])
def fetchmany(self, size: int | None = None):
- raise NotImplementedError()
+ size = size or self.body["fetch_size"]
+
+ results: list[list[Any]] = []
+
+ while len(results) < size:
+ while self._rows and len(results) < size:
+ results.append(self._rows.popleft())
+
+ if len(results) >= size:
+ break
+
+ if not self.cursor:
+ break
+
+ self.execute(statement=self.body["query"])
+
+ return results
def fetchall(self):
- results = self.rows
+ results = list(self.rows)
while self.cursor:
self.execute(statement=self.body["query"])
results.extend(self.rows)
diff --git a/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py b/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py
index ed9056ae350..c770cda5c96 100644
--- a/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py
+++ b/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py
@@ -34,13 +34,30 @@ from airflow.providers.elasticsearch.hooks.elasticsearch import (
ESConnection,
)
-ROWS = [
+ROWS_PAGE_1 = [
[1, "Stallone", "Sylvester", "78"],
[2, "Statham", "Jason", "57"],
+]
+
+ROWS_PAGE_2 = [
[3, "Li", "Jet", "61"],
[4, "Lundgren", "Dolph", "66"],
[5, "Norris", "Chuck", "84"],
]
+
+ROWS = ROWS_PAGE_1 + ROWS_PAGE_2
+
+RESPONSE = {
+ "columns": [
+ {"name": "index", "type": "long"},
+ {"name": "name", "type": "text"},
+ {"name": "firstname", "type": "text"},
+ {"name": "age", "type": "long"},
+ ],
+ "rows": ROWS_PAGE_1,
+ "cursor": "e7f8QwXUruW2mIebzudH4BwAA//8DAA==",
+}
+
RESPONSE_WITHOUT_CURSOR = {
"columns": [
{"name": "index", "type": "long"},
@@ -48,9 +65,9 @@ RESPONSE_WITHOUT_CURSOR = {
{"name": "firstname", "type": "text"},
{"name": "age", "type": "long"},
],
- "rows": ROWS,
+ "rows": ROWS_PAGE_2,
}
-RESPONSE = {**RESPONSE_WITHOUT_CURSOR, **{"cursor": "e7f8QwXUruW2mIebzudH4BwAA//8DAA=="}}
+
RESPONSES = [
RESPONSE,
RESPONSE_WITHOUT_CURSOR,
@@ -90,7 +107,7 @@ class TestElasticsearchSQLCursor:
cursor = ElasticsearchSQLCursor(es=self.es, options={})
cursor.execute("SELECT * FROM hollywood.actors")
- assert cursor.rowcount == len(ROWS)
+ assert cursor.rowcount == len(ROWS_PAGE_1)
def test_description(self):
cursor = ElasticsearchSQLCursor(es=self.es, options={})
@@ -109,12 +126,59 @@ class TestElasticsearchSQLCursor:
assert cursor.fetchone() == ROWS[0]
- def test_fetchmany(self):
+ @pytest.mark.parametrize(
+ ("size", "expected"),
+ [
+ (1, ROWS[:1]),
+ (2, ROWS[:2]),
+ (5, ROWS[:5]),
+ ],
+ )
+ def test_fetchmany(self, size, expected):
cursor = ElasticsearchSQLCursor(es=self.es, options={})
cursor.execute("SELECT * FROM hollywood.actors")
- with pytest.raises(NotImplementedError):
- cursor.fetchmany()
+ records = cursor.fetchmany(size)
+
+ assert records == expected
+
+ def test_fetchmany_consumes_rows(self):
+ cursor = ElasticsearchSQLCursor(es=self.es, options={})
+ cursor.execute("SELECT * FROM hollywood.actors")
+
+ first_batch = cursor.fetchmany(2)
+ second_batch = cursor.fetchmany(2)
+
+ assert first_batch == ROWS[:2]
+ assert second_batch == ROWS[2:4]
+
+ def test_fetchmany_exhausts_rows(self):
+ cursor = ElasticsearchSQLCursor(es=self.es, options={})
+ cursor.execute("SELECT * FROM hollywood.actors")
+
+ records = cursor.fetchmany(100)
+
+ assert records == ROWS
+
+ # Further calls should return empty list.
+ assert cursor.fetchmany(10) == []
+
+ def test_fetchmany_uses_default_fetch_size(self):
+ cursor = ElasticsearchSQLCursor(es=self.es, fetch_size=2)
+ cursor.execute("SELECT * FROM hollywood.actors")
+
+ records = cursor.fetchmany()
+
+ assert records == ROWS[:2]
+
+ def test_fetchmany_single_page(self):
+ self.es.sql.query.side_effect = None
+ self.es.sql.query.return_value = RESPONSE_WITHOUT_CURSOR
+ cursor = ElasticsearchSQLCursor(es=self.es, options={})
+ cursor.execute("SELECT * FROM hollywood.actors")
+
+ assert cursor.fetchmany(100) == ROWS_PAGE_2
+ assert cursor.fetchmany(10) == []
def test_fetchall(self):
cursor = ElasticsearchSQLCursor(es=self.es, options={})
@@ -122,9 +186,21 @@ class TestElasticsearchSQLCursor:
records = cursor.fetchall()
- assert len(records) == 10
+ assert len(records) == 5
assert records == ROWS
+ def test_fetchall_after_partial_fetchmany(self):
+ cursor = ElasticsearchSQLCursor(es=self.es, options={})
+ cursor.execute("SELECT * FROM hollywood.actors")
+
+ first_batch = cursor.fetchmany(2)
+
+ assert first_batch == ROWS[:2]
+
+ remaining_records = cursor.fetchall()
+
+ assert remaining_records == ROWS[2:]
+
class TestElasticsearchSQLHook:
def setup_method(self):