[FLINK-5122] [elasticsearch] Retry temporary Elasticsearch request errors. Covered exceptions are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aaac7c2e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aaac7c2e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aaac7c2e Branch: refs/heads/master Commit: aaac7c2e505717dbfc40465cb3656652e1bf5658 Parents: 0ba08b4 Author: Max Kuklinski <[email protected]> Authored: Wed Nov 23 17:54:11 2016 +0100 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Fri Feb 24 22:58:40 2017 +0800 ---------------------------------------------------------------------- docs/dev/connectors/elasticsearch.md | 5 ++ .../elasticsearch/ElasticsearchSinkBase.java | 62 ++++++++++++++++++-- 2 files changed, 63 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/aaac7c2e/docs/dev/connectors/elasticsearch.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md index 0f2d025..4388b9a 100644 --- a/docs/dev/connectors/elasticsearch.md +++ b/docs/dev/connectors/elasticsearch.md @@ -272,6 +272,11 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String The difference is that now we do not need to provide a list of addresses of Elasticsearch nodes. +Optionally, the sink can try to re-execute the bulk request when the error +message matches certain patterns indicating a timeout or an overloaded cluster. +This behaviour is disabled by default and can be enabled by setting `checkErrorAndRetryBulk(true)`. + + More information about Elasticsearch can be found [here](https://elastic.co). 
#### Packaging the Elasticsearch Connector into an Uber-Jar http://git-wip-us.apache.org/repos/asf/flink/blob/aaac7c2e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java index 6a2d65f..7977fc0 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java @@ -21,12 +21,15 @@ import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.util.InstantiationUtil; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -85,6 +88,13 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { /** Provided to the user via the {@link ElasticsearchSinkFunction} to 
add {@link ActionRequest ActionRequests}. */ private transient BulkProcessorIndexer requestIndexer; + /** + * When set to <code>true</code> and the bulk action fails, the error message will be checked for + * common patterns like <i>timeout</i>, <i>UnavailableShardsException</i> or a full buffer queue on the node. + * When a matching pattern is found, the bulk will be retried. + */ + protected boolean checkErrorAndRetryBulk = false; + // ------------------------------------------------------------------------ // Internals for the Flink Elasticsearch Sink // ------------------------------------------------------------------------ @@ -165,20 +175,49 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { + boolean allRequestsRepeatable = true; + for (BulkItemResponse itemResp : response.getItems()) { Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp); if (failure != null) { - LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure); - failureThrowable.compareAndSet(null, failure); + String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase(); + + // Check if index request can be retried + if (checkErrorAndRetryBulk && ( + failureMessageLowercase.contains("timeout") || + failureMessageLowercase.contains("timed out") || // Generic timeout errors + failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) || // Shard not available due to rebalancing or node down + (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")))) // Bulk index queue on node full + { + LOG.debug("Retry bulk: {}", itemResp.getFailureMessage()); + } else { + // Cannot retry action + allRequestsRepeatable = false; + LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure); + 
failureThrowable.compareAndSet(null, failure); + } } } + + if (allRequestsRepeatable) { + reAddBulkRequest(request); + } } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure); - failureThrowable.compareAndSet(null, failure); + if (checkErrorAndRetryBulk && ( + failure instanceof ClusterBlockException // Examples: "no master" + || failure instanceof ElasticsearchTimeoutException) // ElasticsearchTimeoutException sounded good, not seen in stress tests yet + ) + { + LOG.debug("Retry bulk on throwable: {}", failure.getMessage()); + reAddBulkRequest(request); + } else { + LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure); + failureThrowable.compareAndSet(null, failure); + } } } ); @@ -228,6 +267,21 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { checkErrorAndRethrow(); } + /** + * Adds all requests of the bulk to the BulkProcessor. Used when trying again. + * @param bulkRequest + */ + public void reAddBulkRequest(BulkRequest bulkRequest) { + //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs. + + for (IndicesRequest req : bulkRequest.subRequests()) { + if (req instanceof ActionRequest) { + // There is no waiting time between index requests, so this may produce additional pressure on cluster + bulkProcessor.add((ActionRequest<?>) req); + } + } + } + private void checkErrorAndRethrow() { Throwable cause = failureThrowable.get(); if (cause != null) {
