[
https://issues.apache.org/jira/browse/BEAM-3026?focusedWorklogId=136853&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-136853
]
ASF GitHub Bot logged work on BEAM-3026:
----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Aug/18 07:36
Start Date: 22/Aug/18 07:36
Worklog Time Spent: 10m
Work Description: echauchot commented on a change in pull request #6146:
[BEAM-3026] Adding retrying behavior on ElasticSearchIO
URL: https://github.com/apache/beam/pull/6146#discussion_r211655755
##########
File path:
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -512,4 +529,53 @@ public Void apply(Iterable<String> input) {
return null;
}
}
+
+ /** Test that the default predicate correctly parses chosen error code. */
+ public void testDefaultRetryPredicate(RestClient restClient) throws
IOException {
+ String okRequest =
+ "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" :
\"1\" } }\n"
+ + "{ \"field1\" : 1 }\n";
+ String badRequest =
+ "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" :
\"1\" } }\n"
+ + "{ \"field1\" : @ }\n";
+
+ HttpEntity entity1 = new NStringEntity(badRequest,
ContentType.APPLICATION_JSON);
+ Response response1 =
+ restClient.performRequest("POST", "/_bulk", Collections.emptyMap(),
entity1);
+ assertTrue(CUSTOM_RETRY_PREDICATE.test(response1));
+
+ HttpEntity entity2 = new NStringEntity(okRequest,
ContentType.APPLICATION_JSON);
+ Response response2 =
+ restClient.performRequest("POST", "/_bulk", Collections.emptyMap(),
entity2);
+ assertFalse(DEFAULT_RETRY_PREDICATE.test(response2));
+ }
+
+ /**
+ * Test that retries are invoked when Elasticsearch returns a specific error
code. We invoke this
+ * by issuing corrupt data and retrying on the `400` error code. Normal
behaviour is to retry on
+ * `429` only but that is difficult to simulate reliably. The logger is used
to verify expected
+ * behavior.
+ */
+ public void testWriteRetry() throws Throwable {
+ expectedException.expect(IOException.class);
+ // max attempt is 3, but retry is 2 which excludes 1st attempt when error
was identified and retry started.
+ expectedException.expectMessage(
+ String.format(ElasticsearchIO.Write.WriteFn.RETRY_FAILED_LOG,
EXPECTED_RETRIES));
+
+ String data[] = {"{ \"x\" :a,\"y\":\"ab\" }"};
+ ElasticsearchIO.Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withRetryConfiguration(
+ ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS,
Duration.millis(35000))
+ .withRetryPredicate(CUSTOM_RETRY_PREDICATE));
+ pipeline.apply(Create.of(Arrays.asList(data))).apply(write);
+ try {
+ pipeline.run();
+ } catch (Exception ex) {
+ throw ex.getCause();
+ }
+ //when there is no exception, test should be failed
+ fail();
Review comment:
can't you just remove the` try/catch` and the` fail() `and let the exception
raise? the `expectedException` junit rule should make the test fail if no
exception is thrown and make it succeed if an exception is thrown
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 136853)
Time Spent: 12h 10m (was: 12h)
> Improve retrying in ElasticSearch client
> ----------------------------------------
>
> Key: BEAM-3026
> URL: https://issues.apache.org/jira/browse/BEAM-3026
> Project: Beam
> Issue Type: Improvement
> Components: io-java-elasticsearch
> Reporter: Tim Robertson
> Assignee: Ravi Pathak
> Priority: Major
> Fix For: 2.7.0
>
> Time Spent: 12h 10m
> Remaining Estimate: 0h
>
> Currently an overloaded ES server will result in clients failing fast.
> I suggest implementing backoff pauses. Perhaps something like this:
> {code}
> ElasticsearchIO.ConnectionConfiguration conn =
> ElasticsearchIO.ConnectionConfiguration
> .create(new String[]{"http://...:9200"}, "test", "test")
> .retryWithWaitStrategy(WaitStrategies.exponentialBackoff(1000,
> TimeUnit.MILLISECONDS)
> .retryWithStopStrategy(StopStrategies.stopAfterAttempt(10)
> );
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)