This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6677ea68f4 [Improve][Connector-V2][Elasticsearch] Enhance Scroll API
handling with resource cleanup (#10124)
6677ea68f4 is described below
commit 6677ea68f4bc1e1ff70fb568702f886d9a91abf1
Author: Jast <[email protected]>
AuthorDate: Sun Jan 4 21:20:05 2026 +0800
[Improve][Connector-V2][Elasticsearch] Enhance Scroll API handling with
resource cleanup (#10124)
---
.../elasticsearch/client/EsRestClient.java | 43 ++++++++++++++++++++++
.../source/ElasticsearchSourceReader.java | 39 ++++++++++++++------
.../connector/elasticsearch/ElasticsearchIT.java | 31 ++++++++++++++++
3 files changed, 101 insertions(+), 12 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index e77efbb5dd..950d1634e1 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -264,6 +264,49 @@ public class EsRestClient implements Closeable {
return getDocsFromSqlResult(endpoint, JsonUtils.toJsonString(param),
columnNodes);
}
+ /**
+  * Clears a scroll context by sending {@code DELETE /_search/scroll} with the scroll ID in the
+  * JSON request body, so that server-side resources held for the scroll can be released.
+  *
+  * <p>This method is best-effort: an empty scroll ID, a null response, a non-200 status, a
+  * response body without a {@code succeeded} field, or any exception raised while sending the
+  * request or parsing the response is logged at WARN level and reported as {@code false}.
+  * Nothing is thrown to the caller.
+  *
+  * @param scrollId The scroll ID to clear; an empty value is rejected without sending a request
+  * @return True only if the server answered 200 and the {@code succeeded} field of the response
+  *     body is true
+  */
+ public boolean clearScroll(String scrollId) {
+     if (StringUtils.isEmpty(scrollId)) {
+         log.warn("Attempted to clear scroll with empty scroll ID");
+         return false;
+     }
+
+     String endpoint = "/_search/scroll";
+     Request request = new Request("DELETE", endpoint);
+     Map<String, String> requestBody = new HashMap<>();
+     requestBody.put("scroll_id", scrollId);
+     request.setJsonEntity(JsonUtils.toJsonString(requestBody));
+
+     try {
+         Response response = restClient.performRequest(request);
+         if (response == null) {
+             log.warn("DELETE {} response null for scroll ID: {}", endpoint, scrollId);
+             return false;
+         }
+
+         int statusCode = response.getStatusLine().getStatusCode();
+         if (statusCode != HttpStatus.SC_OK) {
+             log.warn(
+                     "DELETE {} response status code={} for scroll ID: {}",
+                     endpoint,
+                     statusCode,
+                     scrollId);
+             return false;
+         }
+
+         String entity = EntityUtils.toString(response.getEntity());
+         JsonNode jsonNode = JsonUtils.parseObject(entity);
+         // Check for the field explicitly: dereferencing a missing "succeeded" node would
+         // only surface as a NullPointerException swallowed by the generic catch below.
+         JsonNode succeededNode = jsonNode.get("succeeded");
+         if (succeededNode == null) {
+             log.warn(
+                     "DELETE {} response has no 'succeeded' field for scroll ID: {}",
+                     endpoint,
+                     scrollId);
+             return false;
+         }
+         return succeededNode.asBoolean();
+     } catch (Exception ex) {
+         // A trailing Throwable argument is logged with its stack trace by SLF4J.
+         log.warn("Failed to clear scroll ID: {}", scrollId, ex);
+         return false;
+     }
+ }
+
private ScrollResult getDocsFromSqlResult(
String endpoint, String requestBody, JsonNode columnNodes) {
Request request = new Request("POST", endpoint);
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
index a1548a8ecc..e7000881ae 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
@@ -120,19 +120,34 @@ public class ElasticsearchSourceReader
searchWithPointInTime(sourceIndexInfo, output, deserializer);
} else {
log.info("Using Scroll API for index: {}",
sourceIndexInfo.getIndex());
- ScrollResult scrollResult =
- esRestClient.searchByScroll(
- sourceIndexInfo.getIndex(),
- sourceIndexInfo.getSource(),
- sourceIndexInfo.getQuery(),
- sourceIndexInfo.getScrollTime(),
- sourceIndexInfo.getScrollSize());
- outputFromScrollResult(scrollResult, sourceIndexInfo, output,
deserializer);
- while (scrollResult.getDocs() != null &&
!scrollResult.getDocs().isEmpty()) {
- scrollResult =
- esRestClient.searchWithScrollId(
- scrollResult.getScrollId(),
sourceIndexInfo.getScrollTime());
+ String scrollId = null;
+ try {
+ ScrollResult scrollResult =
+ esRestClient.searchByScroll(
+ sourceIndexInfo.getIndex(),
+ sourceIndexInfo.getSource(),
+ sourceIndexInfo.getQuery(),
+ sourceIndexInfo.getScrollTime(),
+ sourceIndexInfo.getScrollSize());
+ scrollId = scrollResult.getScrollId();
+
outputFromScrollResult(scrollResult, sourceIndexInfo,
output, deserializer);
+ while (scrollResult.getDocs() != null &&
!scrollResult.getDocs().isEmpty()) {
+ scrollResult =
+ esRestClient.searchWithScrollId(
+ scrollResult.getScrollId(),
+ sourceIndexInfo.getScrollTime());
+ scrollId = scrollResult.getScrollId();
+ outputFromScrollResult(scrollResult, sourceIndexInfo,
output, deserializer);
+ }
+ } finally {
+ if (StringUtils.isNotEmpty(scrollId)) {
+ try {
+ esRestClient.clearScroll(scrollId);
+ } catch (Exception e) {
+ log.warn("Failed to clear scroll ID: " + scrollId,
e);
+ }
+ }
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 2479be67b0..d18aa4d588 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -1083,6 +1083,37 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
return data;
}
+ /**
+  * Pages through "st_index" with the Scroll API until an empty page is returned, then checks
+  * that {@code clearScroll} reports success for the last scroll ID received.
+  */
+ @Test
+ public void testScrollAndSqlCursorResourceCleanup() throws Exception {
+
+     String scrollId = null;
+     // Set only when the scroll loop finishes normally, so that a cleanup assertion raised in
+     // the finally block cannot replace the exception that actually failed the test.
+     boolean scrolledToEnd = false;
+     try {
+         List<String> source = Arrays.asList("c_string", "c_int");
+         Map<String, Object> query = new HashMap<>();
+         query.put("match_all", Collections.emptyMap());
+
+         ScrollResult result = esRestClient.searchByScroll("st_index", source, query, "1m", 5);
+         scrollId = result.getScrollId();
+         Assertions.assertNotNull(scrollId, "Scroll ID should not be null");
+
+         // Count each page before fetching the next one, so getDocs() is only dereferenced
+         // after the loop condition has checked it for null.
+         int totalDocs = 0;
+         while (result.getDocs() != null && !result.getDocs().isEmpty()) {
+             totalDocs += result.getDocs().size();
+             result = esRestClient.searchWithScrollId(scrollId, "1m");
+             scrollId = result.getScrollId();
+         }
+         log.info("Retrieved {} documents via Scroll API", totalDocs);
+         scrolledToEnd = true;
+
+     } finally {
+         // Always attempt the cleanup, but only assert on it when the scroll itself succeeded.
+         if (scrollId != null) {
+             boolean cleaned = esRestClient.clearScroll(scrollId);
+             if (scrolledToEnd) {
+                 Assertions.assertTrue(
+                         cleaned, "Scroll context should be successfully cleaned up");
+             }
+         }
+     }
+ }
+
/**
* elastic query all dsl
*