LiJie20190102 commented on code in PR #10446:
URL: https://github.com/apache/seatunnel/pull/10446#discussion_r2764225294
##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java:
##########
@@ -307,6 +307,48 @@ public boolean clearScroll(String scrollId) {
}
}
+ /**
+ * Close SQL cursor to release server-side resources.
+ *
+ * @param cursor The SQL cursor to close
+ * @return True if the cursor was successfully closed
+ */
+ public boolean closeSqlCursor(String cursor) {
+ if (StringUtils.isEmpty(cursor)) {
+ log.warn("Attempted to close SQL cursor with empty cursor");
+ return false;
+ }
+
+ String endpoint = "/_sql/close";
+ Request request = new Request("POST", endpoint);
+ Map<String, String> requestBody = new HashMap<>();
+ requestBody.put("cursor", cursor);
+ request.setJsonEntity(JsonUtils.toJsonString(requestBody));
+
+ try {
+ Response response = restClient.performRequest(request);
+ if (response == null) {
+ log.warn("POST {} response null for cursor: {}", endpoint,
cursor);
+ return false;
+ }
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ String entity = EntityUtils.toString(response.getEntity());
+ JsonNode jsonNode = JsonUtils.parseObject(entity);
+ return jsonNode.get("succeeded").asBoolean();
+ } else {
+ log.warn(
+ "POST {} response status code={} for cursor: {}",
+ endpoint,
+ response.getStatusLine().getStatusCode(),
+ cursor);
+ return false;
+ }
+ } catch (Exception ex) {
+ log.warn("Failed to close SQL cursor: " + cursor, ex);
Review Comment:
When printing logs, it is best to standardize the format and use
placeholders `{}`
##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java:
##########
@@ -307,6 +307,48 @@ public boolean clearScroll(String scrollId) {
}
}
+ /**
+ * Close SQL cursor to release server-side resources.
+ *
+ * @param cursor The SQL cursor to close
+ * @return True if the cursor was successfully closed
+ */
+ public boolean closeSqlCursor(String cursor) {
+ if (StringUtils.isEmpty(cursor)) {
+ log.warn("Attempted to close SQL cursor with empty cursor");
Review Comment:
Thank you for your contribution. I don't quite understand the log printing
here. You can say that an empty cursor doesn't need to be closed, What do you
think?
##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java:
##########
@@ -102,16 +102,28 @@ private void scrollSearchResult(
// SQL client
if (SearchTypeEnum.SQL.equals(sourceIndexInfo.getSearchType())) {
log.info("Using SQL query for index: {}",
sourceIndexInfo.getIndex());
- ScrollResult scrollResult =
- esRestClient.searchBySql(
- sourceIndexInfo.getSqlQuery(),
sourceIndexInfo.getScrollSize());
-
- outputFromScrollResult(scrollResult, sourceIndexInfo, output,
deserializer);
- while (StringUtils.isNotEmpty(scrollResult.getScrollId())) {
- scrollResult =
- esRestClient.searchWithSql(
- scrollResult.getScrollId(),
scrollResult.getColumnNodes());
+ String cursor = null;
+ try {
+ ScrollResult scrollResult =
+ esRestClient.searchBySql(
+ sourceIndexInfo.getSqlQuery(),
sourceIndexInfo.getScrollSize());
+
outputFromScrollResult(scrollResult, sourceIndexInfo, output,
deserializer);
+ cursor = scrollResult.getScrollId();
+ while (StringUtils.isNotEmpty(cursor)) {
+ scrollResult =
+ esRestClient.searchWithSql(cursor,
scrollResult.getColumnNodes());
+ outputFromScrollResult(scrollResult, sourceIndexInfo,
output, deserializer);
+ cursor = scrollResult.getScrollId();
+ }
+ } finally {
+ if (StringUtils.isNotEmpty(cursor)) {
+ try {
+ esRestClient.closeSqlCursor(cursor);
+ } catch (Exception e) {
+ log.warn("Failed to close SQL cursor: " + cursor, e);
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]