[
https://issues.apache.org/jira/browse/BEAM-3026?focusedWorklogId=137809&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137809
]
ASF GitHub Bot logged work on BEAM-3026:
----------------------------------------
Author: ASF GitHub Bot
Created on: 24/Aug/18 13:22
Start Date: 24/Aug/18 13:22
Worklog Time Spent: 10m
Work Description: echauchot closed pull request #6146: [BEAM-3026] Adding
retrying behavior on ElasticSearchIO
URL: https://github.com/apache/beam/pull/6146
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (since GitHub does not otherwise display it after the merge):
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 1e3e7b62dfb..862ba5b4292 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -220,4 +220,16 @@ public void testReadWithMetadata() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testReadWithMetadata();
}
+
+ @Test
+ public void testDefaultRetryPredicate() throws IOException {
+ elasticsearchIOTestCommon.testDefaultRetryPredicate(restClient);
+ }
+
+ @Test
+ public void testWriteRetry() throws Throwable {
+ elasticsearchIOTestCommon.setExpectedException(expectedException);
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteRetry();
+ }
}
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index c28b1906f84..d2791c76d1f 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -26,8 +26,10 @@
import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertEquals;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
+import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -222,4 +224,16 @@ public void testReadWithMetadata() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testReadWithMetadata();
}
+
+ @Test
+ public void testDefaultRetryPredicate() throws IOException {
+ elasticsearchIOTestCommon.testDefaultRetryPredicate(getRestClient());
+ }
+
+ @Test
+ public void testWriteRetry() throws Throwable {
+ elasticsearchIOTestCommon.setExpectedException(expectedException);
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteRetry();
+ }
}
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index dedc49f0ea3..57b450d8c9e 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -25,11 +25,14 @@
import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
+import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DEFAULT_RETRY_PREDICATE;
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.core.Is.isA;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.fasterxml.jackson.databind.JsonNode;
@@ -37,7 +40,11 @@
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DefaultRetryPredicate;
+import
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.RetryPredicate;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
@@ -47,8 +54,13 @@
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
+import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.hamcrest.CustomMatcher;
+import org.joda.time.Duration;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +70,18 @@
private static final Logger LOG =
LoggerFactory.getLogger(ElasticsearchIOTestCommon.class);
+ private static final RetryPredicate CUSTOM_RETRY_PREDICATE = new
DefaultRetryPredicate(400);
+
+ private static final int EXPECTED_RETRIES = 2;
+ private static final int MAX_ATTEMPTS = 3;
+ private static final String BAD_FORMATTED_DOC[] = {"{ \"x\" :a,\"y\":\"ab\"
}"};
+ private static final String OK_REQUEST =
+ "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" :
\"1\" } }\n"
+ + "{ \"field1\" : 1 }\n";
+ private static final String BAD_REQUEST =
+ "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" :
\"1\" } }\n"
+ + "{ \"field1\" : @ }\n";
+
static final String ES_INDEX = "beam";
static final String ES_TYPE = "test";
static final long NUM_DOCS_UTESTS = 400L;
@@ -512,4 +536,41 @@ public Void apply(Iterable<String> input) {
return null;
}
}
+
+ /** Test that the default predicate correctly parses chosen error code. */
+ public void testDefaultRetryPredicate(RestClient restClient) throws
IOException {
+
+ HttpEntity entity1 = new NStringEntity(BAD_REQUEST,
ContentType.APPLICATION_JSON);
+ Response response1 =
+ restClient.performRequest("POST", "/_bulk", Collections.emptyMap(),
entity1);
+ assertTrue(CUSTOM_RETRY_PREDICATE.test(response1));
+
+ HttpEntity entity2 = new NStringEntity(OK_REQUEST,
ContentType.APPLICATION_JSON);
+ Response response2 =
+ restClient.performRequest("POST", "/_bulk", Collections.emptyMap(),
entity2);
+ assertFalse(DEFAULT_RETRY_PREDICATE.test(response2));
+ }
+
+ /**
+ * Test that retries are invoked when Elasticsearch returns a specific error
code. We invoke this
+ * by issuing corrupt data and retrying on the `400` error code. Normal
behaviour is to retry on
+ * `429` only but that is difficult to simulate reliably. The logger is used
to verify expected
+ * behavior.
+ */
+ public void testWriteRetry() throws Throwable {
+ expectedException.expectCause(isA(IOException.class));
+ // max attempt is 3, but retry is 2 which excludes 1st attempt when error
was identified and retry started.
+ expectedException.expectMessage(
+ String.format(ElasticsearchIO.Write.WriteFn.RETRY_FAILED_LOG,
EXPECTED_RETRIES));
+
+ ElasticsearchIO.Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withRetryConfiguration(
+ ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS,
Duration.millis(35000))
+ .withRetryPredicate(CUSTOM_RETRY_PREDICATE));
+ pipeline.apply(Create.of(Arrays.asList(BAD_FORMATTED_DOC))).apply(write);
+
+ pipeline.run();
+ }
}
diff --git
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 3076f365cdd..30c93d1782a 100644
---
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -44,6 +44,7 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.function.Predicate;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import org.apache.beam.sdk.annotations.Experimental;
@@ -56,6 +57,10 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -73,6 +78,9 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Transforms for reading and writing data from/to Elasticsearch.
@@ -133,6 +141,8 @@
@Experimental(Experimental.Kind.SOURCE_SINK)
public class ElasticsearchIO {
+ private static final Logger LOG =
LoggerFactory.getLogger(ElasticsearchIO.class);
+
public static Read read() {
// default scrollKeepalive = 5m as a majorant for un-predictable time
between 2 start/read calls
// default batchSize to 100 as recommended by ES dev team as a safe value
when dealing
@@ -172,6 +182,7 @@ static void checkForErrors(Response response, int
backendVersion) throws IOExcep
JsonNode items = searchResult.path("items");
//some items present in bulk might have errors, concatenate error
messages
for (JsonNode item : items) {
+
String errorRootName = "";
if (backendVersion == 2) {
errorRootName = "create";
@@ -731,6 +742,112 @@ public void close() throws IOException {
return source;
}
}
+ /**
+ * A POJO encapsulating a configuration for retry behavior when issuing
requests to ES. A retry
+ * will be attempted until the maxAttempts or maxDuration is exceeded,
whichever comes first, for
+ * 429 TOO_MANY_REQUESTS error.
+ */
+ @AutoValue
+ public abstract static class RetryConfiguration implements Serializable {
+ @VisibleForTesting
+ static final RetryPredicate DEFAULT_RETRY_PREDICATE = new
DefaultRetryPredicate();
+
+ abstract int getMaxAttempts();
+
+ abstract Duration getMaxDuration();
+
+ abstract RetryPredicate getRetryPredicate();
+
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract ElasticsearchIO.RetryConfiguration.Builder setMaxAttempts(int
maxAttempts);
+
+ abstract ElasticsearchIO.RetryConfiguration.Builder
setMaxDuration(Duration maxDuration);
+
+ abstract ElasticsearchIO.RetryConfiguration.Builder setRetryPredicate(
+ RetryPredicate retryPredicate);
+
+ abstract ElasticsearchIO.RetryConfiguration build();
+ }
+
+ /**
+ * Creates RetryConfiguration for {@link ElasticsearchIO} with provided
maxAttempts,
+ * maxDurations and exponential backoff based retries.
+ *
+ * @param maxAttempts max number of attempts.
+ * @param maxDuration maximum duration for retries.
+ * @return {@link RetryConfiguration} object with provided settings.
+ */
+ public static RetryConfiguration create(int maxAttempts, Duration
maxDuration) {
+ checkArgument(maxAttempts > 0, "maxAttempts must be greater than 0");
+ checkArgument(
+ maxDuration != null && maxDuration.isLongerThan(Duration.ZERO),
+ "maxDuration must be greater than 0");
+ return new AutoValue_ElasticsearchIO_RetryConfiguration.Builder()
+ .setMaxAttempts(maxAttempts)
+ .setMaxDuration(maxDuration)
+ .setRetryPredicate(DEFAULT_RETRY_PREDICATE)
+ .build();
+ }
+
+ // Exposed only to allow tests to easily simulate server errors
+ @VisibleForTesting
+ RetryConfiguration withRetryPredicate(RetryPredicate predicate) {
+ checkArgument(predicate != null, "predicate must be provided");
+ return builder().setRetryPredicate(predicate).build();
+ }
+
+ /**
+ * An interface used to control if we retry the Elasticsearch call when a
{@link Response} is
+ * obtained. If {@link RetryPredicate#test(Object)} returns true, {@link
Write} tries to resend
+ * the requests to the Elasticsearch server if the {@link
RetryConfiguration} permits it.
+ */
+ @FunctionalInterface
+ interface RetryPredicate extends Predicate<Response>, Serializable {}
+
+ /**
+ * This is the default predicate used to test if a failed ES operation
should be retried. A
+ * retry will be attempted until the maxAttempts or maxDuration is
exceeded, whichever comes
+ * first, for TOO_MANY_REQUESTS(429) error.
+ */
+ @VisibleForTesting
+ static class DefaultRetryPredicate implements RetryPredicate {
+
+ private int errorCode;
+
+ DefaultRetryPredicate(int code) {
+ this.errorCode = code;
+ }
+
+ DefaultRetryPredicate() {
+ this(429);
+ }
+
+ /** Returns true if the response has the error code for any mutation. */
+ private static boolean errorCodePresent(Response response, int
errorCode) {
+ try {
+ JsonNode json = parseResponse(response);
+ if (json.path("errors").asBoolean()) {
+ for (JsonNode item : json.path("items")) {
+ if (item.findValue("status").asInt() == errorCode) {
+ return true;
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Could not extract error codes from response {}", response);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean test(Response response) {
+ return errorCodePresent(response, errorCode);
+ }
+ }
+ }
/** A {@link PTransform} writing data to Elasticsearch. */
@AutoValue
@@ -760,6 +877,9 @@ public void close() throws IOException {
@Nullable
abstract FieldValueExtractFn getTypeFn();
+ @Nullable
+ abstract RetryConfiguration getRetryConfiguration();
+
abstract boolean getUsePartialUpdate();
abstract Builder builder();
@@ -780,6 +900,8 @@ public void close() throws IOException {
abstract Builder setUsePartialUpdate(boolean usePartialUpdate);
+ abstract Builder setRetryConfiguration(RetryConfiguration
retryConfiguration);
+
abstract Write build();
}
@@ -879,6 +1001,33 @@ public Write withUsePartialUpdate(boolean
usePartialUpdate) {
return builder().setUsePartialUpdate(usePartialUpdate).build();
}
+ /**
+ * Provides configuration to retry a failed batch call to Elasticsearch. A
batch is considered
+ * as failed if the underlying {@link RestClient} surfaces 429 HTTP status
code as error for one
+ * or more of the items in the {@link Response}. Users should consider
that retrying might
+ * compound the underlying problem which caused the initial failure. Users
should also be aware
+ * that once retrying is exhausted the error is surfaced to the runner
which <em>may</em> then
+ * opt to retry the current bundle in entirety or abort if the max number
of retries of the
+ * runner is completed. Retrying uses an exponential backoff algorithm,
with minimum backoff of
+ * 5 seconds and then surfacing the error once the maximum number of
retries or maximum
+ * configuration duration is exceeded.
+ *
+ * <p>Example use:
+ *
+ * <pre>{@code
+ * ElasticsearchIO.write()
+ * .withRetryConfiguration(ElasticsearchIO.RetryConfiguration.create(10,
Duration.standardMinutes(3))
+ * ...
+ * }</pre>
+ *
+ * @param retryConfiguration the rules which govern the retry behavior
+ * @return the {@link Write} with retrying configured
+ */
+ public Write withRetryConfiguration(RetryConfiguration retryConfiguration)
{
+ checkArgument(retryConfiguration != null, "retryConfiguration is
required");
+ return builder().setRetryConfiguration(retryConfiguration).build();
+ }
+
@Override
public PDone expand(PCollection<String> input) {
ConnectionConfiguration connectionConfiguration =
getConnectionConfiguration();
@@ -893,6 +1042,17 @@ public PDone expand(PCollection<String> input) {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final int DEFAULT_RETRY_ON_CONFLICT = 5; // race
conditions on updates
+ private static final Duration RETRY_INITIAL_BACKOFF =
Duration.standardSeconds(5);
+
+ @VisibleForTesting
+ static final String RETRY_ATTEMPT_LOG = "Error writing to Elasticsearch.
Retry attempt[%d]";
+
+ @VisibleForTesting
+ static final String RETRY_FAILED_LOG =
+ "Error writing to ES after %d attempt(s). No more attempts allowed";
+
+ private transient FluentBackoff retryBackoff;
+
private int backendVersion;
private final Write spec;
private transient RestClient restClient;
@@ -929,10 +1089,21 @@ public PDone expand(PCollection<String> input) {
}
@Setup
- public void setup() throws Exception {
+ public void setup() throws IOException {
ConnectionConfiguration connectionConfiguration =
spec.getConnectionConfiguration();
backendVersion = getBackendVersion(connectionConfiguration);
restClient = connectionConfiguration.createClient();
+
+ retryBackoff =
+
FluentBackoff.DEFAULT.withMaxRetries(0).withInitialBackoff(RETRY_INITIAL_BACKOFF);
+
+ if (spec.getRetryConfiguration() != null) {
+ retryBackoff =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(RETRY_INITIAL_BACKOFF)
+
.withMaxRetries(spec.getRetryConfiguration().getMaxAttempts() - 1)
+
.withMaxCumulativeBackoff(spec.getRetryConfiguration().getMaxDuration());
+ }
}
@StartBundle
@@ -999,11 +1170,12 @@ public void processElement(ProcessContext context)
throws Exception {
}
@FinishBundle
- public void finishBundle(FinishBundleContext context) throws Exception {
+ public void finishBundle(FinishBundleContext context)
+ throws IOException, InterruptedException {
flushBatch();
}
- private void flushBatch() throws IOException {
+ private void flushBatch() throws IOException, InterruptedException {
if (batch.isEmpty()) {
return;
}
@@ -1025,11 +1197,35 @@ private void flushBatch() throws IOException {
HttpEntity requestBody =
new NStringEntity(bulkRequest.toString(),
ContentType.APPLICATION_JSON);
response = restClient.performRequest("POST", endPoint,
Collections.emptyMap(), requestBody);
+ if (spec.getRetryConfiguration() != null
+ &&
spec.getRetryConfiguration().getRetryPredicate().test(response)) {
+ response = handleRetry("POST", endPoint, Collections.emptyMap(),
requestBody);
+ }
checkForErrors(response, backendVersion);
}
+ /** retry request based on retry configuration policy. */
+ private Response handleRetry(
+ String method, String endpoint, Map<String, String> params,
HttpEntity requestBody)
+ throws IOException, InterruptedException {
+ Response response;
+ Sleeper sleeper = Sleeper.DEFAULT;
+ BackOff backoff = retryBackoff.backoff();
+ int attempt = 0;
+ //while retry policy exists
+ while (BackOffUtils.next(sleeper, backoff)) {
+ LOG.warn(String.format(RETRY_ATTEMPT_LOG, ++attempt));
+ response = restClient.performRequest(method, endpoint, params,
requestBody);
+ //if response has no 429 errors
+ if
(!spec.getRetryConfiguration().getRetryPredicate().test(response)) {
+ return response;
+ }
+ }
+ throw new IOException(String.format(RETRY_FAILED_LOG, attempt));
+ }
+
@Teardown
- public void closeClient() throws Exception {
+ public void closeClient() throws IOException {
if (restClient != null) {
restClient.close();
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 137809)
Time Spent: 15h 20m (was: 15h 10m)
> Improve retrying in ElasticSearch client
> ----------------------------------------
>
> Key: BEAM-3026
> URL: https://issues.apache.org/jira/browse/BEAM-3026
> Project: Beam
> Issue Type: Improvement
> Components: io-java-elasticsearch
> Reporter: Tim Robertson
> Assignee: Ravi Pathak
> Priority: Major
> Fix For: 2.7.0
>
> Time Spent: 15h 20m
> Remaining Estimate: 0h
>
> Currently an overloaded ES server will result in clients failing fast.
> I suggest implementing backoff pauses. Perhaps something like this:
> {code}
> ElasticsearchIO.ConnectionConfiguration conn =
> ElasticsearchIO.ConnectionConfiguration
> .create(new String[]{"http://...:9200"}, "test", "test")
> .retryWithWaitStrategy(WaitStrategies.exponentialBackoff(1000,
> TimeUnit.MILLISECONDS)
> .retryWithStopStrategy(StopStrategies.stopAfterAttempt(10)
> );
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)