[ https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15722847#comment-15722847 ]
ASF GitHub Bot commented on FLINK-5122:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r90917039

--- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
@@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {
 
 		@Override
 		public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+			boolean allRequestsRepeatable = true;
 			if (response.hasFailures()) {
 				for (BulkItemResponse itemResp : response.getItems()) {
 					if (itemResp.isFailed()) {
-						LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
-						failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+						// Check if index request can be retried
+						String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
+						if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
+								|| failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
+								|| (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
+								) {
+							LOG.debug("Retry batch: " + itemResp.getFailureMessage());
+							reAddBulkRequest(request);
+						} else { // Cannot retry action
+							allRequestsRepeatable = false;
+							LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
+							failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+						}
 					}
 				}
-				hasFailure.set(true);
+				if (!allRequestsRepeatable) {
+					hasFailure.set(true);
+				}
 			}
 		}
 
 		@Override
 		public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-			LOG.error(failure.getMessage());
-			failureThrowable.compareAndSet(null, failure);
-			hasFailure.set(true);
+			if (failure instanceof ClusterBlockException // Examples: "no master"
+					|| failure instanceof ElasticsearchTimeoutException // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
+					)
+			{
+				LOG.debug("Retry batch on throwable: " + failure.getMessage());
--- End diff --

String concat

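The "String concat" remark above presumably refers to the concatenated LOG.debug()/LOG.error() calls added in the diff: with SLF4J, a parameterized message is only rendered when the log level is enabled. Below is a minimal, self-contained sketch of that logging style, with the diff's failure-message checks pulled into a single helper. The class name RetryClassifier, the method isRetryable, and the exact substring set (including an extra EsRejectedExecutionException check) are illustrative assumptions, not code from the PR.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public final class RetryClassifier {

        private static final Logger LOG = LoggerFactory.getLogger(RetryClassifier.class);

        private RetryClassifier() {
        }

        // Heuristic: does a bulk item failure message look like a transient condition
        // (timeout, shard not available, bulk queue full) that is worth retrying?
        public static boolean isRetryable(String failureMessage) {
            if (failureMessage == null) {
                return false;
            }
            String msg = failureMessage.toLowerCase();
            return msg.contains("timeout")
                    || msg.contains("timed out")
                    || msg.contains("unavailableshardsexception")   // primary shard not active, e.g. while rebalancing
                    || msg.contains("esrejectedexecutionexception") // bulk queue on the node is full
                    || msg.contains("data/write/bulk");              // rejected bulk transport action
        }

        public static void main(String[] args) {
            String failure = "UnavailableShardsException[[index-name][3] primary shard is not active]";
            if (isRetryable(failure)) {
                // Parameterized message: the argument is only formatted if DEBUG is enabled.
                LOG.debug("Retry batch: {}", failure);
            } else {
                LOG.error("Failed to index document in Elasticsearch: {}", failure);
            }
        }
    }

Keeping the heuristic in one method also makes it easy to unit-test against the failure messages quoted in the issue description below.
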
> Elasticsearch Sink loses documents when cluster has high load
> --------------------------------------------------------------
>
>                 Key: FLINK-5122
>                 URL: https://issues.apache.org/jira/browse/FLINK-5122
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.2.0
>            Reporter: static-max
>            Assignee: static-max
>
> My cluster had high load and documents did not get indexed. This violates the "at least once" semantics of the ES connector.
> I put load on my cluster to test Flink, causing new indices to be created and rebalanced. On those errors the bulk should be retried instead of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - Failed to index document in Elasticsearch: UnavailableShardsException[[index-name][3] primary shard is not active Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] requests]]
> Bulk queue on node full (I set the queue to a low value to reproduce the error):
> 22:37:57,702 ERROR org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - Failed to index document in Elasticsearch: RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]]; nested: EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.TransportService$4@727e677c on EsThreadPoolExecutor[bulk, queue capacity = 1, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 2939]]];
> I can try to propose a PR for this.
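
For the "bulk queue on node full" case specifically, the Elasticsearch client's own BulkProcessor can retry rejected bulks before the sink ever sees a failure, via a backoff policy (added in the 2.x line, around ES 2.2). A minimal sketch follows, assuming an ES 2.x transport Client and an existing BulkProcessor.Listener; the factory class and the parameter values are illustrative, not part of the connector. Note that the built-in retry only covers EsRejectedExecutionException, so it complements rather than replaces the message-based retry discussed in the diff above.

    import org.elasticsearch.action.bulk.BackoffPolicy;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.unit.TimeValue;

    public final class BackoffBulkProcessorFactory {

        private BackoffBulkProcessorFactory() {
        }

        // Builds a BulkProcessor that retries bulks rejected with
        // EsRejectedExecutionException (full bulk queue) using exponential backoff.
        public static BulkProcessor create(Client client, BulkProcessor.Listener listener) {
            return BulkProcessor.builder(client, listener)
                    .setBulkActions(1000)
                    // start at 100 ms, double on each retry, give up after 5 attempts
                    .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 5))
                    .build();
        }
    }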