[ 
https://issues.apache.org/jira/browse/BEAM-5725?focusedWorklogId=163378&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163378
 ]

ASF GitHub Bot logged work on BEAM-5725:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Nov/18 08:44
            Start Date: 07/Nov/18 08:44
    Worklog Time Spent: 10m 
      Work Description: timrobertson100 closed pull request #6961: [BEAM-5725] 
ElasticsearchIO retryConfiguration response parse failure fix
URL: https://github.com/apache/beam/pull/6961
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 52609e54d40..7ebaaf699db 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -210,4 +210,10 @@ public void testWriteRetry() throws Throwable {
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testWriteRetry();
   }
+
+  @Test
+  public void testWriteRetryValidRequest() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteRetryValidRequest();
+  }
 }
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 2414f8f44da..46e5b79d772 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -202,4 +202,10 @@ public void testWriteRetry() throws Throwable {
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testWriteRetry();
   }
+
+  @Test
+  public void testWriteRetryValidRequest() throws Throwable {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteRetryValidRequest();
+  }
 }
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 12b2c5af3c7..c93edfc1cb7 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -196,4 +196,10 @@ public void testWriteRetry() throws Throwable {
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testWriteRetry();
   }
+
+  @Test
+  public void testWriteRetryValidRequest() throws Throwable {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteRetryValidRequest();
+  }
 }
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
index 57d4e167da4..e6b008cad20 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
@@ -118,7 +118,7 @@ static void insertTestDocuments(
         restClient.performRequest(
             "POST", endPoint, Collections.singletonMap("refresh", "true"), 
requestBody);
     ElasticsearchIO.checkForErrors(
-        response, ElasticsearchIO.getBackendVersion(connectionConfiguration));
+        response.getEntity(), 
ElasticsearchIO.getBackendVersion(connectionConfiguration));
   }
 
   /**
@@ -154,7 +154,7 @@ static long refreshIndexAndGetCurrentNumDocs(RestClient 
restClient, String index
 
       endPoint = String.format("/%s/%s/_search", index, type);
       Response response = restClient.performRequest("GET", endPoint);
-      JsonNode searchResult = ElasticsearchIO.parseResponse(response);
+      JsonNode searchResult = 
ElasticsearchIO.parseResponse(response.getEntity());
       result = searchResult.path("hits").path("total").asLong();
     } catch (IOException e) {
       // it is fine to ignore bellow exceptions because in 
testWriteWithBatchSize* sometimes,
@@ -238,7 +238,7 @@ static int countByMatch(
     HttpEntity httpEntity = new NStringEntity(requestBody, 
ContentType.APPLICATION_JSON);
     Response response =
         restClient.performRequest("GET", endPoint, Collections.emptyMap(), 
httpEntity);
-    JsonNode searchResult = parseResponse(response);
+    JsonNode searchResult = parseResponse(response.getEntity());
     return searchResult.path("hits").path("total").asInt();
   }
 }
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index 9b38391a782..579c968adf4 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -235,19 +235,8 @@ void testReadWithMetadata() throws Exception {
   }
 
   void testWrite() throws Exception {
-    List<String> data =
-        ElasticSearchIOTestUtils.createDocuments(
-            numDocs, 
ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
-    pipeline
-        .apply(Create.of(data))
-        
.apply(ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration));
-    pipeline.run();
-
-    long currentNumDocs = 
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
-    assertEquals(numDocs, currentNumDocs);
-
-    int count = countByScientistName(connectionConfiguration, restClient, 
"Einstein");
-    assertEquals(numDocs / NUM_SCIENTISTS, count);
+    Write write = 
ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration);
+    executeWriteTest(write);
   }
 
   void testWriteWithErrors() throws Exception {
@@ -584,17 +573,17 @@ public Void apply(Iterable<String> input) {
   }
 
   /** Test that the default predicate correctly parses chosen error code. */
