This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9612fe13eaf Elasticsearch enable Point In Time based searches (#30824)
9612fe13eaf is described below
commit 9612fe13eaf30eaa7b34c00026ec65895bd1a9fb
Author: pablo rodriguez defino <[email protected]>
AuthorDate: Tue Apr 30 12:30:38 2024 -0700
Elasticsearch enable Point In Time based searches (#30824)
* first implementation for a PIT iterator on read PTransform
---
.../sdk/io/elasticsearch/ElasticsearchIOIT.java | 6 +
.../sdk/io/elasticsearch/ElasticsearchIOTest.java | 20 +-
.../elasticsearch/ElasticsearchIOTestCommon.java | 31 +-
.../io/elasticsearch/ElasticsearchIOTestUtils.java | 7 +-
.../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 401 +++++++++++++++++----
5 files changed, 391 insertions(+), 74 deletions(-)
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index 6801003e139..415af5e29a5 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -98,6 +98,12 @@ public class ElasticsearchIOIT {
elasticsearchIOTestCommon.testRead();
}
+ @Test
+ public void testReadPITVolume() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testReadPIT();
+ }
+
@Test
public void testWriteVolume() throws Exception {
// cannot share elasticsearchIOTestCommon because tests run in parallel.
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 6bf96360d53..4cf02212bdc 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -104,13 +104,31 @@ public class ElasticsearchIOTest implements Serializable {
elasticsearchIOTestCommon.testRead();
}
+ @Test
+ public void testReadPIT() throws Exception {
+ // need to create the index using the helper method (not create it at
first insertion)
+ // for the indexSettings() to be run
+ createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testReadPIT();
+ }
+
@Test
public void testReadWithQueryString() throws Exception {
// need to create the index using the helper method (not create it at
first insertion)
// for the indexSettings() to be run
createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
elasticsearchIOTestCommon.setPipeline(pipeline);
- elasticsearchIOTestCommon.testRead();
+ elasticsearchIOTestCommon.testReadWithQueryString();
+ }
+
+ @Test
+ public void testReadWithQueryStringAndPIT() throws Exception {
+ // need to create the index using the helper method (not create it at
first insertion)
+ // for the indexSettings() to be run
+ createIndex(elasticsearchIOTestCommon.restClient, getEsIndex());
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testReadWithQueryAndPIT();
}
@Test
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index d2e8efe4899..d1eeb610b65 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -235,17 +235,36 @@ class ElasticsearchIOTestCommon implements Serializable {
pipeline.run();
}
+ /** Point in Time search is currently available for Elasticsearch version
8+. */
+ void testReadPIT() throws Exception {
+ if (!useAsITests) {
+ ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration,
numDocs, restClient);
+ }
+
+ PCollection<String> output =
+ pipeline.apply(
+ ElasticsearchIO.read()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withPointInTimeSearch());
+ PAssert.thatSingleton(output.apply("Count",
Count.globally())).isEqualTo(numDocs);
+ pipeline.run();
+ }
+
void testReadWithQueryString() throws Exception {
- testReadWithQueryInternal(Read::withQuery);
+ testReadWithQueryInternal(Read::withQuery, true);
+ }
+
+ void testReadWithQueryAndPIT() throws Exception {
+ testReadWithQueryInternal(Read::withQuery, false);
}
void testReadWithQueryValueProvider() throws Exception {
testReadWithQueryInternal(
- (read, query) ->
read.withQuery(ValueProvider.StaticValueProvider.of(query)));
+ (read, query) ->
read.withQuery(ValueProvider.StaticValueProvider.of(query)), true);
}
- private void testReadWithQueryInternal(BiFunction<Read, String, Read>
queryConfigurer)
- throws IOException {
+ private void testReadWithQueryInternal(
+ BiFunction<Read, String, Read> queryConfigurer, boolean useScrollAPI)
throws IOException {
if (!useAsITests) {
ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration,
numDocs, restClient);
}
@@ -257,7 +276,7 @@ class ElasticsearchIOTestCommon implements Serializable {
+ " \"scientist\" : {\n"
+ " \"query\" : \"Einstein\"\n"
+ " }\n"
- + " }\n"
+ + " }\n"
+ " }\n"
+ "}";
@@ -265,6 +284,8 @@ class ElasticsearchIOTestCommon implements Serializable {
read = queryConfigurer.apply(read, query);
+ read = useScrollAPI ? read : read.withPointInTimeSearch();
+
PCollection<String> output = pipeline.apply(read);
PAssert.thatSingleton(output.apply("Count", Count.globally()))
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java
index 4416f6c7ec3..49f738b3e7d 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.time.Duration;
+import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -334,6 +335,7 @@ class ElasticsearchIOTestUtils {
static List<String> createDocuments(long numDocs, InjectionMode
injectionMode) {
ArrayList<String> data = new ArrayList<>();
+ LocalDateTime baseDateTime = LocalDateTime.now();
for (int i = 0; i < numDocs; i++) {
int index = i % FAMOUS_SCIENTISTS.length;
// insert 2 malformed documents
@@ -341,7 +343,10 @@ class ElasticsearchIOTestUtils {
&& INVALID_DOCS_IDS.contains(i)) {
data.add(String.format("{\"scientist\";\"%s\", \"id\":%s}",
FAMOUS_SCIENTISTS[index], i));
} else {
- data.add(String.format("{\"scientist\":\"%s\", \"id\":%s}",
FAMOUS_SCIENTISTS[index], i));
+ data.add(
+ String.format(
+ "{\"scientist\":\"%s\", \"id\":%s, \"@timestamp\" : \"%s\"}",
+ FAMOUS_SCIENTISTS[index], i,
baseDateTime.plusSeconds(i).toString()));
}
}
return data;
diff --git
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index a57f45d7c90..dc0361056b3 100644
---
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -61,6 +61,8 @@ import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
@@ -220,6 +222,7 @@ public class ElasticsearchIO {
.setWithMetadata(false)
.setScrollKeepalive("5m")
.setBatchSize(100L)
+ .setUsePITSearch(false)
.build();
}
@@ -752,6 +755,16 @@ public class ElasticsearchIO {
public abstract static class Read extends PTransform<PBegin,
PCollection<String>> {
private static final long MAX_BATCH_SIZE = 10000L;
+ private static final String SEARCH_AFTER_DEFAULT_SORT_PROPERTY =
"@timestamp";
+ private static final String SEARCH_AFTER_SORT_TEMPLATE =
+ "\"sort\" : {"
+ + " \"%s\" : {"
+ + " \"order\" : \"asc\", "
+ + " \"format\" : \"strict_date_optional_time_nanos\""
+ + " }"
+ + "}";
+ private static final String SEARCH_AFTER_DEFAULT_SORT =
+ String.format(SEARCH_AFTER_SORT_TEMPLATE,
SEARCH_AFTER_DEFAULT_SORT_PROPERTY);
abstract @Nullable ConnectionConfiguration getConnectionConfiguration();
@@ -763,6 +776,12 @@ public class ElasticsearchIO {
abstract long getBatchSize();
+ abstract boolean getUsePITSearch();
+
+ abstract @Nullable String getPITSortConfig();
+
+ abstract @Nullable String getPITSortTimestampProperty();
+
abstract Builder builder();
@AutoValue.Builder
@@ -777,6 +796,12 @@ public class ElasticsearchIO {
abstract Builder setBatchSize(long batchSize);
+ abstract Builder setUsePITSearch(boolean usePIT);
+
+ abstract Builder setPITSortConfig(String pitConfig);
+
+ abstract Builder setPITSortTimestampProperty(String
pitTimestampProperty);
+
abstract Read build();
}
@@ -832,7 +857,9 @@ public class ElasticsearchIO {
/**
* Provide a scroll keepalive. See <a
*
href="https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-request-scroll.html">scroll
- * API</a> Default is "5m". Change this only if you get "No search context
found" errors.
+ * API</a> Default is "5m". Change this only if you get "No search context
found" errors. When
+ * configuring the read to use Point In Time (PIT) search this
configuration is used to set the
+ * PIT keep alive.
*
* @param scrollKeepalive keepalive duration of the scroll
* @return a {@link PTransform} reading data from Elasticsearch.
@@ -862,6 +889,58 @@ public class ElasticsearchIO {
return builder().setBatchSize(batchSize).build();
}
+ /**
+ * Configures the source to use Point In Time search iteration while
reading data from
+ * Elasticsearch. See <a
+ *
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html">
+ * Point in time search</a>, using default settings. This iteration mode
for searches does not
+ have the same size constraints the Scroll API has (slice counts, batch
size or how deep the
+ * iteration is). By default this iteration mode will use a {@code
@timestamp} named property on
+ * the indexed documents to consistently retrieve the data when failures
occur on a specific
+ * read work.
+ *
+ * @return a {@link PTransform} reading data from Elasticsearch.
+ */
+ public Read withPointInTimeSearch() {
+ return builder()
+ .setUsePITSearch(true)
+ .setBatchSize(1000)
+ .setPITSortConfig(SEARCH_AFTER_DEFAULT_SORT)
+ .build();
+ }
+
+ /**
+ * Similar to {@link #withPointInTimeSearch() the default PIT search} but
setting an existing
+ timestamp based property name which Elasticsearch will use to sort
the results.
+ *
+ * @param timestampSortProperty a property name found in the read
documents containing a
+ * timestamp-like value.
+ * @return a {@link PTransform} reading data from Elasticsearch.
+ */
+ public Read withPointInTimeSearchAndTimestampSortProperty(String
timestampSortProperty) {
+ return builder()
+ .setUsePITSearch(true)
+ .setBatchSize(1000)
+ .setPITSortConfig(String.format(SEARCH_AFTER_SORT_TEMPLATE,
timestampSortProperty))
+ .build();
+ }
+
+ /**
+ * Similar to {@link #withPointInTimeSearch() the default PIT search} but
setting a specific
+ * sorting configuration which Elasticsearch will use to sort the
results.
+ *
+ * @param sortConfiguration the full sorting configuration to be sent to
Elasticsearch while
+ * iterating on the results.
+ * @return a {@link PTransform} reading data from Elasticsearch.
+ */
+ public Read withPointInTimeSearchAndSortConfiguration(String
sortConfiguration) {
+ return builder()
+ .setUsePITSearch(true)
+ .setBatchSize(1000)
+ .setPITSortConfig(sortConfiguration)
+ .build();
+ }
+
@Override
public PCollection<String> expand(PBegin input) {
ConnectionConfiguration connectionConfiguration =
getConnectionConfiguration();
@@ -877,8 +956,15 @@ public class ElasticsearchIO {
builder.addIfNotNull(DisplayData.item("withMetadata", isWithMetadata()));
builder.addIfNotNull(DisplayData.item("batchSize", getBatchSize()));
builder.addIfNotNull(DisplayData.item("scrollKeepalive",
getScrollKeepalive()));
+ builder.addIfNotNull(DisplayData.item("usePointInTimeSearch",
getUsePITSearch()));
getConnectionConfiguration().populateDisplayData(builder);
}
+
+ void validatePITConfiguration(int backendVersion) {
+ checkArgument(
+ getUsePITSearch() && backendVersion >= 8,
+ "Point in time searches are supported for clusters with version 8
and higher.");
+ }
}
/** A {@link BoundedSource} reading from Elasticsearch. */
@@ -1014,7 +1100,12 @@ public class ElasticsearchIO {
@Override
public BoundedReader<String> createReader(PipelineOptions options) {
- return new BoundedElasticsearchReader(this);
+ if (!spec.getUsePITSearch()) {
+ return new BoundedElasticsearchScrollReader(this);
+ } else {
+ spec.validatePITConfiguration(backendVersion);
+ return new BoundedElasticsearchPITReader(this);
+ }
}
@Override
@@ -1039,48 +1130,47 @@ public class ElasticsearchIO {
}
}
- private static class BoundedElasticsearchReader extends
BoundedSource.BoundedReader<String> {
+ abstract static class BoundedElasticsearchReader extends
BoundedSource.BoundedReader<String> {
+ private static final Counter READ =
+ Metrics.counter(BoundedElasticsearchScrollReader.class,
"es-read-document-count");
+ private static final String MATCH_ALL_QUERY = "{\"query\": {
\"match_all\": {} }}";
- private final BoundedElasticsearchSource source;
+ protected final BoundedElasticsearchSource source;
- private RestClient restClient;
- private String current;
- private String scrollId;
- private ListIterator<String> batchIterator;
+ protected RestClient restClient;
+ protected JsonNode current;
+ protected ListIterator<JsonNode> batchIterator;
+ protected String iteratorId;
- private BoundedElasticsearchReader(BoundedElasticsearchSource source) {
+ protected BoundedElasticsearchReader(BoundedElasticsearchSource source) {
this.source = source;
}
- @Override
- public boolean start() throws IOException {
- restClient = source.spec.getConnectionConfiguration().createClient();
+ protected abstract Request createStartRequest();
+
+ protected abstract Request createAdvanceRequest();
+
+ protected abstract Request createCloseRequest();
+ protected abstract boolean processResult(JsonNode searchResult) throws
IOException;
+
+ protected abstract void updateIteratorId(JsonNode searchResult);
+
+ protected String createBaseQuery() {
String query = source.spec.getQuery() != null ?
source.spec.getQuery().get() : null;
if (query == null) {
- query = "{\"query\": { \"match_all\": {} }}";
- }
- if ((source.backendVersion >= 5) && source.numSlices != null &&
source.numSlices > 1) {
- // if there is more than one slice, add the slice to the user query
- String sliceQuery =
- String.format("\"slice\": {\"id\": %s,\"max\": %s}",
source.sliceId, source.numSlices);
- query = query.replaceFirst("\\{", "{" + sliceQuery + ",");
+ query = BoundedElasticsearchReader.MATCH_ALL_QUERY;
}
- String endPoint =
source.spec.getConnectionConfiguration().getSearchEndPoint();
- Map<String, String> params = new HashMap<>();
- params.put("scroll", source.spec.getScrollKeepalive());
- HttpEntity queryEntity = new NStringEntity(query,
ContentType.APPLICATION_JSON);
- Request request = new Request("GET", endPoint);
- request.addParameters(params);
- request.setEntity(queryEntity);
- Response response = restClient.performRequest(request);
- JsonNode searchResult = parseResponse(response.getEntity());
- updateScrollId(searchResult);
- return readNextBatchAndReturnFirstDocument(searchResult);
+ return query;
}
- private void updateScrollId(JsonNode searchResult) {
- scrollId = searchResult.path("_scroll_id").asText();
+ @Override
+ public boolean start() throws IOException {
+ restClient = source.spec.getConnectionConfiguration().createClient();
+ Response response = restClient.performRequest(createStartRequest());
+ JsonNode searchResult = parseResponse(response.getEntity());
+ updateIteratorId(searchResult);
+ return processResult(searchResult);
}
@Override
@@ -1089,22 +1179,18 @@ public class ElasticsearchIO {
current = batchIterator.next();
return true;
} else {
- String requestBody =
- String.format(
- "{\"scroll\" : \"%s\",\"scroll_id\" : \"%s\"}",
- source.spec.getScrollKeepalive(), scrollId);
- HttpEntity scrollEntity = new NStringEntity(requestBody,
ContentType.APPLICATION_JSON);
- Request request = new Request("GET", "/_search/scroll");
- request.addParameters(Collections.emptyMap());
- request.setEntity(scrollEntity);
- Response response = restClient.performRequest(request);
- JsonNode searchResult = parseResponse(response.getEntity());
- updateScrollId(searchResult);
- return readNextBatchAndReturnFirstDocument(searchResult);
- }
- }
-
- private boolean readNextBatchAndReturnFirstDocument(JsonNode searchResult)
{
+ return performAdvance();
+ }
+ }
+
+ protected boolean performAdvance() throws IOException {
+ Response response = restClient.performRequest(createAdvanceRequest());
+ JsonNode searchResult = parseResponse(response.getEntity());
+ updateIteratorId(searchResult);
+ return processResult(searchResult);
+ }
+
+ protected boolean readNextBatchAndReturnFirstDocument(JsonNode
searchResult) {
// stop if no more data
JsonNode hits = searchResult.path("hits").path("hits");
if (hits.size() == 0) {
@@ -1113,15 +1199,9 @@ public class ElasticsearchIO {
return false;
}
// list behind iterator is empty
- List<String> batch = new ArrayList<>();
- boolean withMetadata = source.spec.isWithMetadata();
+ List<JsonNode> batch = new ArrayList<>();
for (JsonNode hit : hits) {
- if (withMetadata) {
- batch.add(hit.toString());
- } else {
- String document = hit.path("_source").toString();
- batch.add(document);
- }
+ batch.add(hit);
}
batchIterator = batch.listIterator();
current = batchIterator.next();
@@ -1133,19 +1213,17 @@ public class ElasticsearchIO {
if (current == null) {
throw new NoSuchElementException();
}
- return current;
+ READ.inc();
+ boolean withMetadata = source.spec.isWithMetadata();
+ return withMetadata ? current.toString() :
current.path("_source").toString();
}
@Override
public void close() throws IOException {
- // remove the scroll
- String requestBody = String.format("{\"scroll_id\" : [\"%s\"]}",
scrollId);
- HttpEntity entity = new NStringEntity(requestBody,
ContentType.APPLICATION_JSON);
+ Request closeRequest = createCloseRequest();
+ // clear the selected iterator
try {
- Request request = new Request("DELETE", "/_search/scroll");
- request.addParameters(Collections.emptyMap());
- request.setEntity(entity);
- restClient.performRequest(request);
+ restClient.performRequest(closeRequest);
} finally {
if (restClient != null) {
restClient.close();
@@ -1158,6 +1236,195 @@ public class ElasticsearchIO {
return source;
}
}
+
+ static class BoundedElasticsearchScrollReader extends
BoundedElasticsearchReader {
+
+ public BoundedElasticsearchScrollReader(BoundedElasticsearchSource source)
{
+ super(source);
+ }
+
+ @Override
+ protected Request createStartRequest() {
+ String query = createBaseQuery();
+ if ((source.backendVersion >= 5) && source.numSlices != null &&
source.numSlices > 1) {
+ // if there is more than one slice, add the slice to the user query
+ String sliceQuery =
+ String.format("\"slice\": {\"id\": %s,\"max\": %s}",
source.sliceId, source.numSlices);
+ query = query.replaceFirst("\\{", "{" + sliceQuery + ",");
+ }
+ String endPoint =
source.spec.getConnectionConfiguration().getSearchEndPoint();
+ Map<String, String> params = new HashMap<>();
+ params.put("scroll", source.spec.getScrollKeepalive());
+ HttpEntity queryEntity = new NStringEntity(query,
ContentType.APPLICATION_JSON);
+ Request request = new Request("GET", endPoint);
+ request.addParameters(params);
+ request.setEntity(queryEntity);
+ return request;
+ }
+
+ @Override
+ protected Request createAdvanceRequest() {
+ String requestBody =
+ String.format(
+ "{\"scroll\" : \"%s\",\"scroll_id\" : \"%s\"}",
+ source.spec.getScrollKeepalive(), iteratorId);
+ HttpEntity scrollEntity = new NStringEntity(requestBody,
ContentType.APPLICATION_JSON);
+ Request request = new Request("GET", "/_search/scroll");
+ request.addParameters(Collections.emptyMap());
+ request.setEntity(scrollEntity);
+ return request;
+ }
+
+ @Override
+ protected Request createCloseRequest() {
+ String requestBody = String.format("{\"scroll_id\" : [\"%s\"]}",
iteratorId);
+ HttpEntity entity = new NStringEntity(requestBody,
ContentType.APPLICATION_JSON);
+ Request request = new Request("DELETE", "/_search/scroll");
+ request.addParameters(Collections.emptyMap());
+ request.setEntity(entity);
+ return request;
+ }
+
+ @Override
+ protected boolean processResult(JsonNode searchResult) throws IOException {
+ return readNextBatchAndReturnFirstDocument(searchResult);
+ }
+
+ @Override
+ protected void updateIteratorId(JsonNode searchResult) {
+ iteratorId = searchResult.path("_scroll_id").asText();
+ }
+ }
+
+ static class BoundedElasticsearchPITReader extends
BoundedElasticsearchReader {
+
+ private String searchAfterProperty = "";
+
+ public BoundedElasticsearchPITReader(BoundedElasticsearchSource source) {
+ super(source);
+ }
+
+ private String modifyQueryForPIT(String originalQuery) {
+ String trimmed = originalQuery.trim();
+ if (trimmed.startsWith("{") && trimmed.endsWith("}")) {
+ return trimmed.substring(1, trimmed.length() - 1);
+ }
+ return originalQuery;
+ }
+
+ @Override
+ protected String createBaseQuery() {
+ return modifyQueryForPIT(super.createBaseQuery()) + ", " +
source.spec.getPITSortConfig();
+ }
+
+ @Override
+ protected Request createStartRequest() {
+ String endPoint =
+ String.format("/%s/_pit",
source.spec.getConnectionConfiguration().getIndex());
+ Map<String, String> params = new HashMap<>();
+ params.put("keep_alive", source.spec.getScrollKeepalive());
+ Request request = new Request("POST", endPoint);
+ request.addParameters(params);
+ return request;
+ }
+
+ String searchAfter() {
+ if (searchAfterProperty.isEmpty()) {
+ return "";
+ }
+ return String.format("\"search_after\" : %s,", searchAfterProperty);
+ }
+
+ @Override
+ protected Request createAdvanceRequest() {
+ // if there is more than one slice, add the slice to the user query
+ String sliceQuery =
+ source.numSlices > 1
+ ? String.format(
+ "\"slice\" : {\"id\" : %s, \"max\" : %s},", source.sliceId,
source.numSlices)
+ : "";
+
+ String requestBody =
+ String.format(
+ "{"
+ + " %s"
+ + " \"size\" : %d,"
+ + " %s"
+ + " %s,"
+ + " \"pit\": {"
+ + " \"id\": \"%s\""
+ + " }"
+ + "}",
+ searchAfter(), source.spec.getBatchSize(), sliceQuery,
createBaseQuery(), iteratorId);
+ HttpEntity pitSearchEntity = new NStringEntity(requestBody,
ContentType.APPLICATION_JSON);
+ Request request = new Request("POST", "/_search");
+ request.addParameters(Collections.emptyMap());
+ request.setEntity(pitSearchEntity);
+ return request;
+ }
+
+ @Override
+ protected Request createCloseRequest() {
+ String requestBody = String.format("{\"id\" : \"%s\"}", iteratorId);
+ HttpEntity entity = new NStringEntity(requestBody,
ContentType.APPLICATION_JSON);
+ Request request = new Request("DELETE", "/_pit");
+ request.addParameters(Collections.emptyMap());
+ request.setEntity(entity);
+ return request;
+ }
+
+ String extractSearchAfterFromDocument(JsonNode document) {
+ return document.path("sort").toString();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (batchIterator.hasNext()) {
+ current = batchIterator.next();
+ searchAfterProperty = extractSearchAfterFromDocument(current);
+ return true;
+ } else {
+ return performAdvance();
+ }
+ }
+
+ @Override
+ protected boolean processResult(JsonNode searchResult) throws IOException {
+ JsonNode hits = searchResult.path("hits");
+ if (hits == null || hits.isMissingNode()) {
+ // after creating the PIT we need to make the first request to comply
with Reader API and
+ // try to get a first result or declare the source empty
+ return performAdvance();
+ }
+ JsonNode resultArray = hits.path("hits");
+ // check if results are empty
+ if (resultArray == null || resultArray.isEmpty()) {
+ return false;
+ }
+ // we already opened the PIT search and are processing the search results
+ boolean wasDocumentRead =
readNextBatchAndReturnFirstDocument(searchResult);
+ if (wasDocumentRead && current != null) {
+ searchAfterProperty = extractSearchAfterFromDocument(current);
+ }
+ return wasDocumentRead;
+ }
+
+ @Override
+ protected void updateIteratorId(JsonNode searchResult) {
+ iteratorId = extractPITId(searchResult);
+ }
+
+ String extractPITId(JsonNode searchResult) {
+ String maybeId = searchResult.path("id").asText();
+ // check if this is the first request
+ if (maybeId != null && !maybeId.isEmpty()) {
+ return maybeId;
+ } else {
+ return searchResult.path("pit_id").asText();
+ }
+ }
+ }
+
/**
* A POJO encapsulating a configuration for retry behavior when issuing
requests to ES. A retry
* will be attempted until the maxAttempts or maxDuration is exceeded,
whichever comes first, for
@@ -1423,8 +1690,8 @@ public class ElasticsearchIO {
*
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html">data
* stream</a>. Data streams only support the {@code create} operation. For
more information see
* the <a
- *
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-desc>
- * Elasticsearch documentation</a>
+ *
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-desc">Elasticsearch
+ * documentation</a>
*
* <p>Updates and deletions are not allowed, so related options will be
ignored.
*
@@ -1485,7 +1752,7 @@ public class ElasticsearchIO {
* the batch will fail and the exception propagated. Incompatible with
update operations and
* should only be used with withUsePartialUpdate(false)
*
- * @param docVersionType the version type to use, one of {@value
VERSION_TYPES}
+ * @param docVersionType the version type to use, one of {@link
VERSION_TYPES}
* @return the {@link DocToBulk} with the doc version type set
*/
public DocToBulk withDocVersionType(String docVersionType) {