[ 
https://issues.apache.org/jira/browse/BEAM-3026?focusedWorklogId=137809&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137809
 ]

ASF GitHub Bot logged work on BEAM-3026:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Aug/18 13:22
            Start Date: 24/Aug/18 13:22
    Worklog Time Spent: 10m 
      Work Description: echauchot closed pull request #6146: [BEAM-3026] Adding 
retrying behavior on ElasticSearchIO
URL: https://github.com/apache/beam/pull/6146
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one opened
from a fork) once it is merged, the diff is reproduced below for the sake of
provenance:

diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 1e3e7b62dfb..862ba5b4292 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -220,4 +220,16 @@ public void testReadWithMetadata() throws Exception {
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testReadWithMetadata();
   }
+
+  @Test
+  public void testDefaultRetryPredicate() throws IOException {
+    elasticsearchIOTestCommon.testDefaultRetryPredicate(restClient);
+  }
+
+  @Test
+  public void testWriteRetry() throws Throwable {
+    elasticsearchIOTestCommon.setExpectedException(expectedException);
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteRetry();
+  }
 }
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index c28b1906f84..d2791c76d1f 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -26,8 +26,10 @@
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
 import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
 import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertEquals;
 
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
+import java.io.IOException;
 import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -222,4 +224,16 @@ public void testReadWithMetadata() throws Exception {
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testReadWithMetadata();
   }
+
+  @Test
+  public void testDefaultRetryPredicate() throws IOException {
+    elasticsearchIOTestCommon.testDefaultRetryPredicate(getRestClient());
+  }
+
+  @Test
+  public void testWriteRetry() throws Throwable {
+    elasticsearchIOTestCommon.setExpectedException(expectedException);
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteRetry();
+  }
 }
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index dedc49f0ea3..57b450d8c9e 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -25,11 +25,14 @@
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
 import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
+import static 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DEFAULT_RETRY_PREDICATE;
 import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.core.Is.isA;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -37,7 +40,11 @@
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DefaultRetryPredicate;
+import 
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.RetryPredicate;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
@@ -47,8 +54,13 @@
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
+import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
 import org.hamcrest.CustomMatcher;
+import org.joda.time.Duration;
 import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,6 +70,18 @@
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchIOTestCommon.class);
 
+  private static final RetryPredicate CUSTOM_RETRY_PREDICATE = new 
DefaultRetryPredicate(400);
+
+  private static final int EXPECTED_RETRIES = 2;
+  private static final int MAX_ATTEMPTS = 3;
+  private static final String BAD_FORMATTED_DOC[] = {"{ \"x\" :a,\"y\":\"ab\" 
}"};
+  private static final String OK_REQUEST =
+      "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : 
\"1\" } }\n"
+          + "{ \"field1\" : 1 }\n";
+  private static final String BAD_REQUEST =
+      "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : 
\"1\" } }\n"
+          + "{ \"field1\" : @ }\n";
+
   static final String ES_INDEX = "beam";
   static final String ES_TYPE = "test";
   static final long NUM_DOCS_UTESTS = 400L;
@@ -512,4 +536,41 @@ public Void apply(Iterable<String> input) {
       return null;
     }
   }
+
+  /** Test that the default predicate correctly parses chosen error code. */
+  public void testDefaultRetryPredicate(RestClient restClient) throws 
IOException {
+
+    HttpEntity entity1 = new NStringEntity(BAD_REQUEST, 
ContentType.APPLICATION_JSON);
+    Response response1 =
+        restClient.performRequest("POST", "/_bulk", Collections.emptyMap(), 
entity1);
+    assertTrue(CUSTOM_RETRY_PREDICATE.test(response1));
+
+    HttpEntity entity2 = new NStringEntity(OK_REQUEST, 
ContentType.APPLICATION_JSON);
+    Response response2 =
+        restClient.performRequest("POST", "/_bulk", Collections.emptyMap(), 
entity2);
+    assertFalse(DEFAULT_RETRY_PREDICATE.test(response2));
+  }
+
+  /**
+   * Test that retries are invoked when Elasticsearch returns a specific error 
code. We invoke this
+   * by issuing corrupt data and retrying on the `400` error code. Normal 
behaviour is to retry on
+   * `429` only but that is difficult to simulate reliably. The logger is used 
to verify expected
+   * behavior.
+   */
+  public void testWriteRetry() throws Throwable {
+    expectedException.expectCause(isA(IOException.class));
+    // max attempt is 3, but retry is 2 which excludes 1st attempt when error 
was identified and retry started.
+    expectedException.expectMessage(
+        String.format(ElasticsearchIO.Write.WriteFn.RETRY_FAILED_LOG, 
EXPECTED_RETRIES));
+
+    ElasticsearchIO.Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withRetryConfiguration(
+                ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, 
Duration.millis(35000))
+                    .withRetryPredicate(CUSTOM_RETRY_PREDICATE));
+    pipeline.apply(Create.of(Arrays.asList(BAD_FORMATTED_DOC))).apply(write);
+
+    pipeline.run();
+  }
 }