-  public void testDefaultRetryPredicate(RestClient restClient) throws 
IOException {
+  void testDefaultRetryPredicate(RestClient restClient) throws IOException {
 
     HttpEntity entity1 = new NStringEntity(BAD_REQUEST, 
ContentType.APPLICATION_JSON);
     Response response1 =
         restClient.performRequest("POST", "/_bulk", Collections.emptyMap(), 
entity1);
-    assertTrue(CUSTOM_RETRY_PREDICATE.test(response1));
+    assertTrue(CUSTOM_RETRY_PREDICATE.test(response1.getEntity()));
 
     HttpEntity entity2 = new NStringEntity(OK_REQUEST, 
ContentType.APPLICATION_JSON);
     Response response2 =
         restClient.performRequest("POST", "/_bulk", Collections.emptyMap(), 
entity2);
-    assertFalse(DEFAULT_RETRY_PREDICATE.test(response2));
+    assertFalse(DEFAULT_RETRY_PREDICATE.test(response2.getEntity()));
   }
 
   /**
@@ -603,7 +592,7 @@ public void testDefaultRetryPredicate(RestClient 
restClient) throws IOException
    * `429` only but that is difficult to simulate reliably. The logger is used 
to verify expected
    * behavior.
    */
-  public void testWriteRetry() throws Throwable {
+  void testWriteRetry() throws Throwable {
     expectedException.expectCause(isA(IOException.class));
     // max attempt is 3, but retry is 2 which excludes 1st attempt when error 
was identified and retry started.
     expectedException.expectMessage(
@@ -619,4 +608,28 @@ public void testWriteRetry() throws Throwable {
 
     pipeline.run();
   }
+
+  void testWriteRetryValidRequest() throws Exception {
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withRetryConfiguration(
+                ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, 
Duration.millis(35000))
+                    .withRetryPredicate(CUSTOM_RETRY_PREDICATE));
+    executeWriteTest(write);
+  }
+
+  private void executeWriteTest(ElasticsearchIO.Write write) throws Exception {
+    List<String> data =
+        ElasticSearchIOTestUtils.createDocuments(
+            numDocs, 
ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    long currentNumDocs = 
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    assertEquals(numDocs, currentNumDocs);
+
+    int count = countByScientistName(connectionConfiguration, restClient, 
"Einstein");
+    assertEquals(numDocs / NUM_SCIENTISTS, count);
+  }
 }
diff --git 
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
 
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index a2c6233518f..f44b1e22776 100644
--- 
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ 
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -70,6 +70,7 @@
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.entity.BufferedHttpEntity;
 import org.apache.http.entity.ContentType;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
@@ -171,12 +172,12 @@ private ElasticsearchIO() {}
   private static final ObjectMapper mapper = new ObjectMapper();
 
   @VisibleForTesting
-  static JsonNode parseResponse(Response response) throws IOException {
-    return mapper.readValue(response.getEntity().getContent(), JsonNode.class);
+  static JsonNode parseResponse(HttpEntity responseEntity) throws IOException {
+    return mapper.readValue(responseEntity.getContent(), JsonNode.class);
   }
 
-  static void checkForErrors(Response response, int backendVersion) throws 
IOException {
-    JsonNode searchResult = parseResponse(response);
+  static void checkForErrors(HttpEntity responseEntity, int backendVersion) 
throws IOException {
+    JsonNode searchResult = parseResponse(responseEntity);
     boolean errors = searchResult.path("errors").asBoolean();
     if (errors) {
       StringBuilder errorMessages =
@@ -637,7 +638,7 @@ private static JsonNode getStats(
       }
       String endpoint = String.format("/%s/_stats", 
connectionConfiguration.getIndex());
       try (RestClient restClient = connectionConfiguration.createClient()) {
-        return parseResponse(restClient.performRequest("GET", endpoint, 
params));
+        return parseResponse(restClient.performRequest("GET", endpoint, 
params).getEntity());
       }
     }
   }
@@ -671,7 +672,6 @@ public boolean start() throws IOException {
             String.format("\"slice\": {\"id\": %s,\"max\": %s}", 
source.sliceId, source.numSlices);
         query = query.replaceFirst("\\{", "{" + sliceQuery + ",");
       }
-      Response response;
       String endPoint =
           String.format(
               "/%s/%s/_search",
@@ -686,8 +686,8 @@ public boolean start() throws IOException {
         }
       }
       HttpEntity queryEntity = new NStringEntity(query, 
ContentType.APPLICATION_JSON);
-      response = restClient.performRequest("GET", endPoint, params, 
queryEntity);
-      JsonNode searchResult = parseResponse(response);
+      Response response = restClient.performRequest("GET", endPoint, params, 
queryEntity);
+      JsonNode searchResult = parseResponse(response.getEntity());
       updateScrollId(searchResult);
       return readNextBatchAndReturnFirstDocument(searchResult);
     }
@@ -710,7 +710,7 @@ public boolean advance() throws IOException {
         Response response =
             restClient.performRequest(
                 "GET", "/_search/scroll", Collections.emptyMap(), 
scrollEntity);
-        JsonNode searchResult = parseResponse(response);
+        JsonNode searchResult = parseResponse(response.getEntity());
         updateScrollId(searchResult);
         return readNextBatchAndReturnFirstDocument(searchResult);
       }
@@ -830,7 +830,7 @@ RetryConfiguration withRetryPredicate(RetryPredicate 
predicate) {
      * the requests to the Elasticsearch server if the {@link 
RetryConfiguration} permits it.
      */
     @FunctionalInterface
-    interface RetryPredicate extends Predicate<Response>, Serializable {}
+    interface RetryPredicate extends Predicate<HttpEntity>, Serializable {}
 
     /**
      * This is the default predicate used to test if a failed ES operation 
should be retried. A
@@ -851,9 +851,9 @@ RetryConfiguration withRetryPredicate(RetryPredicate 
predicate) {
       }
 
       /** Returns true if the response has the error code for any mutation. */
-      private static boolean errorCodePresent(Response response, int 
errorCode) {
+      private static boolean errorCodePresent(HttpEntity responseEntity, int 
errorCode) {
         try {
-          JsonNode json = parseResponse(response);
+          JsonNode json = parseResponse(responseEntity);
           if (json.path("errors").asBoolean()) {
             for (JsonNode item : json.path("items")) {
               if (item.findValue("status").asInt() == errorCode) {
@@ -862,14 +862,14 @@ private static boolean errorCodePresent(Response 
response, int errorCode) {
             }
           }
         } catch (IOException e) {
-          LOG.warn("Could not extract error codes from response {}", response);
+          LOG.warn("Could not extract error codes from responseEntity {}", 
responseEntity);
         }
         return false;
       }
 
       @Override
-      public boolean test(Response response) {
-        return errorCodePresent(response, errorCode);
+      public boolean test(HttpEntity responseEntity) {
+        return errorCodePresent(responseEntity, errorCode);
       }
     }
   }
@@ -1211,6 +1211,7 @@ private void flushBatch() throws IOException, 
InterruptedException {
         batch.clear();
         currentBatchSizeBytes = 0;
         Response response;
+        HttpEntity responseEntity;
         // Elasticsearch will default to the index/type provided here if none 
are set in the
         // document meta (i.e. using ElasticsearchIO$Write#withIndexFn and
         // ElasticsearchIO$Write#withTypeFn options)
@@ -1222,18 +1223,20 @@ private void flushBatch() throws IOException, 
InterruptedException {
         HttpEntity requestBody =
             new NStringEntity(bulkRequest.toString(), 
ContentType.APPLICATION_JSON);
         response = restClient.performRequest("POST", endPoint, 
Collections.emptyMap(), requestBody);
+        responseEntity = new BufferedHttpEntity(response.getEntity());
         if (spec.getRetryConfiguration() != null
-            && 
spec.getRetryConfiguration().getRetryPredicate().test(response)) {
-          response = handleRetry("POST", endPoint, Collections.emptyMap(), 
requestBody);
+            && 
spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
+          responseEntity = handleRetry("POST", endPoint, 
Collections.emptyMap(), requestBody);
         }
-        checkForErrors(response, backendVersion);
+        checkForErrors(responseEntity, backendVersion);
       }
 
       /** retry request based on retry configuration policy. */
-      private Response handleRetry(
+      private HttpEntity handleRetry(
           String method, String endpoint, Map<String, String> params, 
HttpEntity requestBody)
           throws IOException, InterruptedException {
         Response response;
+        HttpEntity responseEntity;
         Sleeper sleeper = Sleeper.DEFAULT;
         BackOff backoff = retryBackoff.backoff();
         int attempt = 0;
@@ -1241,9 +1244,10 @@ private Response handleRetry(
         while (BackOffUtils.next(sleeper, backoff)) {
           LOG.warn(String.format(RETRY_ATTEMPT_LOG, ++attempt));
           response = restClient.performRequest(method, endpoint, params, 
requestBody);
+          responseEntity = new BufferedHttpEntity(response.getEntity());
           //if response has no 429 errors
-          if 
(!spec.getRetryConfiguration().getRetryPredicate().test(response)) {
-            return response;
+          if 
(!spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
+            return responseEntity;
           }
         }
         throw new IOException(String.format(RETRY_FAILED_LOG, attempt));
@@ -1261,7 +1265,7 @@ public void closeClient() throws IOException {
   static int getBackendVersion(ConnectionConfiguration 
connectionConfiguration) {
     try (RestClient restClient = connectionConfiguration.createClient()) {
       Response response = restClient.performRequest("GET", "");
-      JsonNode jsonNode = parseResponse(response);
+      JsonNode jsonNode = parseResponse(response.getEntity());
       int backendVersion =
           
Integer.parseInt(jsonNode.path("version").path("number").asText().substring(0, 
1));
       checkArgument(


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 163378)
    Time Spent: 1h  (was: 50m)

> ElasticsearchIO RetryConfiguration response parse failure
> ---------------------------------------------------------
>
>                 Key: BEAM-5725
>                 URL: https://issues.apache.org/jira/browse/BEAM-5725
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-elasticsearch
>            Reporter: Wout Scheepers
>            Assignee: Wout Scheepers
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> When using .withRetryConfiguration() for ElasticsearchIO, I get the following 
> stacktrace:
>  
>  
> {code:java}
> Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No 
> content to map due to end-of-input
> at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1, column: 
> 0]
> at 
> com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> at 
> com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
> at 
> com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988)
> at 
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3058)
> at 
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.parseResponse(ElasticsearchIO.java:173)
> at 
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:177)
> at 
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1204)
> at 
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1175)
> {code}
>  
>  
> Probably the elastic response object's content stream is consumed twice, 
> resulting in a MismatchedInputException.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to