[ https://issues.apache.org/jira/browse/BEAM-3026?focusedWorklogId=135668&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135668 ]
ASF GitHub Bot logged work on BEAM-3026:
----------------------------------------

Author: ASF GitHub Bot
Created on: 17/Aug/18 11:05
Start Date: 17/Aug/18 11:05
Worklog Time Spent: 10m
Work Description: echauchot commented on a change in pull request #6146: [BEAM-3026] Adding retrying behavior on ElasticSearchIO
URL: https://github.com/apache/beam/pull/6146#discussion_r210873894

##########
File path: sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -512,4 +527,44 @@ public Void apply(Iterable<String> input) {
       return null;
     }
   }
+
+  /** Test that the default predicate correctly parses chosen error code. */
+  public void testDefaultRetryPredicate(RestClient restClient) throws IOException {
+    assertFalse(DEFAULT_RETRY_PREDICATE.test(new IOException("test")));
+    String x =
+        "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : \"1\" } }\n"
+            + "{ \"field1\" : @ }\n";
+    HttpEntity entity = new NStringEntity(x, ContentType.APPLICATION_JSON);
+
+    Response response = restClient.performRequest("POST", "/_bulk", Collections.emptyMap(), entity);
+    assertTrue(CUSTOM_RETRY_PREDICATE.test(new ResponseException(response)));
+  }
+
+  /**
+   * Test that retries are invoked when Elasticsearch returns a specific error code. We invoke this
+   * by issuing corrupt data and retrying on the `400` error code. Normal behaviour is to retry on
+   * `429` only but that is difficult to simulate reliably. The logger is used to verify expected
+   * behavior.
+   */
+  public void testWriteRetry() throws Throwable {
+    expectedException.expect(IOException.class);
+    expectedException.expectMessage(
+        String.format(ElasticsearchIO.Write.WriteFn.RETRY_FAILED_LOG, 2));
+
+    String data[] = {"{ \"x\" :a,\"y\":\"ab\" }"};
+    ElasticsearchIO.Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withRetryConfiguration(
+                ElasticsearchIO.RetryConfiguration.create(3, Duration.millis(35000))
+                    .withRetryPredicate(CUSTOM_RETRY_PREDICATE));
+    pipeline.apply(Create.of(Arrays.asList(data))).apply(write);
+    try {
+      pipeline.run();
+    } catch (Exception ex) {
+      throw ex.getCause();
+    }
+
+    fail();

Review comment:
Not sure `fail()` is needed here, as you throw the exception.

Issue Time Tracking
-------------------

Worklog Id: (was: 135668)

> Improve retrying in ElasticSearch client
> ----------------------------------------
>
>                 Key: BEAM-3026
>                 URL: https://issues.apache.org/jira/browse/BEAM-3026
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-elasticsearch
>            Reporter: Tim Robertson
>            Assignee: Ravi Pathak
>            Priority: Major
>             Fix For: 2.7.0
>
>          Time Spent: 8h 50m
>  Remaining Estimate: 0h
>
> Currently an overloaded ES server will result in clients failing fast.
> I suggest implementing backoff pauses.
> Perhaps something like this:
> {code}
> ElasticsearchIO.ConnectionConfiguration conn =
>     ElasticsearchIO.ConnectionConfiguration
>         .create(new String[]{"http://...:9200"}, "test", "test")
>         .retryWithWaitStrategy(WaitStrategies.exponentialBackoff(1000, TimeUnit.MILLISECONDS))
>         .retryWithStopStrategy(StopStrategies.stopAfterAttempt(10));
> {code}
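The backoff proposal above can be sketched without any Elasticsearch or Beam dependency. The sketch also combines the pieces the PR adds: a retry predicate that by default retries only on `429`, and `RetryConfiguration.create(3, Duration.millis(35000))`, which appears to bound both attempts and total duration. This is a minimal illustration under stated assumptions, not Beam's implementation. `StatusException`, `Call`, `Sleeper`, `callWithRetries`, the doubling factor and the 1-second initial backoff are all hypothetical.

```java
import java.util.function.Predicate;

/**
 * Hypothetical, dependency-free sketch of retry with exponential backoff. It is NOT the
 * ElasticsearchIO implementation; all names below are illustrative.
 */
public class BackoffRetrySketch {

  /** Hypothetical stand-in for a client exception that carries an HTTP status code. */
  static class StatusException extends Exception {
    final int statusCode;

    StatusException(int statusCode) {
      super("HTTP status " + statusCode);
      this.statusCode = statusCode;
    }
  }

  /** Hypothetical request abstraction (e.g. one bulk request). */
  interface Call<T> {
    T execute() throws StatusException;
  }

  /** Hypothetical hook for waiting, injectable so tests need not really sleep. */
  interface Sleeper {
    void sleep(long millis) throws InterruptedException;
  }

  /** Retry only on 429 (Too Many Requests), i.e. when the server signals overload. */
  static final Predicate<Throwable> RETRY_ON_429 =
      t -> t instanceof StatusException && ((StatusException) t).statusCode == 429;

  /**
   * Runs {@code call}, retrying failures accepted by {@code retryPredicate}. The wait doubles
   * after each failed attempt. We give up after {@code maxAttempts} attempts, or when the next
   * wait would push the total time spent waiting past {@code maxCumulativeBackoffMillis}.
   */
  static <T> T callWithRetries(
      Call<T> call,
      int maxAttempts,
      long maxCumulativeBackoffMillis,
      long initialBackoffMillis,
      Predicate<Throwable> retryPredicate,
      Sleeper sleeper)
      throws StatusException, InterruptedException {
    long backoff = initialBackoffMillis;
    long waited = 0;
    for (int attempt = 1; ; attempt++) {
      try {
        return call.execute();
      } catch (StatusException e) {
        boolean retryable = retryPredicate.test(e);
        boolean outOfAttempts = attempt >= maxAttempts;
        boolean outOfTime = waited + backoff > maxCumulativeBackoffMillis;
        if (!retryable || outOfAttempts || outOfTime) {
          throw e; // surface the last failure to the caller
        }
        sleeper.sleep(backoff);
        waited += backoff;
        backoff *= 2; // exponential growth: 1x, 2x, 4x, ...
      }
    }
  }

  public static void main(String[] args) throws Exception {
    int[] calls = {0};
    // Simulated server: rejects the first two requests with 429, then accepts.
    String result =
        callWithRetries(
            () -> {
              calls[0]++;
              if (calls[0] < 3) {
                throw new StatusException(429);
              }
              return "ok";
            },
            3,
            35_000,
            1_000,
            RETRY_ON_429,
            millis -> {}); // no real sleeping in this demo
    System.out.println(result + " after " + calls[0] + " attempts"); // prints "ok after 3 attempts"
  }
}
```

Injecting the `Sleeper` lets the retry loop be unit-tested without real waits or a live cluster. Checking the time budget before each sleep means the loop gives up rather than overshooting `maxCumulativeBackoffMillis`.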