diff --git 
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
 
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 3076f365cdd..30c93d1782a 100644
--- 
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ 
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -44,6 +44,7 @@
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.function.Predicate;
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -56,6 +57,10 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -73,6 +78,9 @@
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Transforms for reading and writing data from/to Elasticsearch.
@@ -133,6 +141,8 @@
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class ElasticsearchIO {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchIO.class);
+
   public static Read read() {
     // default scrollKeepalive = 5m as a majorant for un-predictable time 
between 2 start/read calls
     // default batchSize to 100 as recommended by ES dev team as a safe value 
when dealing
@@ -172,6 +182,7 @@ static void checkForErrors(Response response, int 
backendVersion) throws IOExcep
       JsonNode items = searchResult.path("items");
       //some items present in bulk might have errors, concatenate error 
messages
       for (JsonNode item : items) {
+
         String errorRootName = "";
         if (backendVersion == 2) {
           errorRootName = "create";
@@ -731,6 +742,112 @@ public void close() throws IOException {
       return source;
     }
   }
+  /**
+   * A POJO encapsulating a configuration for retry behavior when issuing 
requests to ES. A retry
+   * will be attempted until the maxAttempts or maxDuration is exceeded, 
whichever comes first, for
+   * 429 TOO_MANY_REQUESTS error.
+   */
+  @AutoValue
+  public abstract static class RetryConfiguration implements Serializable {
+    @VisibleForTesting
+    static final RetryPredicate DEFAULT_RETRY_PREDICATE = new 
DefaultRetryPredicate();
+
+    abstract int getMaxAttempts();
+
+    abstract Duration getMaxDuration();
+
+    abstract RetryPredicate getRetryPredicate();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract ElasticsearchIO.RetryConfiguration.Builder setMaxAttempts(int 
maxAttempts);
+
+      abstract ElasticsearchIO.RetryConfiguration.Builder 
setMaxDuration(Duration maxDuration);
+
+      abstract ElasticsearchIO.RetryConfiguration.Builder setRetryPredicate(
+          RetryPredicate retryPredicate);
+
+      abstract ElasticsearchIO.RetryConfiguration build();
+    }
+
+    /**
+     * Creates RetryConfiguration for {@link ElasticsearchIO} with provided 
maxAttempts,
+     * maxDurations and exponential backoff based retries.
+     *
+     * @param maxAttempts max number of attempts.
+     * @param maxDuration maximum duration for retries.
+     * @return {@link RetryConfiguration} object with provided settings.
+     */
+    public static RetryConfiguration create(int maxAttempts, Duration 
maxDuration) {
+      checkArgument(maxAttempts > 0, "maxAttempts must be greater than 0");
+      checkArgument(
+          maxDuration != null && maxDuration.isLongerThan(Duration.ZERO),
+          "maxDuration must be greater than 0");
+      return new AutoValue_ElasticsearchIO_RetryConfiguration.Builder()
+          .setMaxAttempts(maxAttempts)
+          .setMaxDuration(maxDuration)
+          .setRetryPredicate(DEFAULT_RETRY_PREDICATE)
+          .build();
+    }
+
+    // Exposed only to allow tests to easily simulate server errors
+    @VisibleForTesting
+    RetryConfiguration withRetryPredicate(RetryPredicate predicate) {
+      checkArgument(predicate != null, "predicate must be provided");
+      return builder().setRetryPredicate(predicate).build();
+    }
+
+    /**
+     * An interface used to control if we retry the Elasticsearch call when a 
{@link Response} is
+     * obtained. If {@link RetryPredicate#test(Object)} returns true, {@link 
Write} tries to resend
+     * the requests to the Elasticsearch server if the {@link 
RetryConfiguration} permits it.
+     */
+    @FunctionalInterface
+    interface RetryPredicate extends Predicate<Response>, Serializable {}
+
+    /**
+     * This is the default predicate used to test if a failed ES operation 
should be retried. A
+     * retry will be attempted until the maxAttempts or maxDuration is 
exceeded, whichever comes
+     * first, for TOO_MANY_REQUESTS(429) error.
+     */
+    @VisibleForTesting
+    static class DefaultRetryPredicate implements RetryPredicate {
+
+      private int errorCode;
+
+      DefaultRetryPredicate(int code) {
+        this.errorCode = code;
+      }
+
+      DefaultRetryPredicate() {
+        this(429);
+      }
+
+      /** Returns true if the response has the error code for any mutation. */
+      private static boolean errorCodePresent(Response response, int 
errorCode) {
+        try {
+          JsonNode json = parseResponse(response);
+          if (json.path("errors").asBoolean()) {
+            for (JsonNode item : json.path("items")) {
+              if (item.findValue("status").asInt() == errorCode) {
+                return true;
+              }
+            }
+          }
+        } catch (IOException e) {
+          LOG.warn("Could not extract error codes from response {}", response);
+        }
+        return false;
+      }
+
+      @Override
+      public boolean test(Response response) {
+        return errorCodePresent(response, errorCode);
+      }
+    }
+  }
 
   /** A {@link PTransform} writing data to Elasticsearch. */
   @AutoValue
@@ -760,6 +877,9 @@ public void close() throws IOException {
     @Nullable
     abstract FieldValueExtractFn getTypeFn();
 
+    @Nullable
+    abstract RetryConfiguration getRetryConfiguration();
+
     abstract boolean getUsePartialUpdate();
 
     abstract Builder builder();
@@ -780,6 +900,8 @@ public void close() throws IOException {
 
       abstract Builder setUsePartialUpdate(boolean usePartialUpdate);
 
+      abstract Builder setRetryConfiguration(RetryConfiguration 
retryConfiguration);
+
       abstract Write build();
     }
 
@@ -879,6 +1001,33 @@ public Write withUsePartialUpdate(boolean 
usePartialUpdate) {
       return builder().setUsePartialUpdate(usePartialUpdate).build();
     }
 
+    /**
+     * Provides configuration to retry a failed batch call to Elasticsearch. A 
batch is considered
+     * as failed if the underlying {@link RestClient} surfaces 429 HTTP status 
code as error for one
+     * or more of the items in the {@link Response}. Users should consider 
that retrying might
+     * compound the underlying problem which caused the initial failure. Users 
should also be aware
+     * that once retrying is exhausted the error is surfaced to the runner 
which <em>may</em> then
+     * opt to retry the current bundle in entirety or abort if the max number 
of retries of the
+     * runner is completed. Retrying uses an exponential backoff algorithm, 
with minimum backoff of
+     * 5 seconds and then surfacing the error once the maximum number of 
retries or maximum
+     * configuration duration is exceeded.
+     *
+     * <p>Example use:
+     *
+     * <pre>{@code
+     * ElasticsearchIO.write()
+     *   .withRetryConfiguration(ElasticsearchIO.RetryConfiguration.create(10, 
Duration.standardMinutes(3))
+     *   ...
+     * }</pre>
+     *
+     * @param retryConfiguration the rules which govern the retry behavior
+     * @return the {@link Write} with retrying configured
+     */
+    public Write withRetryConfiguration(RetryConfiguration retryConfiguration) 
{
+      checkArgument(retryConfiguration != null, "retryConfiguration is 
required");
+      return builder().setRetryConfiguration(retryConfiguration).build();
+    }
+
     @Override
     public PDone expand(PCollection<String> input) {
       ConnectionConfiguration connectionConfiguration = 
getConnectionConfiguration();
@@ -893,6 +1042,17 @@ public PDone expand(PCollection<String> input) {
       private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
       private static final int DEFAULT_RETRY_ON_CONFLICT = 5; // race 
conditions on updates
 
+      private static final Duration RETRY_INITIAL_BACKOFF = 
Duration.standardSeconds(5);
+
+      @VisibleForTesting
+      static final String RETRY_ATTEMPT_LOG = "Error writing to Elasticsearch. 
Retry attempt[%d]";
+
+      @VisibleForTesting
+      static final String RETRY_FAILED_LOG =
+          "Error writing to ES after %d attempt(s). No more attempts allowed";
+
+      private transient FluentBackoff retryBackoff;
+
       private int backendVersion;
       private final Write spec;
       private transient RestClient restClient;
@@ -929,10 +1089,21 @@ public PDone expand(PCollection<String> input) {
       }
 
       @Setup
-      public void setup() throws Exception {
+      public void setup() throws IOException {
         ConnectionConfiguration connectionConfiguration = 
spec.getConnectionConfiguration();
         backendVersion = getBackendVersion(connectionConfiguration);
         restClient = connectionConfiguration.createClient();
+
+        retryBackoff =
+            
FluentBackoff.DEFAULT.withMaxRetries(0).withInitialBackoff(RETRY_INITIAL_BACKOFF);
+
+        if (spec.getRetryConfiguration() != null) {
+          retryBackoff =
+              FluentBackoff.DEFAULT
+                  .withInitialBackoff(RETRY_INITIAL_BACKOFF)
+                  
.withMaxRetries(spec.getRetryConfiguration().getMaxAttempts() - 1)
+                  
.withMaxCumulativeBackoff(spec.getRetryConfiguration().getMaxDuration());
+        }
       }
 
       @StartBundle
@@ -999,11 +1170,12 @@ public void processElement(ProcessContext context) 
throws Exception {
       }
 
       @FinishBundle
-      public void finishBundle(FinishBundleContext context) throws Exception {
+      public void finishBundle(FinishBundleContext context)
+          throws IOException, InterruptedException {
         flushBatch();
       }
 
-      private void flushBatch() throws IOException {
+      private void flushBatch() throws IOException, InterruptedException {
         if (batch.isEmpty()) {
           return;
         }
@@ -1025,11 +1197,35 @@ private void flushBatch() throws IOException {
         HttpEntity requestBody =
             new NStringEntity(bulkRequest.toString(), 
ContentType.APPLICATION_JSON);
         response = restClient.performRequest("POST", endPoint, 
Collections.emptyMap(), requestBody);
+        if (spec.getRetryConfiguration() != null
+            && 
spec.getRetryConfiguration().getRetryPredicate().test(response)) {
+          response = handleRetry("POST", endPoint, Collections.emptyMap(), 
requestBody);
+        }
         checkForErrors(response, backendVersion);
       }
 
+      /** retry request based on retry configuration policy. */
+      private Response handleRetry(
+          String method, String endpoint, Map<String, String> params, 
HttpEntity requestBody)
+          throws IOException, InterruptedException {
+        Response response;
+        Sleeper sleeper = Sleeper.DEFAULT;
+        BackOff backoff = retryBackoff.backoff();
+        int attempt = 0;
+        //while retry policy exists
+        while (BackOffUtils.next(sleeper, backoff)) {
+          LOG.warn(String.format(RETRY_ATTEMPT_LOG, ++attempt));
+          response = restClient.performRequest(method, endpoint, params, 
requestBody);
+          //if response has no 429 errors
+          if 
(!spec.getRetryConfiguration().getRetryPredicate().test(response)) {
+            return response;
+          }
+        }
+        throw new IOException(String.format(RETRY_FAILED_LOG, attempt));
+      }
+
       @Teardown
-      public void closeClient() throws Exception {
+      public void closeClient() throws IOException {
         if (restClient != null) {
           restClient.close();
         }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 137809)
    Time Spent: 15h 20m  (was: 15h 10m)

> Improve retrying in ElasticSearch client
> ----------------------------------------
>
>                 Key: BEAM-3026
>                 URL: https://issues.apache.org/jira/browse/BEAM-3026
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-elasticsearch
>            Reporter: Tim Robertson
>            Assignee: Ravi Pathak
>            Priority: Major
>             Fix For: 2.7.0
>
>          Time Spent: 15h 20m
>  Remaining Estimate: 0h
>
> Currently an overloaded ES server will result in clients failing fast.
> I suggest implementing backoff pauses.  Perhaps something like this:
> {code}
>     ElasticsearchIO.ConnectionConfiguration conn = 
> ElasticsearchIO.ConnectionConfiguration
>       .create(new String[]{"http://...:9200"}, "test", "test")
>       .retryWithWaitStrategy(WaitStrategies.exponentialBackoff(1000, 
> TimeUnit.MILLISECONDS))
>       .retryWithStopStrategy(StopStrategies.stopAfterAttempt(10));
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to