This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9612fe13eaf Elasticsearch enable Point In Time based searches (#30824)
9612fe13eaf is described below

commit 9612fe13eaf30eaa7b34c00026ec65895bd1a9fb
Author: pablo rodriguez defino <[email protected]>
AuthorDate: Tue Apr 30 12:30:38 2024 -0700

    Elasticsearch enable Point In Time based searches (#30824)
    
    * first implementation for a PIT iterator on read PTransform
---
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java    |   6 +
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  |  20 +-
 .../elasticsearch/ElasticsearchIOTestCommon.java   |  31 +-
 .../io/elasticsearch/ElasticsearchIOTestUtils.java |   7 +-
 .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 401 +++++++++++++++++----
 5 files changed, 391 insertions(+), 74 deletions(-)

diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index 6801003e139..415af5e29a5 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -98,6 +98,12 @@ public class ElasticsearchIOIT {
     elasticsearchIOTestCommon.testRead();
   }
 
+  @Test
+  public void testReadPITVolume() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testReadPIT();
+  }
+
   @Test
   public void testWriteVolume() throws Exception {
     // cannot share elasticsearchIOTestCommon because tests run in parallel.
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 6bf96360d53..4cf02212bdc 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -104,13 +104,31 @@ public class ElasticsearchIOTest implements Serializable {
     elasticsearchIOTestCommon.testRead();
   }
 
+  @Test
+  public void testReadPIT() throws Exception {
+    // need to create the index using the helper method (not create it at 
first insertion)
+    // for the indexSettings() to be run
+    createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testReadPIT();
+  }
+
   @Test
   public void testReadWithQueryString() throws Exception {
     // need to create the index using the helper method (not create it at 
first insertion)
     // for the indexSettings() to be run
     createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
     elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testRead();
+    elasticsearchIOTestCommon.testReadWithQueryString();
+  }
+
+  @Test
+  public void testReadWithQueryStringAndPIT() throws Exception {
+    // need to create the index using the helper method (not create it at 
first insertion)
+    // for the indexSettings() to be run
+    createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testReadWithQueryAndPIT();
   }
 
   @Test
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index d2e8efe4899..d1eeb610b65 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -235,17 +235,36 @@ class ElasticsearchIOTestCommon implements Serializable {
     pipeline.run();
   }
 
+  /** Point in Time search is currently available for Elasticsearch version 
8+. */
+  void testReadPIT() throws Exception {
+    if (!useAsITests) {
+      ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration, 
numDocs, restClient);
+    }
+
+    PCollection<String> output =
+        pipeline.apply(
+            ElasticsearchIO.read()
+                .withConnectionConfiguration(connectionConfiguration)
+                .withPointInTimeSearch());
+    PAssert.thatSingleton(output.apply("Count", 
Count.globally())).isEqualTo(numDocs);
+    pipeline.run();
+  }
+
   void testReadWithQueryString() throws Exception {
-    testReadWithQueryInternal(Read::withQuery);
+    testReadWithQueryInternal(Read::withQuery, true);
+  }
+
+  void testReadWithQueryAndPIT() throws Exception {
+    testReadWithQueryInternal(Read::withQuery, false);
   }
 
   void testReadWithQueryValueProvider() throws Exception {
     testReadWithQueryInternal(
-        (read, query) -> 
read.withQuery(ValueProvider.StaticValueProvider.of(query)));
+        (read, query) -> 
read.withQuery(ValueProvider.StaticValueProvider.of(query)), true);
   }
 
