This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6677ea68f4 [Improve][Connector-V2][Elasticsearch] Enhance Scroll API handling with resource cleanup  (#10124)
6677ea68f4 is described below

commit 6677ea68f4bc1e1ff70fb568702f886d9a91abf1
Author: Jast <[email protected]>
AuthorDate: Sun Jan 4 21:20:05 2026 +0800

    [Improve][Connector-V2][Elasticsearch] Enhance Scroll API handling with resource cleanup  (#10124)
---
 .../elasticsearch/client/EsRestClient.java         | 43 ++++++++++++++++++++++
 .../source/ElasticsearchSourceReader.java          | 39 ++++++++++++++------
 .../connector/elasticsearch/ElasticsearchIT.java   | 31 ++++++++++++++++
 3 files changed, 101 insertions(+), 12 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index e77efbb5dd..950d1634e1 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -264,6 +264,54 @@ public class EsRestClient implements Closeable {
         return getDocsFromSqlResult(endpoint, JsonUtils.toJsonString(param), columnNodes);
     }
 
+    /**
+     * Clear a scroll context to release its server-side resources.
+     *
+     * <p>Best-effort: an empty ID, a non-200 status, or any exception raised while building,
+     * sending, or parsing the request is logged at WARN level and reported as {@code false}
+     * rather than thrown.
+     *
+     * @param scrollId The scroll ID to clear; a {@code null} or empty ID is rejected
+     * @return True only if the response body reports {@code "succeeded": true}
+     */
+    public boolean clearScroll(String scrollId) {
+        if (StringUtils.isEmpty(scrollId)) {
+            log.warn("Attempted to clear scroll with empty scroll ID");
+            return false;
+        }
+
+        String endpoint = "/_search/scroll";
+        try {
+            Map<String, String> requestBody = new HashMap<>();
+            requestBody.put("scroll_id", scrollId);
+            Request request = new Request("DELETE", endpoint);
+            request.setJsonEntity(JsonUtils.toJsonString(requestBody));
+
+            Response response = restClient.performRequest(request);
+            if (response == null) {
+                log.warn("DELETE {} response null for scroll ID: {}", endpoint, scrollId);
+                return false;
+            }
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode != HttpStatus.SC_OK) {
+                log.warn(
+                        "DELETE {} response status code={} for scroll ID: {}",
+                        endpoint,
+                        statusCode,
+                        scrollId);
+                return false;
+            }
+            String entity = EntityUtils.toString(response.getEntity());
+            JsonNode jsonNode = JsonUtils.parseObject(entity);
+            // path() returns a missing node instead of null, so a body without "succeeded"
+            // yields false rather than an NPE that the catch below would misreport.
+            return jsonNode.path("succeeded").asBoolean(false);
+        } catch (Exception ex) {
+            log.warn("Failed to clear scroll ID: {}", scrollId, ex);
+            return false;
+        }
+    }
+
     private ScrollResult getDocsFromSqlResult(
             String endpoint, String requestBody, JsonNode columnNodes) {
         Request request = new Request("POST", endpoint);
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
index a1548a8ecc..e7000881ae 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
@@ -120,19 +120,37 @@ public class ElasticsearchSourceReader
                 searchWithPointInTime(sourceIndexInfo, output, deserializer);
             } else {
                 log.info("Using Scroll API for index: {}", sourceIndexInfo.getIndex());
-                ScrollResult scrollResult =
-                        esRestClient.searchByScroll(
-                                sourceIndexInfo.getIndex(),
-                                sourceIndexInfo.getSource(),
-                                sourceIndexInfo.getQuery(),
-                                sourceIndexInfo.getScrollTime(),
-                                sourceIndexInfo.getScrollSize());
-                outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
-                while (scrollResult.getDocs() != null && !scrollResult.getDocs().isEmpty()) {
-                    scrollResult =
-                            esRestClient.searchWithScrollId(
-                                    scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
+                String scrollId = null;
+                try {
+                    ScrollResult scrollResult =
+                            esRestClient.searchByScroll(
+                                    sourceIndexInfo.getIndex(),
+                                    sourceIndexInfo.getSource(),
+                                    sourceIndexInfo.getQuery(),
+                                    sourceIndexInfo.getScrollTime(),
+                                    sourceIndexInfo.getScrollSize());
+                    scrollId = scrollResult.getScrollId();
+
                     outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
+                    while (scrollResult.getDocs() != null && !scrollResult.getDocs().isEmpty()) {
+                        scrollResult =
+                                esRestClient.searchWithScrollId(
+                                        scrollResult.getScrollId(),
+                                        sourceIndexInfo.getScrollTime());
+                        // Keep the latest ID; it is the one the finally block clears.
+                        scrollId = scrollResult.getScrollId();
+                        outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
+                    }
+                } finally {
+                    // Release the scroll context whether reading completed or threw.
+                    if (StringUtils.isNotEmpty(scrollId)) {
+                        try {
+                            esRestClient.clearScroll(scrollId);
+                        } catch (Exception e) {
+                            // Cleanup must not replace an exception from the read above.
+                            log.warn("Failed to clear scroll ID: {}", scrollId, e);
+                        }
+                    }
                 }
             }
         }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 2479be67b0..d18aa4d588 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -1083,6 +1083,42 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
         return data;
     }
 
+    /**
+     * Reads every page of {@code st_index} through the Scroll API, then checks that the final
+     * scroll context is cleared successfully.
+     *
+     * <p>Note: despite its name, this test does not exercise the SQL cursor path.
+     */
+    @Test
+    public void testScrollAndSqlCursorResourceCleanup() throws Exception {
+        List<String> source = Arrays.asList("c_string", "c_int");
+        Map<String, Object> query = new HashMap<>();
+        query.put("match_all", Collections.emptyMap());
+
+        String scrollId = null;
+        boolean cleaned = false;
+        try {
+            ScrollResult result = esRestClient.searchByScroll("st_index", source, query, "1m", 5);
+            scrollId = result.getScrollId();
+            Assertions.assertNotNull(scrollId, "Scroll ID should not be null");
+
+            int totalDocs = 0;
+            while (result.getDocs() != null && !result.getDocs().isEmpty()) {
+                totalDocs += result.getDocs().size();
+                result = esRestClient.searchWithScrollId(scrollId, "1m");
+                scrollId = result.getScrollId();
+            }
+            log.info("Retrieved {} documents via Scroll API", totalDocs);
+        } finally {
+            // Only record the outcome here: an assertion failing inside finally would hide any
+            // exception already thrown by the try block.
+            if (scrollId != null) {
+                cleaned = esRestClient.clearScroll(scrollId);
+            }
+        }
+        Assertions.assertTrue(cleaned, "Scroll context should be successfully cleaned up");
+    }
+
     /**
      * elastic query all dsl
      *

Reply via email to