Repository: flink Updated Branches: refs/heads/master 0ba08b444 -> 2437da6e5
[FLINK-5487] [elasticsearch] At-least-once Elasticsearch Sink This closes #3358. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2437da6e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2437da6e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2437da6e Branch: refs/heads/master Commit: 2437da6e54cb48c4e29116b8789fbe4782b17ea7 Parents: 3743e89 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Mon Feb 20 16:50:19 2017 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Fri Feb 24 22:58:40 2017 +0800 ---------------------------------------------------------------------- docs/dev/connectors/elasticsearch.md | 94 ++- .../flink-connector-elasticsearch-base/pom.xml | 8 + .../ActionRequestFailureHandler.java | 29 +- .../elasticsearch/BulkProcessorIndexer.java | 15 +- .../elasticsearch/ElasticsearchSinkBase.java | 236 +++++--- .../util/NoOpActionRequestFailureHandler.java | 37 -- .../elasticsearch/util/NoOpFailureHandler.java | 37 ++ .../RetryRejectedExecutionFailureHandler.java | 46 ++ .../ElasticsearchSinkBaseTest.java | 570 +++++++++++++++++++ .../elasticsearch/ElasticsearchSink.java | 6 +- .../elasticsearch2/ElasticsearchSink.java | 4 +- .../elasticsearch5/ElasticsearchSink.java | 4 +- .../org/apache/flink/util/ExceptionUtils.java | 24 + .../apache/flink/util/InstantiationUtil.java | 10 + 14 files changed, 969 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/docs/dev/connectors/elasticsearch.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md index 2ca1f9b..3fba7f0 100644 --- a/docs/dev/connectors/elasticsearch.md +++ b/docs/dev/connectors/elasticsearch.md @@ -209,6 +209,41 @@ This will buffer elements before sending them in bulk to the 
cluster. The `BulkP executes bulk requests one at a time, i.e. there will be no two concurrent flushes of the buffered actions in progress. +### Elasticsearch Sinks and Fault Tolerance + +With Flink's checkpointing enabled, the Flink Elasticsearch Sink guarantees +at-least-once delivery of action requests to Elasticsearch clusters. It does +so by waiting for all pending action requests in the `BulkProcessor` at the +time of checkpoints. This effectively assures that all requests before the +checkpoint was triggered have been successfully acknowledged by Elasticsearch, before +proceeding to process more records sent to the sink. + +More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{site.baseurl}}/internals/stream_checkpointing.html). + +To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.enableCheckpointing(5000); // checkpoint every 5000 msecs +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.enableCheckpointing(5000) // checkpoint every 5000 msecs +{% endhighlight %} +</div> +</div> + +<p style="border-radius: 5px; padding: 5px" class="bg-danger"> +<b>NOTE</b>: Users can disable flushing if they wish to do so, by calling +<b>disableFlushOnCheckpoint()</b> on the created <b>ElasticsearchSink</b>. Be aware +that this essentially means the sink will not provide any strong +delivery guarantees anymore, even with checkpointing for the topology enabled. 
+</p> + ### Communication using Embedded Node (only for Elasticsearch 1.x) For Elasticsearch versions 1.x, communication using an embedded node is @@ -293,19 +328,20 @@ input.addSink(new ElasticsearchSink<>( new ElasticsearchSinkFunction<String>() {...}, new ActionRequestFailureHandler() { @Override - boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) { - // this example uses Apache Commons to search for nested exceptions - - if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) { + void onFailure(ActionRequest action, + Throwable failure, + int restStatusCode, + RequestIndexer indexer) throws Throwable { + + if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) { // full queue; re-add document for indexing indexer.add(action); - return false; - } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) { + } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) { // malformed document; simply drop request without failing sink - return false; } else { // for all other failures, fail the sink - return true; + // here the failure is simply rethrown, but users can also choose to throw custom exceptions + throw failure; } } })); @@ -319,19 +355,21 @@ input.addSink(new ElasticsearchSink( config, transportAddresses, new ElasticsearchSinkFunction[String] {...}, new ActionRequestFailureHandler { - override def onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) { - // this example uses Apache Commons to search for nested exceptions + @throws(classOf[Throwable]) + override def onFailure(ActionRequest action, + Throwable failure, + int restStatusCode, + RequestIndexer indexer) { - if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) { + if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) { // full queue; re-add document for 
indexing indexer.add(action) - return false - } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) { + } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) { // malformed document; simply drop request without failing sink - return false } else { // for all other failures, fail the sink - return true + // here the failure is simply rethrown, but users can also choose to throw custom exceptions + throw failure } } })) @@ -349,7 +387,31 @@ Note that `onFailure` is called for failures that still occur only after the By default, the `BulkProcessor` retries to a maximum of 8 attempts with an exponential backoff. For more information on the behaviour of the internal `BulkProcessor` and how to configure it, please see the following section. - + +By default, if a failure handler is not provided, the sink uses a +`NoOpFailureHandler` that simply fails for all kinds of exceptions. The +connector also provides a `RetryRejectedExecutionFailureHandler` implementation +that always re-adds requests that have failed due to queue capacity saturation. + +<p style="border-radius: 5px; padding: 5px" class="bg-danger"> +<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b> +on failures will lead to longer checkpoints, as the sink will also +need to wait for the re-added requests to be flushed when checkpointing. +For example, when using <b>RetryRejectedExecutionFailureHandler</b>, checkpoints +will need to wait until Elasticsearch node queues have enough capacity for +all the pending requests. This also means that if re-added requests never +succeed, the checkpoint will never finish. 
+</p> + +<p style="border-radius: 5px; padding: 5px" class="bg-warning"> +<b>Failure handling for Elasticsearch 1.x</b>: For Elasticsearch 1.x, it +is not feasible to match the type of the failure because the exact type +could not be retrieved through the older version Java client APIs (thus, +the types will be general <b>Exception</b>s and only differ in the +failure message). In this case, it is recommended to match on the +provided REST status code. +</p> + ### Configuring the Internal Bulk Processor The internal `BulkProcessor` can be further configured for its behaviour http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch-base/pom.xml b/flink-connectors/flink-connector-elasticsearch-base/pom.xml index 81652c4..32327ff 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/pom.xml +++ b/flink-connectors/flink-connector-elasticsearch-base/pom.xml @@ -68,6 +68,14 @@ under the License. 
<dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.10</artifactId> <version>${project.version}</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java index 45d04fc..abbdd72 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java @@ -23,7 +23,7 @@ import java.io.Serializable; /** * An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how failed - * {@link ActionRequest ActionRequests} should be handled, ex. dropping them, reprocessing malformed documents, or + * {@link ActionRequest ActionRequests} should be handled, e.g. dropping them, reprocessing malformed documents, or * simply requesting them to be sent to Elasticsearch again if the failure is only temporary. 
* * <p> @@ -34,19 +34,16 @@ import java.io.Serializable; * private static class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler { * * @Override - * boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) { - * // this example uses Apache Commons to search for nested exceptions - * - * if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) { + * void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable { + * if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) { * // full queue; re-add document for indexing * indexer.add(action); - * return false; - * } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) { + * } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) { * // malformed document; simply drop request without failing sink - * return false; * } else { - * // for all other failures, fail the sink - * return true; + * // for all other failures, fail the sink; + * // here the failure is simply rethrown, but users can also choose to throw custom exceptions + * throw failure; * } * } * } @@ -56,6 +53,11 @@ import java.io.Serializable; * <p> * The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests * with malformed documents, without failing the sink. For all other failures, the sink will fail. + * + * <p> + * Note: For Elasticsearch 1.x, it is not feasible to match the type of the failure because the exact type + * could not be retrieved through the older version Java client APIs (thus, the types will be general {@link Exception}s + * and only differ in the failure message). In this case, it is recommended to match on the provided REST status code. 
*/ public interface ActionRequestFailureHandler extends Serializable { @@ -64,9 +66,12 @@ public interface ActionRequestFailureHandler extends Serializable { * * @param action the {@link ActionRequest} that failed due to the failure * @param failure the cause of failure + * @param restStatusCode the REST status code of the failure (-1 if none can be retrieved) * @param indexer request indexer to re-add the failed action, if intended to do so - * @return the implementation should return {@code true} if the sink should fail due to this failure, and {@code false} otherwise + * + * @throws Throwable if the sink should fail on this failure, the implementation should rethrow + * the exception or a custom one */ - boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer); + void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable; } http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java index d802550..838865a 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java @@ -21,6 +21,10 @@ package org.apache.flink.streaming.connectors.elasticsearch; import org.elasticsearch.action.ActionRequest; import 
org.elasticsearch.action.bulk.BulkProcessor; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}. * {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster. @@ -30,14 +34,21 @@ class BulkProcessorIndexer implements RequestIndexer { private static final long serialVersionUID = 6841162943062034253L; private final BulkProcessor bulkProcessor; + private final boolean flushOnCheckpoint; + private final AtomicLong numPendingRequestsRef; - BulkProcessorIndexer(BulkProcessor bulkProcessor) { - this.bulkProcessor = bulkProcessor; + BulkProcessorIndexer(BulkProcessor bulkProcessor, boolean flushOnCheckpoint, AtomicLong numPendingRequestsRef) { + this.bulkProcessor = checkNotNull(bulkProcessor); + this.flushOnCheckpoint = flushOnCheckpoint; + this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef); } @Override public void add(ActionRequest... 
actionRequests) { for (ActionRequest actionRequest : actionRequests) { + if (flushOnCheckpoint) { + numPendingRequestsRef.getAndIncrement(); + } this.bulkProcessor.add(actionRequest); } } http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java index 2c29865..f6944b3 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java @@ -17,8 +17,12 @@ package org.apache.flink.streaming.connectors.elasticsearch; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.util.InstantiationUtil; import org.elasticsearch.action.ActionRequest; @@ -30,11 +34,13 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.rest.RestStatus; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; import java.io.Serializable; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkArgument; @@ -56,7 +62,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * * @param <T> Type of the elements handled by this sink */ -public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { +public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> implements CheckpointedFunction { private static final long serialVersionUID = -1007596293618451942L; @@ -105,12 +111,12 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { } public void setMaxRetryCount(int maxRetryCount) { - checkArgument(maxRetryCount > 0); + checkArgument(maxRetryCount >= 0); this.maxRetryCount = maxRetryCount; } public void setDelayMillis(long delayMillis) { - checkArgument(delayMillis > 0); + checkArgument(delayMillis >= 0); this.delayMillis = delayMillis; } } @@ -133,6 +139,9 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { /** User-provided handler for failed {@link ActionRequest ActionRequests}. */ private final ActionRequestFailureHandler failureHandler; + /** If true, the producer will wait until all outstanding action requests have been sent to Elasticsearch. */ + private boolean flushOnCheckpoint = true; + /** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */ private transient BulkProcessorIndexer requestIndexer; @@ -143,6 +152,17 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { /** Call bridge for different version-specfic */ private final ElasticsearchApiCallBridge callBridge; + /** + * Number of pending action requests not yet acknowledged by Elasticsearch. + * This value is maintained only if {@link ElasticsearchSinkBase#flushOnCheckpoint} is {@code true}. 
+ * + * This is incremented whenever the user adds (or re-adds through the {@link ActionRequestFailureHandler}) requests + * to the {@link RequestIndexer}. It is decremented for each completed request of a bulk request, in + * {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, BulkResponse)} and + * {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, Throwable)}. + */ + private AtomicLong numPendingRequests = new AtomicLong(0); + /** Elasticsearch client created using the call bridge. */ private transient Client client; @@ -152,7 +172,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { /** * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks and * the user considered it should fail the sink via the - * {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, RequestIndexer)} method. + * {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, int, RequestIndexer)} method. * * Errors will be checked and rethrown before processing each input element, and when the sink is closed. */ @@ -172,21 +192,13 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { // otherwise, if they aren't serializable, users will merely get a non-informative error message // "ElasticsearchSinkBase is not serializable" - try { - InstantiationUtil.serializeObject(elasticsearchSinkFunction); - } catch (Exception e) { - throw new IllegalArgumentException( - "The implementation of the provided ElasticsearchSinkFunction is not serializable. " + - "The object probably contains or references non serializable fields."); - } + checkArgument(InstantiationUtil.isSerializable(elasticsearchSinkFunction), + "The implementation of the provided ElasticsearchSinkFunction is not serializable. 
" + + "The object probably contains or references non-serializable fields."); - try { - InstantiationUtil.serializeObject(failureHandler); - } catch (Exception e) { - throw new IllegalArgumentException( - "The implementation of the provided ActionRequestFailureHandler is not serializable. " + - "The object probably contains or references non serializable fields."); - } + checkArgument(InstantiationUtil.isSerializable(failureHandler), + "The implementation of the provided ActionRequestFailureHandler is not serializable. " + + "The object probably contains or references non-serializable fields."); // extract and remove bulk processor related configuration from the user-provided config, // so that the resulting user config only contains configuration related to the Elasticsearch client. @@ -244,47 +256,76 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { this.userConfig = userConfig; } + /** + * Disable flushing on checkpoint. When disabled, the sink will not wait for all + * pending action requests to be acknowledged by Elasticsearch on checkpoints. + * + * NOTE: If flushing on checkpoint is disabled, the Flink Elasticsearch Sink does NOT + * provide any strong guarantees for at-least-once delivery of action requests. 
+ */ + public void disableFlushOnCheckpoint() { + this.flushOnCheckpoint = false; + } + @Override public void open(Configuration parameters) throws Exception { client = callBridge.createClient(userConfig); + bulkProcessor = buildBulkProcessor(new BulkProcessorListener()); + requestIndexer = new BulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests); + } - BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder( - client, - new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - if (response.hasFailures()) { - BulkItemResponse itemResponse; - Throwable failure; - - for (int i = 0; i < response.getItems().length; i++) { - itemResponse = response.getItems()[i]; - failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse); - if (failure != null) { - LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure); - if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) { - failureThrowable.compareAndSet(null, failure); - } - } - } - } - } + @Override + public void invoke(T value) throws Exception { + // if bulk processor callbacks have previously reported an error, we rethrow the error and fail the sink + checkErrorAndRethrow(); - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause()); + elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer); + } - // whole bulk request failures are usually just temporary timeouts on - // the Elasticsearch side; simply retry all action requests in the bulk - for (ActionRequest action : request.requests()) { - requestIndexer.add(action); - } - } - } - ); + @Override + public void initializeState(FunctionInitializationContext 
context) throws Exception { + // no initialization needed + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + checkErrorAndRethrow(); + + if (flushOnCheckpoint) { + do { + bulkProcessor.flush(); + checkErrorAndRethrow(); + } while (numPendingRequests.get() != 0); + } + } + + @Override + public void close() throws Exception { + if (bulkProcessor != null) { + bulkProcessor.close(); + bulkProcessor = null; + } + + if (client != null) { + client.close(); + client = null; + } + + callBridge.cleanup(); + + // make sure any errors from callbacks are rethrown + checkErrorAndRethrow(); + } + + /** + * Build the {@link BulkProcessor}. + * + * Note: this is exposed for testing purposes. + */ + protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) { + checkNotNull(listener); + + BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, listener); // This makes flush() blocking bulkProcessorBuilder.setConcurrentRequests(0); @@ -304,40 +345,81 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> { // if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null callBridge.configureBulkProcessorBackoff(bulkProcessorBuilder, bulkProcessorFlushBackoffPolicy); - bulkProcessor = bulkProcessorBuilder.build(); - requestIndexer = new BulkProcessorIndexer(bulkProcessor); + return bulkProcessorBuilder.build(); } - @Override - public void invoke(T value) throws Exception { - // if bulk processor callbacks have previously reported an error, we rethrow the error and fail the sink - checkErrorAndRethrow(); - - elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer); + private void checkErrorAndRethrow() { + Throwable cause = failureThrowable.get(); + if (cause != null) { + throw new RuntimeException("An error occurred in ElasticsearchSink.", cause); + } } - @Override - public void close() throws Exception { - if (bulkProcessor != null) { - 
bulkProcessor.close(); - bulkProcessor = null; - } + private class BulkProcessorListener implements BulkProcessor.Listener { + @Override + public void beforeBulk(long executionId, BulkRequest request) { } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + if (response.hasFailures()) { + BulkItemResponse itemResponse; + Throwable failure; + RestStatus restStatus; + + try { + for (int i = 0; i < response.getItems().length; i++) { + itemResponse = response.getItems()[i]; + failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse); + if (failure != null) { + LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure); + + restStatus = itemResponse.getFailure().getStatus(); + if (restStatus == null) { + failureHandler.onFailure(request.requests().get(i), failure, -1, requestIndexer); + } else { + failureHandler.onFailure(request.requests().get(i), failure, restStatus.getStatus(), requestIndexer); + } + } + } + } catch (Throwable t) { + // fail the sink and skip the rest of the items + // if the failure handler decides to throw an exception + failureThrowable.compareAndSet(null, t); + } + } - if (client != null) { - client.close(); - client = null; + if (flushOnCheckpoint) { + numPendingRequests.getAndAdd(-request.numberOfActions()); + } } - callBridge.cleanup(); + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause()); - // make sure any errors from callbacks are rethrown - checkErrorAndRethrow(); + try { + for (ActionRequest action : request.requests()) { + failureHandler.onFailure(action, failure, -1, requestIndexer); + } + } catch (Throwable t) { + // fail the sink and skip the rest of the items + // if the failure handler decides to throw an exception + failureThrowable.compareAndSet(null, t); + } + + if (flushOnCheckpoint) { + 
numPendingRequests.getAndAdd(-request.numberOfActions()); + } + } } - private void checkErrorAndRethrow() { - Throwable cause = failureThrowable.get(); - if (cause != null) { - throw new RuntimeException("An error occured in ElasticsearchSink.", cause); + @VisibleForTesting + long getNumPendingRequests() { + if (flushOnCheckpoint) { + return numPendingRequests.get(); + } else { + throw new UnsupportedOperationException( + "The number of pending requests is not maintained when flushing on checkpoint is disabled."); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java deleted file mode 100644 index 09173a2..0000000 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.elasticsearch.util; - -import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; -import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; -import org.elasticsearch.action.ActionRequest; - -/** - * An {@link ActionRequestFailureHandler} that simply fails the sink on any failures. - */ -public class NoOpActionRequestFailureHandler implements ActionRequestFailureHandler { - - private static final long serialVersionUID = 737941343410827885L; - - @Override - public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) { - // simply fail the sink - return true; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java new file mode 100644 index 0000000..b19ea08 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.util; + +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.elasticsearch.action.ActionRequest; + +/** + * An {@link ActionRequestFailureHandler} that simply fails the sink on any failures. 
+ */ +public class NoOpFailureHandler implements ActionRequestFailureHandler { + + private static final long serialVersionUID = 737941343410827885L; + + @Override + public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable { + // simply fail the sink + throw failure; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java new file mode 100644 index 0000000..fabdcbc --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch.util; + +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.util.ExceptionUtils; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; + +/** + * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary + * {@link EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full), + * and fails for all other failures. + */ +public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler { + + private static final long serialVersionUID = -7423562912824511906L; + + @Override + public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable { + if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) { + indexer.add(action); + } else { + // rethrow all other failures + throw failure; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java new file mode 100644 index 0000000..b9df5c6 --- /dev/null +++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java @@ -0,0 +1,570 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.junit.Assert; +import org.junit.Test; +import 
org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Suite of tests for {@link ElasticsearchSinkBase}. + */ +public class ElasticsearchSinkBaseTest { + + /** Tests that any item failure in the listener callbacks is rethrown on an immediately following invoke call. */ + @Test + public void testItemFailureRethrownOnInvoke() throws Throwable { + final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>( + new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler()); + + final OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink)); + + testHarness.open(); + + // setup the next bulk request, and its mock item failures + sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record"))); + testHarness.processElement(new StreamRecord<>("msg")); + verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class)); + + // manually execute the next bulk request + sink.manualBulkRequestWithAllPendingRequests(); + + try { + testHarness.processElement(new StreamRecord<>("next msg")); + } catch (Exception e) { + // the invoke should have failed with the failure + Assert.assertTrue(e.getCause().getMessage().contains("artificial failure for record")); + + // test succeeded + return; + } + + Assert.fail(); + } + + /** Tests that any item failure in the listener callbacks is rethrown on an immediately following checkpoint. 
*/ + @Test + public void testItemFailureRethrownOnCheckpoint() throws Throwable { + final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>( + new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler()); + + final OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink)); + + testHarness.open(); + + // setup the next bulk request, and its mock item failures + sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record"))); + testHarness.processElement(new StreamRecord<>("msg")); + verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class)); + + // manually execute the next bulk request + sink.manualBulkRequestWithAllPendingRequests(); + + try { + testHarness.snapshot(1L, 1000L); + } catch (Exception e) { + // the snapshot should have failed with the failure + Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for record")); + + // test succeeded + return; + } + + Assert.fail(); + } + + /** + * Tests that any item failure in the listener callbacks due to flushing on an immediately following checkpoint + * is rethrown; we set a timeout because the test will not finish if the logic is broken + */ + @Test(timeout=5000) + public void testItemFailureRethrownOnCheckpointAfterFlush() throws Throwable { + final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>( + new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler()); + + final OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink)); + + testHarness.open(); + + // setup the next bulk request, and its mock item failures + + List<Exception> mockResponsesList = new ArrayList<>(2); + mockResponsesList.add(null); // the first request in a bulk will succeed + 
mockResponsesList.add(new Exception("artificial failure for record")); // the second request in a bulk will fail + sink.setMockItemFailuresListForNextBulkItemResponses(mockResponsesList); + + testHarness.processElement(new StreamRecord<>("msg-1")); + verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class)); + + // manually execute the next bulk request (1 request only, thus should succeed) + sink.manualBulkRequestWithAllPendingRequests(); + + // setup the requests to be flushed in the snapshot + testHarness.processElement(new StreamRecord<>("msg-2")); + testHarness.processElement(new StreamRecord<>("msg-3")); + verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class)); + + CheckedThread snapshotThread = new CheckedThread() { + @Override + public void go() throws Exception { + testHarness.snapshot(1L, 1000L); + } + }; + snapshotThread.start(); + + // the snapshot should eventually be blocked before snapshot triggers flushing + while (snapshotThread.getState() != Thread.State.WAITING) { + Thread.sleep(10); + } + + // let the snapshot-triggered flush continue (2 records in the bulk, so the 2nd one should fail) + sink.continueFlush(); + + try { + snapshotThread.sync(); + } catch (Exception e) { + // the snapshot should have failed with the failure from the 2nd request + Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for record")); + + // test succeeded + return; + } + + Assert.fail(); + } + + /** Tests that any bulk failure in the listener callbacks is rethrown on an immediately following invoke call. 
*/ + @Test + public void testBulkFailureRethrownOnInvoke() throws Throwable { + final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>( + new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler()); + + final OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink)); + + testHarness.open(); + + // setup the next bulk request, and let the whole bulk request fail + sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request")); + testHarness.processElement(new StreamRecord<>("msg")); + verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class)); + + // manually execute the next bulk request + sink.manualBulkRequestWithAllPendingRequests(); + + try { + testHarness.processElement(new StreamRecord<>("next msg")); + } catch (Exception e) { + // the invoke should have failed with the bulk request failure + Assert.assertTrue(e.getCause().getMessage().contains("artificial failure for bulk request")); + + // test succeeded + return; + } + + Assert.fail(); + } + + /** Tests that any bulk failure in the listener callbacks is rethrown on an immediately following checkpoint. 
*/ + @Test + public void testBulkFailureRethrownOnCheckpoint() throws Throwable { + final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>( + new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler()); + + final OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink)); + + testHarness.open(); + + // setup the next bulk request, and let the whole bulk request fail + sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request")); + testHarness.processElement(new StreamRecord<>("msg")); + verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class)); + + // manually execute the next bulk request + sink.manualBulkRequestWithAllPendingRequests(); + + try { + testHarness.snapshot(1L, 1000L); + } catch (Exception e) { + // the snapshot should have failed with the bulk request failure + Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for bulk request")); + + // test succeeded + return; + } + + Assert.fail(); + } + + /** + * Tests that any bulk failure in the listener callbacks due to flushing on an immediately following checkpoint + * is rethrown; we set a timeout because the test will not finish if the logic is broken. 
+ */ + @Test(timeout=5000) + public void testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable { + final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>( + new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler()); + + final OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink)); + + testHarness.open(); + + // setup the next bulk request, and let bulk request succeed + sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception) null)); + testHarness.processElement(new StreamRecord<>("msg-1")); + verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class)); + + // manually execute the next bulk request + sink.manualBulkRequestWithAllPendingRequests(); + + // setup the requests to be flushed in the snapshot + testHarness.processElement(new StreamRecord<>("msg-2")); + testHarness.processElement(new StreamRecord<>("msg-3")); + verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class)); + + CheckedThread snapshotThread = new CheckedThread() { + @Override + public void go() throws Exception { + testHarness.snapshot(1L, 1000L); + } + }; + snapshotThread.start(); + + // the snapshot should eventually be blocked before snapshot triggers flushing + while (snapshotThread.getState() != Thread.State.WAITING) { + Thread.sleep(10); + } + + // for the snapshot-triggered flush, we let the bulk request fail completely + sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request")); + + // let the snapshot-triggered flush continue (bulk request should fail completely) + sink.continueFlush(); + + try { + snapshotThread.sync(); + } catch (Exception e) { + // the snapshot should have failed with the bulk request failure + Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for bulk request")); + + // test succeeded + return; + 
} + + Assert.fail(); + } + + /** + * Tests that the sink correctly waits for pending requests (including re-added requests) on checkpoints; + * we set a timeout because the test will not finish if the logic is broken + */ + @Test(timeout=5000) + public void testAtLeastOnceSink() throws Throwable { + final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>( + new HashMap<String, String>(), + new SimpleSinkFunction<String>(), + new DummyRetryFailureHandler()); // use a failure handler that simply re-adds requests + + final OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink)); + + testHarness.open(); + + // setup the next bulk request, and its mock item failures; + // it contains 1 request, which will fail and re-added to the next bulk request + sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record"))); + testHarness.processElement(new StreamRecord<>("msg")); + verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class)); + + CheckedThread snapshotThread = new CheckedThread() { + @Override + public void go() throws Exception { + testHarness.snapshot(1L, 1000L); + } + }; + snapshotThread.start(); + + // the snapshot should eventually be blocked before snapshot triggers flushing + while (snapshotThread.getState() != Thread.State.WAITING) { + Thread.sleep(10); + } + + sink.continueFlush(); + + // since the previous flush should have resulted in a request re-add from the failure handler, + // we should have flushed again, and eventually be blocked before snapshot triggers the 2nd flush + while (snapshotThread.getState() != Thread.State.WAITING) { + Thread.sleep(10); + } + + // current number of pending request should be 1 due to the re-add + Assert.assertEquals(1, sink.getNumPendingRequests()); + + // this time, let the bulk request succeed, so no-more requests are re-added + 
sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception) null)); + + sink.continueFlush(); + + // the snapshot should finish with no exceptions + snapshotThread.sync(); + + testHarness.close(); + } + + /** + * This test is meant to assure that testAtLeastOnceSink is valid by testing that if flushing is disabled, + * the snapshot method does indeed finishes without waiting for pending requests; + * we set a timeout because the test will not finish if the logic is broken + */ + @Test(timeout=5000) + public void testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Exception { + final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>( + new HashMap<String, String>(), new SimpleSinkFunction<String>(), new DummyRetryFailureHandler()); + sink.disableFlushOnCheckpoint(); // disable flushing + + final OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink)); + + testHarness.open(); + + // setup the next bulk request, and let bulk request succeed + sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record"))); + testHarness.processElement(new StreamRecord<>("msg-1")); + verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class)); + + // the snapshot should not block even though we haven't flushed the bulk request + testHarness.snapshot(1L, 1000L); + + testHarness.close(); + } + + private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T> { + + private static final long serialVersionUID = 5051907841570096991L; + + private transient BulkProcessor mockBulkProcessor; + private transient BulkRequest nextBulkRequest = new BulkRequest(); + private transient MultiShotLatch flushLatch = new MultiShotLatch(); + + private List<? 
extends Throwable> mockItemFailuresList; + private Throwable nextBulkFailure; + + public DummyElasticsearchSink( + Map<String, String> userConfig, + ElasticsearchSinkFunction<T> sinkFunction, + ActionRequestFailureHandler failureHandler) { + super(new DummyElasticsearchApiCallBridge(), userConfig, sinkFunction, failureHandler); + } + + /** + * This method is used to mimic a scheduled bulk request; we need to do this + * manually because we are mocking the BulkProcessor + */ + public void manualBulkRequestWithAllPendingRequests() { + flushLatch.trigger(); // let the flush + mockBulkProcessor.flush(); + } + + /** + * On non-manual flushes, i.e. when flush is called in the snapshot method implementation, + * usages need to explicitly call this to allow the flush to continue. This is useful + * to make sure that specific requests get added to the the next bulk request for flushing. + */ + public void continueFlush() { + flushLatch.trigger(); + } + + /** + * Set the list of mock failures to use for the next bulk of item responses. A {@code null} + * means that the response is successful, failed otherwise. + * + * The list is used with corresponding order to the requests in the bulk, i.e. the first + * request uses the response at index 0, the second requests uses the response at index 1, etc. + */ + public void setMockItemFailuresListForNextBulkItemResponses(List<? extends Throwable> mockItemFailuresList) { + this.mockItemFailuresList = mockItemFailuresList; + } + + /** + * Let the next bulk request fail completely with the provided throwable. + * If this is set, the failures list provided with setMockItemFailuresListForNextBulkItemResponses is not respected. 
+ */ + public void setFailNextBulkRequestCompletely(Throwable failure) { + this.nextBulkFailure = failure; + } + + public BulkProcessor getMockBulkProcessor() { + return mockBulkProcessor; + } + + /** + * Override the bulk processor build process to provide a mock implementation, + * but reuse the listener implementation in our mock to test that the listener logic + * works correctly with request flushing logic. + */ + @Override + protected BulkProcessor buildBulkProcessor(final BulkProcessor.Listener listener) { + this.mockBulkProcessor = mock(BulkProcessor.class); + + when(mockBulkProcessor.add(any(ActionRequest.class))).thenAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + // intercept the request and add it to our mock bulk request + nextBulkRequest.add(invocationOnMock.getArgumentAt(0, ActionRequest.class)); + + return null; + } + }); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + while (nextBulkRequest.numberOfActions() > 0) { + // wait until we are allowed to continue with the flushing + flushLatch.await(); + + // create a copy of the accumulated mock requests, so that + // re-added requests from the failure handler are included in the next bulk + BulkRequest currentBulkRequest = nextBulkRequest; + nextBulkRequest = new BulkRequest(); + + listener.beforeBulk(123L, currentBulkRequest); + + if (nextBulkFailure == null) { + BulkItemResponse[] mockResponses = new BulkItemResponse[currentBulkRequest.requests().size()]; + for (int i = 0; i < currentBulkRequest.requests().size(); i++) { + Throwable mockItemFailure = mockItemFailuresList.get(i); + + if (mockItemFailure == null) { + // the mock response for the item is success + mockResponses[i] = new BulkItemResponse(i, "opType", mock(ActionResponse.class)); + } else { + // the mock response for the item is failure + mockResponses[i] = new BulkItemResponse(i, "opType", new 
BulkItemResponse.Failure("index", "type", "id", mockItemFailure)); + } + } + + listener.afterBulk(123L, currentBulkRequest, new BulkResponse(mockResponses, 1000L)); + } else { + listener.afterBulk(123L, currentBulkRequest, nextBulkFailure); + } + } + + return null; + } + }).when(mockBulkProcessor).flush(); + + return mockBulkProcessor; + } + } + + private static class DummyElasticsearchApiCallBridge implements ElasticsearchApiCallBridge { + + private static final long serialVersionUID = -4272760730959041699L; + + @Override + public Client createClient(Map<String, String> clientConfig) { + return mock(Client.class); + } + + @Nullable + @Override + public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) { + if (bulkItemResponse.isFailed()) { + return new Exception(bulkItemResponse.getFailure().getMessage()); + } else { + return null; + } + } + + @Override + public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) { + // no need for this in the test cases here + } + + @Override + public void cleanup() { + // nothing to cleanup + } + } + + private static class SimpleSinkFunction<String> implements ElasticsearchSinkFunction<String> { + + private static final long serialVersionUID = -176739293659135148L; + + @Override + public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { + Map<java.lang.String, Object> json = new HashMap<>(); + json.put("data", element); + + indexer.add( + Requests.indexRequest() + .index("index") + .type("type") + .id("id") + .source(json) + ); + } + } + + private static class DummyRetryFailureHandler implements ActionRequestFailureHandler { + + private static final long serialVersionUID = 5400023700099200745L; + + @Override + public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable { + indexer.add(action); + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java index 375d739..2298986 100644 --- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java +++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java @@ -17,7 +17,7 @@ package org.apache.flink.streaming.connectors.elasticsearch; -import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.index.IndexRequest; @@ -106,7 +106,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> { * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element */ public ElasticsearchSink(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { - this(userConfig, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler()); + this(userConfig, elasticsearchSinkFunction, new NoOpFailureHandler()); } /** @@ -117,7 +117,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> { * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element */ public 
ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { - this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler()); + this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java index 2210f63..6d771d4 100644 --- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java +++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.elasticsearch2; import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; -import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.client.transport.TransportClient; @@ -89,7 +89,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> { List<InetSocketAddress> transportAddresses, 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { - this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler()); + this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java index 175b4fa..61023c2 100644 --- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java +++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.elasticsearch5; import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; -import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.client.transport.TransportClient; @@ -75,7 +75,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> { List<InetSocketAddress> transportAddresses, 
ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { - this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler()); + this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index 69c2692..fea25ff 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -261,6 +261,30 @@ public final class ExceptionUtils { } } + /** + * Checks whether a throwable chain contains a specific type of exception. + * + * @param throwable the throwable chain to check. + * @param searchType the type of exception to search for in the chain. + * @return True, if the searched type is nested in the throwable, false otherwise. + */ + public static boolean containsThrowable(Throwable throwable, Class searchType) { + if (throwable == null || searchType == null) { + return false; + } + + Throwable t = throwable; + while (t != null) { + if (searchType.isAssignableFrom(t.getClass())) { + return true; + } else { + t = t.getCause(); + } + } + + return false; + } + // ------------------------------------------------------------------------ /** Private constructor to prevent instantiation. 
*/ http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 219bf2a..d4a031c 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -325,6 +325,16 @@ public final class InstantiationUtil { oos.writeObject(o); } + public static boolean isSerializable(Object o) { + try { + serializeObject(o); + } catch (IOException e) { + return false; + } + + return true; + } + /** * Clones the given serializable object using Java serialization. *
