[ 
https://issues.apache.org/jira/browse/BEAM-3026?focusedWorklogId=135669&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135669
 ]

ASF GitHub Bot logged work on BEAM-3026:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Aug/18 11:05
            Start Date: 17/Aug/18 11:05
    Worklog Time Spent: 10m 
      Work Description: echauchot commented on a change in pull request #6146: 
[BEAM-3026] Adding retrying behavior on ElasticSearchIO
URL: https://github.com/apache/beam/pull/6146#discussion_r210863705
 
 

 ##########
 File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
 ##########
 @@ -1025,11 +1172,40 @@ private void flushBatch() throws IOException {
         HttpEntity requestBody =
             new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
         response = restClient.performRequest("POST", endPoint, Collections.emptyMap(), requestBody);
+        if (spec.getRetryConfiguration() != null
+            && spec.getRetryConfiguration()
+                .getRetryPredicate()
+                .test(new ResponseException(response))) {
+          response = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
+        }
         checkForErrors(response, backendVersion);
       }
 
+      /** retry request based on retry configuration policy. */
+      private Response handleRetry(
+          String method, String endpoint, Map<String, String> params, HttpEntity requestBody)
+          throws IOException, InterruptedException {
+        Response response = null;
+        Sleeper sleeper = Sleeper.DEFAULT;
+        BackOff backoff = retryBackoff.backoff();
+        int attempt = 0;
+        //while retry policy exists
+        while (BackOffUtils.next(sleeper, backoff)) {
+          LOG.warn(String.format(RETRY_ATTEMPT_LOG, ++attempt));
+          response = restClient.performRequest(method, endpoint, params, requestBody);
+          if (spec.getRetryConfiguration()
+              .getRetryPredicate()
+              .test(new ResponseException(response))) {
 
 Review comment:
   It is strange to create an exception outside of a catch block: this code
constructs a ResponseException even when the response may indicate no problem
at all. I think you should instead parametrize the Predicate to take a Response
object rather than a ResponseException object, and remove the
`t instanceof ResponseException` check from the implementation of
predicate#test, because it will always be true when you create the
ResponseException yourself.
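
A minimal sketch of the suggested shape, in which the predicate is typed on the
response itself so no exception has to be constructed just to evaluate it.
`FakeResponse` is a hypothetical stand-in for the REST client's Response class
(only the status code is modelled), and retrying on HTTP 429 is an assumed
policy chosen for illustration, not necessarily the PR's actual rule.

```java
import java.util.function.Predicate;

public class ResponsePredicateSketch {

  /** Hypothetical stand-in for the REST client's Response; only the status code is modelled. */
  static final class FakeResponse {
    private final int statusCode;

    FakeResponse(int statusCode) {
      this.statusCode = statusCode;
    }

    int getStatusCode() {
      return statusCode;
    }
  }

  /**
   * Assumed policy for this sketch: retry only on 429 Too Many Requests.
   * The predicate takes the response directly, so no instanceof check is needed.
   */
  static final Predicate<FakeResponse> RETRY_ON_TOO_MANY_REQUESTS =
      response -> response.getStatusCode() == 429;

  public static void main(String[] args) {
    // An overloaded server should trigger a retry; a successful response should not.
    System.out.println(RETRY_ON_TOO_MANY_REQUESTS.test(new FakeResponse(429))); // true
    System.out.println(RETRY_ON_TOO_MANY_REQUESTS.test(new FakeResponse(200))); // false
  }
}
```

With this typing, the call site could test the response it already holds
instead of wrapping it in a new exception first.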

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 135669)

> Improve retrying in ElasticSearch client
> ----------------------------------------
>
>                 Key: BEAM-3026
>                 URL: https://issues.apache.org/jira/browse/BEAM-3026
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-elasticsearch
>            Reporter: Tim Robertson
>            Assignee: Ravi Pathak
>            Priority: Major
>             Fix For: 2.7.0
>
>          Time Spent: 8h 50m
>  Remaining Estimate: 0h
>
> Currently an overloaded ES server will result in clients failing fast.
> I suggest implementing backoff pauses.  Perhaps something like this:
> {code}
>     ElasticsearchIO.ConnectionConfiguration conn = 
> ElasticsearchIO.ConnectionConfiguration
>       .create(new String[]{"http://...:9200"}, "test", "test")
>       .retryWithWaitStrategy(WaitStrategies.exponentialBackoff(1000, TimeUnit.MILLISECONDS))
>       .retryWithStopStrategy(StopStrategies.stopAfterAttempt(10));
> {code}
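
The backoff pauses proposed in the issue description could look roughly like
the following self-contained sketch. It is not the Beam implementation:
`BackoffSketch`, `retryWithBackoff`, and the injectable `sleeper` are
hypothetical names introduced here, and doubling the delay after each failed
attempt is one assumed reading of "exponential backoff".

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

public class BackoffSketch {

  /**
   * Runs the action until it reports success or maxAttempts is reached,
   * pausing between attempts and doubling the pause each time.
   *
   * @param action returns true when the request succeeded
   * @param sleeper receives the pause in milliseconds (injectable so callers can avoid real sleeps)
   * @return the number of attempts that were made
   */
  static int retryWithBackoff(
      BooleanSupplier action, int maxAttempts, long initialDelayMillis, LongConsumer sleeper) {
    long delay = initialDelayMillis;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (action.getAsBoolean()) {
        return attempt;
      }
      // Pause only if another attempt will follow.
      if (attempt < maxAttempts) {
        sleeper.accept(delay);
        delay *= 2;
      }
    }
    throw new IllegalStateException("Giving up after " + maxAttempts + " attempts");
  }

  public static void main(String[] args) {
    int[] calls = {0};
    // Simulated request that succeeds on the third call; pauses are printed rather than slept.
    int attempts =
        retryWithBackoff(
            () -> ++calls[0] == 3,
            10,
            1000,
            millis -> System.out.println("would pause " + millis + " ms"));
    System.out.println("succeeded after " + attempts + " attempts");
  }
}
```

Passing the sleeper in as a parameter keeps the loop testable without waiting
for real pauses; a production caller would pass something that actually sleeps.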



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
