Hisoka-X commented on code in PR #9150:
URL: https://github.com/apache/seatunnel/pull/9150#discussion_r2043369998
##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java:
##########
@@ -109,20 +113,98 @@ private void scrollSearchResult(
scrollResult.getScrollId(),
scrollResult.getColumnNodes());
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
}
- } else {
- ScrollResult scrollResult =
- esRestClient.searchByScroll(
- sourceIndexInfo.getIndex(),
+ }
+ // DSL query
+ else {
+ // Check if we should use PIT API
Review Comment:
```suggestion
} else {
// Check if we should use PIT API
```
##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java:
##########
@@ -876,4 +878,208 @@ public void addField(String index,
BasicTypeDefine<EsType> fieldTypeDefine) {
ex);
}
}
+
+ /**
+ * Creates a Point-in-Time (PIT) for the specified index.
+ *
+ * @param index The index to create a PIT for
+ * @param keepAlive The time to keep the PIT alive (in milliseconds)
+ * @return The PIT ID
+ */
+ public String createPointInTime(String index, long keepAlive) {
+ String endpoint = String.format("/%s/_pit?keep_alive=%dms", index.toLowerCase(), keepAlive);
+ Request request = new Request("POST", endpoint);
+ try {
+ Response response = restClient.performRequest(request);
+ if (response == null) {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.CREATE_PIT_FAILED,
+ "POST " + endpoint + " response null");
+ }
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ String entity = EntityUtils.toString(response.getEntity());
+ JsonNode jsonNode = JsonUtils.parseObject(entity);
+ return jsonNode.get("id").asText();
+ } else {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.CREATE_PIT_FAILED,
+ String.format(
+ "POST %s response status code=%d",
+ endpoint, response.getStatusLine().getStatusCode()));
+ }
+ } catch (IOException ex) {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.CREATE_PIT_FAILED, ex);
+ }
+ }
+
+ /**
+ * Deletes a Point-in-Time (PIT).
+ *
+ * @param pitId The PIT ID to delete
+ * @return True if the PIT was successfully deleted
+ */
+ public boolean deletePointInTime(String pitId) {
+ String endpoint = "/_pit";
+ Request request = new Request("DELETE", endpoint);
+ Map<String, String> requestBody = new HashMap<>();
+ requestBody.put("id", pitId);
+ request.setJsonEntity(JsonUtils.toJsonString(requestBody));
+ try {
+ Response response = restClient.performRequest(request);
+ if (response == null) {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.DELETE_PIT_FAILED,
+ "DELETE " + endpoint + " response null");
+ }
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ String entity = EntityUtils.toString(response.getEntity());
+ JsonNode jsonNode = JsonUtils.parseObject(entity);
+ return jsonNode.get("succeeded").asBoolean();
+ } else {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.DELETE_PIT_FAILED,
+ String.format(
+ "DELETE %s response status code=%d",
+ endpoint, response.getStatusLine().getStatusCode()));
+ }
+ } catch (IOException ex) {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.DELETE_PIT_FAILED, ex);
+ }
+ }
+
+ /**
+ * Searches using a Point-in-Time (PIT).
+ *
+ * @param pitId The PIT ID to use
+ * @param source The fields to include in the response
+ * @param query The query to execute
+ * @param batchSize The number of documents to return
+ * @param searchAfter The sort values to search after (for pagination)
+ * @param keepAlive The time to keep the PIT alive (in milliseconds)
+ * @return The search results
+ */
+ public PointInTimeResult searchWithPointInTime(
+ String pitId,
+ List<String> source,
+ Map<String, Object> query,
+ int batchSize,
+ Object[] searchAfter,
+ long keepAlive) {
+
+ Map<String, Object> requestBody = new HashMap<>();
+ requestBody.put("size", batchSize);
+ requestBody.put("query", query);
+ requestBody.put("_source", source);
+
+ // Add PIT information
+ Map<String, Object> pit = new HashMap<>();
+ pit.put("id", pitId);
+ pit.put("keep_alive", keepAlive + "ms");
+ requestBody.put("pit", pit);
+
+ // Add sort for search_after
+ List<Map<String, String>> sort = new ArrayList<>();
+ Map<String, String> sortField = new HashMap<>();
+ sortField.put("order", "asc");
+ sort.add(Collections.singletonMap("_shard_doc", "asc"));
Review Comment:
@CosmosNi
##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java:
##########
@@ -63,6 +65,8 @@ public OptionRule optionRule() {
SCROLL_TIME,
SCROLL_SIZE,
QUERY,
+ PIT_KEEP_ALIVE,
+ PIT_BATCH_SIZE,
Review Comment:
```suggestion
PIT_KEEP_ALIVE,
PIT_BATCH_SIZE,
SEARCH_API_TYPE,
```
##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java:
##########
@@ -109,20 +113,98 @@ private void scrollSearchResult(
scrollResult.getScrollId(),
scrollResult.getColumnNodes());
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
}
- } else {
- ScrollResult scrollResult =
- esRestClient.searchByScroll(
- sourceIndexInfo.getIndex(),
+ }
+ // DSL query
+ else {
+ // Check if we should use PIT API
+ if (SearchApiTypeEnum.PIT.equals(sourceIndexInfo.getSearchApiType())) {
+ log.info("Using Point-in-Time (PIT) API for index: {}", sourceIndexInfo.getIndex());
+ searchWithPointInTime(sourceIndexInfo, output, deserializer);
+ }
+ // Default scroll API
+ else {
Review Comment:
```suggestion
} else {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]