[https://issues.apache.org/jira/browse/BEAM-3026?focusedWorklogId=135657&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135657]
ASF GitHub Bot logged work on BEAM-3026:
----------------------------------------
Author: ASF GitHub Bot
Created on: 17/Aug/18 11:05
Start Date: 17/Aug/18 11:05
Worklog Time Spent: 10m
Work Description: echauchot commented on a change in pull request #6146:
[BEAM-3026] Adding retrying behavior on ElasticSearchIO
URL: https://github.com/apache/beam/pull/6146#discussion_r210838378
##########
File path:
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -731,6 +744,81 @@ public void close() throws IOException {
return source;
}
}
+ /**
+ * A POJO encapsulating a configuration for retry behavior when issuing requests to ES. A retry
+ * will be attempted until the maxAttempts or maxDuration is exceeded, whichever comes first, for
+ * 429 TOO_MANY_REQUESTS error.
+ */
+ public static class RetryConfiguration extends BaseRetryConfiguration {
+
+ private RetryConfiguration(
+ int maxAttempts, Duration maxDuration, RetryPredicate retryPredicate) {
+ super(maxAttempts, maxDuration, retryPredicate);
+ }
+
+ /**
+ * Creates RetryConfiguration for {@link ElasticsearchIO} with provided maxAttempts,
+ * maxDurations and exponential backoff based retries.
+ */
+ public static RetryConfiguration create(int maxAttempts, Duration maxDuration) {
+ checkArgument(maxAttempts > 0, "maxAttempts must be greater than 0");
+ checkArgument(
+ maxDuration != null && maxDuration.isLongerThan(Duration.ZERO),
+ "maxDuration must be greater than 0");
+ return new RetryConfiguration(maxAttempts, maxDuration, DEFAULT_RETRY_PREDICATE);
+ }
+
+ @VisibleForTesting
+ RetryConfiguration withRetryPredicate(RetryPredicate predicate) {
+ this.retryPredicate = predicate;
+ return this;
+ }
+
+ @VisibleForTesting
+ static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();
+
+ /**
+ * This is the default predicate used to test if a failed ES operation should be retried. A
+ * retry will be attempted until the maxAttempts or maxDuration is exceeded, whichever comes
+ * first, for TOO_MANY_REQUESTS(429) error.
+ */
+ @VisibleForTesting
+ static class DefaultRetryPredicate implements RetryPredicate {
+
+ private int errorCode;
+
+ DefaultRetryPredicate(int code) {
+ this.errorCode = code;
+ }
+
+ DefaultRetryPredicate() {
+ this(429);
+ }
+
+ /** Returns true if the response has the error code for any mutation. */
+ private static boolean errorCodePresent(Response response, int errorCode) {
+ try {
+ JsonNode json = parseResponse(response);
+ if (json.path("errors").asBoolean()) {
+ for (JsonNode item : json.path("items")) {
+ if (item.findValue("status").asInt() == errorCode) {
+ return true;
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Could not extract error codes from response {}", response);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean test(Throwable t) {
+ return (t instanceof ResponseException)
+ && errorCodePresent(((ResponseException) t).getResponse(), this.errorCode);
Review comment:
nit: remove `this.` as it is unambiguous.
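The Javadoc in this hunk says a retry is attempted on 429 TOO_MANY_REQUESTS until maxAttempts or maxDuration is exceeded, whichever comes first, using exponential backoff. A minimal, dependency-free sketch of that control flow is below. It is not Beam's implementation: `RetrySketch`, `StatusException`, `callWithRetries` and the `initialBackoff` parameter are hypothetical, `java.time.Duration` stands in for the Joda `Duration` used in the diff, and an HTTP status carried on the exception stands in for parsing the bulk response's `errors` flag and `items[].status` values.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

/** Hypothetical sketch: retry on a matching failure until attempts or time run out. */
final class RetrySketch {

  /** Hypothetical stand-in for an ES client failure that carries an HTTP status. */
  static final class StatusException extends Exception {
    final int status;

    StatusException(int status) {
      super("HTTP " + status);
      this.status = status;
    }
  }

  /** Mirrors the intent of the default predicate: retry only on 429 TOO_MANY_REQUESTS. */
  static final Predicate<Throwable> RETRY_ON_429 =
      t -> t instanceof StatusException && ((StatusException) t).status == 429;

  static <T> T callWithRetries(
      Callable<T> call,
      int maxAttempts,
      Duration maxDuration,
      Predicate<Throwable> retryPredicate,
      Duration initialBackoff)
      throws Exception {
    // Same preconditions as RetryConfiguration.create in the diff.
    if (maxAttempts <= 0) {
      throw new IllegalArgumentException("maxAttempts must be greater than 0");
    }
    if (maxDuration.isZero() || maxDuration.isNegative()) {
      throw new IllegalArgumentException("maxDuration must be greater than 0");
    }
    long deadline = System.nanoTime() + maxDuration.toNanos();
    Duration backoff = initialBackoff;
    for (int attempt = 1; ; attempt++) {
      try {
        return call.call();
      } catch (Exception e) {
        boolean outOfAttempts = attempt >= maxAttempts;
        // Give up if the next pause would end after the deadline.
        boolean outOfTime = System.nanoTime() + backoff.toNanos() > deadline;
        if (!retryPredicate.test(e) || outOfAttempts || outOfTime) {
          throw e; // not retryable, or the retry budget is spent
        }
        Thread.sleep(backoff.toMillis());
        backoff = backoff.multipliedBy(2); // exponential backoff
      }
    }
  }
}
```

Checking the remaining time before sleeping keeps the loop from pausing past maxDuration, so whichever limit is reached first ends the retries, and any failure the predicate rejects (for example a 500) is rethrown on the first attempt.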
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 135657)
Time Spent: 7h 40m (was: 7.5h)
> Improve retrying in ElasticSearch client
> ----------------------------------------
>
> Key: BEAM-3026
> URL: https://issues.apache.org/jira/browse/BEAM-3026
> Project: Beam
> Issue Type: Improvement
> Components: io-java-elasticsearch
> Reporter: Tim Robertson
> Assignee: Ravi Pathak
> Priority: Major
> Fix For: 2.7.0
>
> Time Spent: 7h 40m
> Remaining Estimate: 0h
>
> Currently an overloaded ES server will result in clients failing fast.
> I suggest implementing backoff pauses. Perhaps something like this:
> {code}
> ElasticsearchIO.ConnectionConfiguration conn =
>     ElasticsearchIO.ConnectionConfiguration
>         .create(new String[]{"http://...:9200"}, "test", "test")
>         .retryWithWaitStrategy(
>             WaitStrategies.exponentialBackoff(1000, TimeUnit.MILLISECONDS))
>         .retryWithStopStrategy(StopStrategies.stopAfterAttempt(10));
> {code}
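The suggestion above pairs a wait strategy (exponential backoff with a 1000 ms bound) with a stop strategy (stop after 10 attempts). The sketch below only illustrates that pairing as a schedule of pauses; `BackoffSchedule`, `pauses` and the 100 ms starting pause are hypothetical, and it does not reproduce the exact delay formula of any retry library.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: capped exponential pauses between a bounded number of attempts. */
final class BackoffSchedule {

  /**
   * Returns the pause (in ms) before each retry: initialMillis doubled per retry and capped at
   * maxMillis. maxAttempts attempts need at most maxAttempts - 1 pauses between them.
   */
  static List<Long> pauses(long initialMillis, long maxMillis, int maxAttempts) {
    if (initialMillis <= 0 || maxMillis < initialMillis || maxAttempts < 1) {
      throw new IllegalArgumentException("invalid backoff parameters");
    }
    List<Long> result = new ArrayList<>();
    long pause = initialMillis;
    for (int retry = 1; retry < maxAttempts; retry++) {
      result.add(Math.min(pause, maxMillis));
      // Stay at the cap once reached instead of doubling further.
      pause = pause >= maxMillis ? maxMillis : pause * 2;
    }
    return result;
  }
}
```

With a 100 ms start, a 1000 ms cap and 10 attempts, this yields nine pauses: 100, 200, 400, 800, then 1000 for the remaining five retries. The cap keeps a long run of 429 responses from producing unbounded waits, while the attempt limit bounds the total number of requests.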
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)