[FLINK-5122] [elasticsearch] Retry temporary Elasticsearch request errors.

Covered errors are: timeouts, no master, UnavailableShardsException, and a full
bulk queue on a node.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aaac7c2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aaac7c2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aaac7c2e

Branch: refs/heads/master
Commit: aaac7c2e505717dbfc40465cb3656652e1bf5658
Parents: 0ba08b4
Author: Max Kuklinski <[email protected]>
Authored: Wed Nov 23 17:54:11 2016 +0100
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri Feb 24 22:58:40 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/elasticsearch.md            |  5 ++
 .../elasticsearch/ElasticsearchSinkBase.java    | 62 ++++++++++++++++++--
 2 files changed, 63 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aaac7c2e/docs/dev/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 0f2d025..4388b9a 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -272,6 +272,11 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
 The difference is that now we do not need to provide a list of addresses
 of Elasticsearch nodes.
 
+Optionally, the sink can retry a failed bulk request when the error
+message matches certain patterns indicating a timeout or an overloaded cluster.
+This behaviour is disabled by default and can be enabled by setting the protected `checkErrorAndRetryBulk` field of `ElasticsearchSinkBase` to `true`.
+
+
 More information about Elasticsearch can be found [here](https://elastic.co).
 
 #### Packaging the Elasticsearch Connector into an Uber-Jar

http://git-wip-us.apache.org/repos/asf/flink/blob/aaac7c2e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 6a2d65f..7977fc0 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -21,12 +21,15 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.util.InstantiationUtil;
+import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
@@ -85,6 +88,13 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
        /** Provided to the user via the {@link ElasticsearchSinkFunction} to 
add {@link ActionRequest ActionRequests}. */
        private transient BulkProcessorIndexer requestIndexer;
 
+       /**
+        * When {@code true}, transient Elasticsearch errors are retried instead of failing the sink; disabled by default.
+        * Failed bulk items are matched against message patterns (timeouts, <i>UnavailableShardsException</i>, a full bulk
+        * queue on a node), failed bulk requests by exception type (cluster block such as "no master", timeout). No backoff is applied.
+        */
+       protected boolean checkErrorAndRetryBulk;
+
        // 
------------------------------------------------------------------------
        //  Internals for the Flink Elasticsearch Sink
        // 
------------------------------------------------------------------------
@@ -165,20 +175,49 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
                                @Override
                                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                                        if (response.hasFailures()) {
+                                               boolean allRequestsRepeatable = true;
+
                                                for (BulkItemResponse itemResp : response.getItems()) {
                                                        Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
                                                        if (failure != null) {
-                                                               LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-                                                               failureThrowable.compareAndSet(null, failure);
+                                                               String failureMessageLowercase = String.valueOf(itemResp.getFailureMessage()).toLowerCase(java.util.Locale.ROOT);
+                                                               if (checkErrorAndRetryBulk && ( // only transient errors are retried, anything else fails the sink
+                                                                       failureMessageLowercase.contains("timeout") || // NOTE: substring match, any message mentioning "timeout" is retried
+                                                                               failureMessageLowercase.contains("timed out") || // Generic timeout errors
+                                                                               failureMessageLowercase.contains("UnavailableShardsException".toLowerCase(java.util.Locale.ROOT)) || // Shard not available due to rebalancing or node down
+                                                                               failureMessageLowercase.contains("EsRejectedExecutionException".toLowerCase(java.util.Locale.ROOT)))) { // Bulk index queue on node full
+                                                                       LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
+                                                               } else {
+                                                                       allRequestsRepeatable = false;
+                                                                       LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
+                                                                       failureThrowable.compareAndSet(null, failure);
+                                                               }
                                                        }
                                                }
+                                               if (allRequestsRepeatable) {
+                                                       // Re-add only the failed items (getItemId() is their position in the bulk); successful ones must not be indexed twice.
+                                                       for (BulkItemResponse itemResp : response.getItems()) {
+                                                               if (callBridge.extractFailureCauseFromBulkItemResponse(itemResp) != null) {
+                                                                       bulkProcessor.add((ActionRequest<?>) request.subRequests().get(itemResp.getItemId()));
+                                                               }
+                                                       }
+                                               }
                                        }
                                }
 
                                @Override
                                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-                                       LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-                                       failureThrowable.compareAndSet(null, failure);
+                                       // Only transient whole-bulk failures are retried: a cluster block that is itself retryable (e.g. "no master"),
+                                       // or a timeout (ElasticsearchTimeoutException sounded good, not seen in stress tests yet).
+                                       boolean retryable = failure instanceof ElasticsearchTimeoutException
+                                                       || (failure instanceof ClusterBlockException && ((ClusterBlockException) failure).retryable());
+                                       if (checkErrorAndRetryBulk && retryable) {
+                                               LOG.debug("Retry bulk on throwable: {}", failure.getMessage());
+                                               reAddBulkRequest(request);
+                                       } else {
+                                               LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
+                                               failureThrowable.compareAndSet(null, failure);
+                                       }
                                }
                        }
                );
@@ -228,6 +267,21 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
                checkErrorAndRethrow();
        }
 
+       /**
+        * Re-submits every {@link ActionRequest} contained in the given bulk to the {@link BulkProcessor}.
+        * @param bulkRequest the bulk to submit again; sub-requests that are not {@link ActionRequest}s are skipped
+        */
+       public void reAddBulkRequest(BulkRequest bulkRequest) {
+               //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
+               // Requests are re-added immediately, without any waiting time, which may produce additional pressure on the cluster.
+               for (IndicesRequest subRequest : bulkRequest.subRequests()) {
+                       if (!(subRequest instanceof ActionRequest)) {
+                               continue;
+                       }
+                       bulkProcessor.add((ActionRequest<?>) subRequest);
+               }
+       }
+
        private void checkErrorAndRethrow() {
                Throwable cause = failureThrowable.get();
                if (cause != null) {

Reply via email to