-  private void testReadWithQueryInternal(BiFunction<Read, String, Read> 
queryConfigurer)
-      throws IOException {
+  private void testReadWithQueryInternal(
+      BiFunction<Read, String, Read> queryConfigurer, boolean useScrollAPI) 
throws IOException {
     if (!useAsITests) {
       ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration, 
numDocs, restClient);
     }
@@ -257,7 +276,7 @@ class ElasticsearchIOTestCommon implements Serializable {
             + "    \"scientist\" : {\n"
             + "      \"query\" : \"Einstein\"\n"
             + "    }\n"
-            + "  }\n"
+            + "   }\n"
             + "  }\n"
             + "}";
 
@@ -265,6 +284,8 @@ class ElasticsearchIOTestCommon implements Serializable {
 
     read = queryConfigurer.apply(read, query);
 
+    read = useScrollAPI ? read : read.withPointInTimeSearch();
+
     PCollection<String> output = pipeline.apply(read);
 
     PAssert.thatSingleton(output.apply("Count", Count.globally()))
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java
index 4416f6c7ec3..49f738b3e7d 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.IOException;
 import java.time.Duration;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -334,6 +335,7 @@ class ElasticsearchIOTestUtils {
   static List<String> createDocuments(long numDocs, InjectionMode 
injectionMode) {
 
     ArrayList<String> data = new ArrayList<>();
+    LocalDateTime baseDateTime = LocalDateTime.now();
     for (int i = 0; i < numDocs; i++) {
       int index = i % FAMOUS_SCIENTISTS.length;
       // insert 2 malformed documents
@@ -341,7 +343,10 @@ class ElasticsearchIOTestUtils {
           && INVALID_DOCS_IDS.contains(i)) {
         data.add(String.format("{\"scientist\";\"%s\", \"id\":%s}", 
FAMOUS_SCIENTISTS[index], i));
       } else {
-        data.add(String.format("{\"scientist\":\"%s\", \"id\":%s}", 
FAMOUS_SCIENTISTS[index], i));
+        data.add(
+            String.format(
+                "{\"scientist\":\"%s\", \"id\":%s, \"@timestamp\" : \"%s\"}",
+                FAMOUS_SCIENTISTS[index], i, 
baseDateTime.plusSeconds(i).toString()));
       }
     }
     return data;
diff --git 
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
 
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index a57f45d7c90..dc0361056b3 100644
--- 
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ 
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -61,6 +61,8 @@ import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -220,6 +222,7 @@ public class ElasticsearchIO {
         .setWithMetadata(false)
         .setScrollKeepalive("5m")
         .setBatchSize(100L)
+        .setUsePITSearch(false)
         .build();
   }
 
@@ -752,6 +755,16 @@ public class ElasticsearchIO {
   public abstract static class Read extends PTransform<PBegin, 
PCollection<String>> {
 
     private static final long MAX_BATCH_SIZE = 10000L;
+    private static final String SEARCH_AFTER_DEFAULT_SORT_PROPERTY = 
"@timestamp";
+    private static final String SEARCH_AFTER_SORT_TEMPLATE =
+        "\"sort\" : {"
+            + " \"%s\" : {"
+            + "  \"order\" : \"asc\", "
+            + "  \"format\" : \"strict_date_optional_time_nanos\""
+            + " }"
+            + "}";
+    private static final String SEARCH_AFTER_DEFAULT_SORT =
+        String.format(SEARCH_AFTER_SORT_TEMPLATE, 
SEARCH_AFTER_DEFAULT_SORT_PROPERTY);
 
     abstract @Nullable ConnectionConfiguration getConnectionConfiguration();
 
@@ -763,6 +776,12 @@ public class ElasticsearchIO {
 
     abstract long getBatchSize();
 
+    abstract boolean getUsePITSearch();
+
+    abstract @Nullable String getPITSortConfig();
+
+    abstract @Nullable String getPITSortTimestampProperty();
+
     abstract Builder builder();
 
     @AutoValue.Builder
@@ -777,6 +796,12 @@ public class ElasticsearchIO {
 
       abstract Builder setBatchSize(long batchSize);
 
+      abstract Builder setUsePITSearch(boolean usePIT);
+
+      abstract Builder setPITSortConfig(String pitConfig);
+
+      abstract Builder setPITSortTimestampProperty(String 
pitTimestampProperty);
+
       abstract Read build();
     }
 
@@ -832,7 +857,9 @@ public class ElasticsearchIO {
     /**
      * Provide a scroll keepalive. See <a
      * href="https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-request-scroll.html">scroll
-     * API</a> Default is "5m". Change this only if you get "No search context found" errors.
+     * API</a> Default is "5m". Change this only if you get "No search context found" errors. When
+     * configuring the read to use Point In Time (PIT) search this configuration is used to set the
+     * PIT keep alive.
      *
      * @param scrollKeepalive keepalive duration of the scroll
      * @return a {@link PTransform} reading data from Elasticsearch.
@@ -862,6 +889,58 @@ public class ElasticsearchIO {
       return builder().setBatchSize(batchSize).build();
     }
 
+    /**
+     * Configures the source to use Point In Time search iteration while reading data from
+     * Elasticsearch. See <a
+     * href="https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html">
+     * Point in time search</a>, using default settings. This iteration mode for searches does not
+     * have the same size constraints the Scroll API has (slice counts, batch size or how deep the
+     * iteration is). By default this iteration mode will use a {@code @timestamp} named property on
+     * the indexed documents to consistently retrieve the data when failures occur on a specific
+     * read work.
+     *
+     * @return a {@link PTransform} reading data from Elasticsearch.
+     */
+    public Read withPointInTimeSearch() {
+      return builder()
+          .setUsePITSearch(true)
+          .setBatchSize(1000)
+          .setPITSortConfig(SEARCH_AFTER_DEFAULT_SORT)
+          .build();
+    }
+
+    /**
+     * Similar to {@link #withPointInTimeSearch() the default PIT search} but setting an existing
+     * timestamp based property name which Elasticsearch will use to sort the results.
+     *
+     * @param timestampSortProperty a property name found in the read documents containing a
+     *     timestamp-like value.
+     * @return a {@link PTransform} reading data from Elasticsearch.
+     */
+    public Read withPointInTimeSearchAndTimestampSortProperty(String 
timestampSortProperty) {
+      return builder()
+          .setUsePITSearch(true)
+          .setBatchSize(1000)
+          .setPITSortConfig(String.format(SEARCH_AFTER_SORT_TEMPLATE, 
timestampSortProperty))
+          .build();
+    }
+
+    /**
+     * Similar to {@link #withPointInTimeSearch() the default PIT search} but setting a specific
+     * sorting configuration which Elasticsearch will use to sort the results.
+     *
+     * @param sortConfiguration the full sorting configuration to be sent to Elasticsearch while
+     *     iterating on the results.
+     * @return a {@link PTransform} reading data from Elasticsearch.
+     */
+    public Read withPointInTimeSearchAndSortConfiguration(String 
sortConfiguration) {
+      return builder()
+          .setUsePITSearch(true)
+          .setBatchSize(1000)
+          .setPITSortConfig(sortConfiguration)
+          .build();
+    }
+
     @Override
     public PCollection<String> expand(PBegin input) {
       ConnectionConfiguration connectionConfiguration = 
getConnectionConfiguration();
@@ -877,8 +956,15 @@ public class ElasticsearchIO {
       builder.addIfNotNull(DisplayData.item("withMetadata", isWithMetadata()));
       builder.addIfNotNull(DisplayData.item("batchSize", getBatchSize()));
       builder.addIfNotNull(DisplayData.item("scrollKeepalive", 
getScrollKeepalive()));
+      builder.addIfNotNull(DisplayData.item("usePointInTimeSearch", 
getUsePITSearch()));
       getConnectionConfiguration().populateDisplayData(builder);
     }
+
+    void validatePITConfiguration(int backendVersion) {
+      checkArgument(
+          getUsePITSearch() && backendVersion >= 8,
+          "Point in time searches are supported for clusters with version 8 
and higher.");
+    }
   }
 
   /** A {@link BoundedSource} reading from Elasticsearch. */
@@ -1014,7 +1100,12 @@ public class ElasticsearchIO {
 
     @Override
     public BoundedReader<String> createReader(PipelineOptions options) {
-      return new BoundedElasticsearchReader(this);
+      if (!spec.getUsePITSearch()) {
+        return new BoundedElasticsearchScrollReader(this);
+      } else {
+        spec.validatePITConfiguration(backendVersion);
+        return new BoundedElasticsearchPITReader(this);
+      }
     }
 
     @Override
@@ -1039,48 +1130,47 @@ public class ElasticsearchIO {
     }
   }
 
-  private static class BoundedElasticsearchReader extends 
BoundedSource.BoundedReader<String> {
+  abstract static class BoundedElasticsearchReader extends 
BoundedSource.BoundedReader<String> {
+    private static final Counter READ =
+        Metrics.counter(BoundedElasticsearchScrollReader.class, 
"es-read-document-count");
+    private static final String MATCH_ALL_QUERY = "{\"query\": { 
\"match_all\": {} }}";
 
-    private final BoundedElasticsearchSource source;
+    protected final BoundedElasticsearchSource source;
 
-    private RestClient restClient;
-    private String current;
-    private String scrollId;
-    private ListIterator<String> batchIterator;
+    protected RestClient restClient;
+    protected JsonNode current;
+    protected ListIterator<JsonNode> batchIterator;
+    protected String iteratorId;
 
-    private BoundedElasticsearchReader(BoundedElasticsearchSource source) {
+    protected BoundedElasticsearchReader(BoundedElasticsearchSource source) {
       this.source = source;
     }
 
-    @Override
-    public boolean start() throws IOException {
-      restClient = source.spec.getConnectionConfiguration().createClient();
+    protected abstract Request createStartRequest();
+
+    protected abstract Request createAdvanceRequest();
+
+    protected abstract Request createCloseRequest();
 
+    protected abstract boolean processResult(JsonNode searchResult) throws 
IOException;
+
+    protected abstract void updateIteratorId(JsonNode searchResult);
+
+    protected String createBaseQuery() {
       String query = source.spec.getQuery() != null ? 
source.spec.getQuery().get() : null;
       if (query == null) {
-        query = "{\"query\": { \"match_all\": {} }}";
-      }
-      if ((source.backendVersion >= 5) && source.numSlices != null && 
source.numSlices > 1) {
-        // if there is more than one slice, add the slice to the user query
-        String sliceQuery =
-            String.format("\"slice\": {\"id\": %s,\"max\": %s}", 
source.sliceId, source.numSlices);
-        query = query.replaceFirst("\\{", "{" + sliceQuery + ",");
+        query = BoundedElasticsearchReader.MATCH_ALL_QUERY;
       }
-      String endPoint = 
source.spec.getConnectionConfiguration().getSearchEndPoint();
-      Map<String, String> params = new HashMap<>();
-      params.put("scroll", source.spec.getScrollKeepalive());
-      HttpEntity queryEntity = new NStringEntity(query, 
ContentType.APPLICATION_JSON);
-      Request request = new Request("GET", endPoint);
-      request.addParameters(params);
-      request.setEntity(queryEntity);
-      Response response = restClient.performRequest(request);
-      JsonNode searchResult = parseResponse(response.getEntity());
-      updateScrollId(searchResult);
-      return readNextBatchAndReturnFirstDocument(searchResult);
+      return query;
     }
 
-    private void updateScrollId(JsonNode searchResult) {
-      scrollId = searchResult.path("_scroll_id").asText();
+    @Override
+    public boolean start() throws IOException {
+      restClient = source.spec.getConnectionConfiguration().createClient();
+      Response response = restClient.performRequest(createStartRequest());
+      JsonNode searchResult = parseResponse(response.getEntity());
+      updateIteratorId(searchResult);
+      return processResult(searchResult);
     }
 
     @Override
@@ -1089,22 +1179,18 @@ public class ElasticsearchIO {
         current = batchIterator.next();
         return true;
       } else {
-        String requestBody =
-            String.format(
-                "{\"scroll\" : \"%s\",\"scroll_id\" : \"%s\"}",
-                source.spec.getScrollKeepalive(), scrollId);
-        HttpEntity scrollEntity = new NStringEntity(requestBody, 
ContentType.APPLICATION_JSON);
-        Request request = new Request("GET", "/_search/scroll");
-        request.addParameters(Collections.emptyMap());
-        request.setEntity(scrollEntity);
-        Response response = restClient.performRequest(request);
-        JsonNode searchResult = parseResponse(response.getEntity());
-        updateScrollId(searchResult);
-        return readNextBatchAndReturnFirstDocument(searchResult);
-      }
-    }
-
-    private boolean readNextBatchAndReturnFirstDocument(JsonNode searchResult) 
{
+        return performAdvance();
+      }
+    }
+
+    protected boolean performAdvance() throws IOException {
+      Response response = restClient.performRequest(createAdvanceRequest());
+      JsonNode searchResult = parseResponse(response.getEntity());
+      updateIteratorId(searchResult);
+      return processResult(searchResult);
+    }
+
+    protected boolean readNextBatchAndReturnFirstDocument(JsonNode 
searchResult) {
       // stop if no more data
       JsonNode hits = searchResult.path("hits").path("hits");
       if (hits.size() == 0) {
@@ -1113,15 +1199,9 @@ public class ElasticsearchIO {
         return false;
       }
       // list behind iterator is empty
-      List<String> batch = new ArrayList<>();
-      boolean withMetadata = source.spec.isWithMetadata();
+      List<JsonNode> batch = new ArrayList<>();
       for (JsonNode hit : hits) {
-        if (withMetadata) {
-          batch.add(hit.toString());
-        } else {
-          String document = hit.path("_source").toString();
-          batch.add(document);
-        }
+        batch.add(hit);
       }
       batchIterator = batch.listIterator();
       current = batchIterator.next();
@@ -1133,19 +1213,17 @@ public class ElasticsearchIO {
       if (current == null) {
         throw new NoSuchElementException();
       }
-      return current;
+      READ.inc();
+      boolean withMetadata = source.spec.isWithMetadata();
+      return withMetadata ? current.toString() : 
current.path("_source").toString();
     }
 
     @Override
     public void close() throws IOException {
-      // remove the scroll
-      String requestBody = String.format("{\"scroll_id\" : [\"%s\"]}", 
scrollId);
-      HttpEntity entity = new NStringEntity(requestBody, 
ContentType.APPLICATION_JSON);
+      Request closeRequest = createCloseRequest();
+      // clear the selected iterator
       try {
-        Request request = new Request("DELETE", "/_search/scroll");
-        request.addParameters(Collections.emptyMap());
-        request.setEntity(entity);
-        restClient.performRequest(request);
+        restClient.performRequest(closeRequest);
       } finally {
         if (restClient != null) {
           restClient.close();
@@ -1158,6 +1236,195 @@ public class ElasticsearchIO {
       return source;
     }
   }
+
+  static class BoundedElasticsearchScrollReader extends 
BoundedElasticsearchReader {
+
+    public BoundedElasticsearchScrollReader(BoundedElasticsearchSource source) 
{
+      super(source);
+    }
+
+    @Override
+    protected Request createStartRequest() {
+      String query = createBaseQuery();
+      if ((source.backendVersion >= 5) && source.numSlices != null && 
source.numSlices > 1) {
+        // if there is more than one slice, add the slice to the user query
+        String sliceQuery =
+            String.format("\"slice\": {\"id\": %s,\"max\": %s}", 
source.sliceId, source.numSlices);
+        query = query.replaceFirst("\\{", "{" + sliceQuery + ",");
+      }
+      String endPoint = 
source.spec.getConnectionConfiguration().getSearchEndPoint();
+      Map<String, String> params = new HashMap<>();
+      params.put("scroll", source.spec.getScrollKeepalive());
+      HttpEntity queryEntity = new NStringEntity(query, 
ContentType.APPLICATION_JSON);
+      Request request = new Request("GET", endPoint);
+      request.addParameters(params);
+      request.setEntity(queryEntity);
+      return request;
+    }
+
+    @Override
+    protected Request createAdvanceRequest() {
+      String requestBody =
+          String.format(
+              "{\"scroll\" : \"%s\",\"scroll_id\" : \"%s\"}",
+              source.spec.getScrollKeepalive(), iteratorId);
+      HttpEntity scrollEntity = new NStringEntity(requestBody, 
ContentType.APPLICATION_JSON);
+      Request request = new Request("GET", "/_search/scroll");
+      request.addParameters(Collections.emptyMap());
+      request.setEntity(scrollEntity);
+      return request;
+    }
+
+    @Override
+    protected Request createCloseRequest() {
+      String requestBody = String.format("{\"scroll_id\" : [\"%s\"]}", 
iteratorId);
+      HttpEntity entity = new NStringEntity(requestBody, 
ContentType.APPLICATION_JSON);
+      Request request = new Request("DELETE", "/_search/scroll");
+      request.addParameters(Collections.emptyMap());
+      request.setEntity(entity);
+      return request;
+    }
+
+    @Override
+    protected boolean processResult(JsonNode searchResult) throws IOException {
+      return readNextBatchAndReturnFirstDocument(searchResult);
+    }
+
+    @Override
+    protected void updateIteratorId(JsonNode searchResult) {
+      iteratorId = searchResult.path("_scroll_id").asText();
+    }
+  }
+
+  static class BoundedElasticsearchPITReader extends 
BoundedElasticsearchReader {
+
+    private String searchAfterProperty = "";
+
+    public BoundedElasticsearchPITReader(BoundedElasticsearchSource source) {
+      super(source);
+    }
+
+    private String modifyQueryForPIT(String originalQuery) {
+      String trimmed = originalQuery.trim();
+      if (trimmed.startsWith("{") && trimmed.endsWith("}")) {
+        return trimmed.substring(1, trimmed.length() - 1);
+      }
+      return originalQuery;
+    }
+
+    @Override
+    protected String createBaseQuery() {
+      return modifyQueryForPIT(super.createBaseQuery()) + ", " + 
source.spec.getPITSortConfig();
+    }
+
+    @Override
+    protected Request createStartRequest() {
+      String endPoint =
+          String.format("/%s/_pit", 
source.spec.getConnectionConfiguration().getIndex());
+      Map<String, String> params = new HashMap<>();
+      params.put("keep_alive", source.spec.getScrollKeepalive());
+      Request request = new Request("POST", endPoint);
+      request.addParameters(params);
+      return request;
+    }
+
+    String searchAfter() {
+      if (searchAfterProperty.isEmpty()) {
+        return "";
+      }
+      return String.format("\"search_after\" : %s,", searchAfterProperty);
+    }
+
+    @Override
+    protected Request createAdvanceRequest() {
+      // numSlices can be null: guard before unboxing; add a slice only if there is more than one
+      String sliceQuery =
+          source.numSlices != null && source.numSlices > 1
+              ? String.format(
+                  "\"slice\" : {\"id\" : %s, \"max\" : %s},", source.sliceId, source.numSlices)
+              : "";
+
+      String requestBody =
+          String.format(
+              "{"
+                  + " %s"
+                  + " \"size\" : %d,"
+                  + " %s"
+                  + " %s,"
+                  + " \"pit\": {"
+                  + "  \"id\": \"%s\""
+                  + " }"
+                  + "}",
+              searchAfter(), source.spec.getBatchSize(), sliceQuery, createBaseQuery(), iteratorId);
+      HttpEntity pitSearchEntity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
+      Request request = new Request("POST", "/_search");
+      request.addParameters(Collections.emptyMap());
+      request.setEntity(pitSearchEntity);
+      return request;
+    }
+
+    @Override
+    protected Request createCloseRequest() {
+      String requestBody = String.format("{\"id\" : \"%s\"}", iteratorId);
+      HttpEntity entity = new NStringEntity(requestBody, 
ContentType.APPLICATION_JSON);
+      Request request = new Request("DELETE", "/_pit");
+      request.addParameters(Collections.emptyMap());
+      request.setEntity(entity);
+      return request;
+    }
+
+    String extractSearchAfterFromDocument(JsonNode document) {
+      return document.path("sort").toString();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (batchIterator.hasNext()) {
+        current = batchIterator.next();
+        searchAfterProperty = extractSearchAfterFromDocument(current);
+        return true;
+      } else {
+        return performAdvance();
+      }
+    }
+
+    @Override
+    protected boolean processResult(JsonNode searchResult) throws IOException {
+      JsonNode hits = searchResult.path("hits");
+      if (hits == null || hits.isMissingNode()) {
+        // after creating the PIT we need to make the first request to comply 
with Reader API and
+        // try to get a first result or declare the source empty
+        return performAdvance();
+      }
+      JsonNode resultArray = hits.path("hits");
+      // check if results are empty
+      if (resultArray == null || resultArray.isEmpty()) {
+        return false;
+      }
+      // we already opened the PIT search and are processing the search results
+      boolean wasDocumentRead = 
readNextBatchAndReturnFirstDocument(searchResult);
+      if (wasDocumentRead && current != null) {
+        searchAfterProperty = extractSearchAfterFromDocument(current);
+      }
+      return wasDocumentRead;
+    }
+
+    @Override
+    protected void updateIteratorId(JsonNode searchResult) {
+      iteratorId = extractPITId(searchResult);
+    }
+
+    String extractPITId(JsonNode searchResult) {
+      String maybeId = searchResult.path("id").asText();
+      // check if this is the first request
+      if (maybeId != null && !maybeId.isEmpty()) {
+        return maybeId;
+      } else {
+        return searchResult.path("pit_id").asText();
+      }
+    }
+  }
+
   /**
    * A POJO encapsulating a configuration for retry behavior when issuing 
requests to ES. A retry
    * will be attempted until the maxAttempts or maxDuration is exceeded, 
whichever comes first, for
@@ -1423,8 +1690,8 @@ public class ElasticsearchIO {
      * href="https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html">data
      * stream</a>. Data streams only support the {@code create} operation. For more information see
      * the <a
-     * href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-desc>
-     * Elasticsearch documentation</a>
+     * href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-desc">Elasticsearch
+     * documentation</a>
      *
      * <p>Updates and deletions are not allowed, so related options will be 
ignored.
      *
@@ -1485,7 +1752,7 @@ public class ElasticsearchIO {
      * the batch will fail and the exception propagated. Incompatible with 
update operations and
      * should only be used with withUsePartialUpdate(false)
      *
-     * @param docVersionType the version type to use, one of {@value 
VERSION_TYPES}
+     * @param docVersionType the version type to use, one of {@link 
VERSION_TYPES}
      * @return the {@link DocToBulk} with the doc version type set
      */
     public DocToBulk withDocVersionType(String docVersionType) {

Reply via email to