Repository: flink
Updated Branches:
  refs/heads/master 0ba08b444 -> 2437da6e5


[FLINK-5487] [elasticsearch] At-least-once Elasticsearch Sink

This closes #3358.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2437da6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2437da6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2437da6e

Branch: refs/heads/master
Commit: 2437da6e54cb48c4e29116b8789fbe4782b17ea7
Parents: 3743e89
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Mon Feb 20 16:50:19 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri Feb 24 22:58:40 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/elasticsearch.md            |  94 ++-
 .../flink-connector-elasticsearch-base/pom.xml  |   8 +
 .../ActionRequestFailureHandler.java            |  29 +-
 .../elasticsearch/BulkProcessorIndexer.java     |  15 +-
 .../elasticsearch/ElasticsearchSinkBase.java    | 236 +++++---
 .../util/NoOpActionRequestFailureHandler.java   |  37 --
 .../elasticsearch/util/NoOpFailureHandler.java  |  37 ++
 .../RetryRejectedExecutionFailureHandler.java   |  46 ++
 .../ElasticsearchSinkBaseTest.java              | 570 +++++++++++++++++++
 .../elasticsearch/ElasticsearchSink.java        |   6 +-
 .../elasticsearch2/ElasticsearchSink.java       |   4 +-
 .../elasticsearch5/ElasticsearchSink.java       |   4 +-
 .../org/apache/flink/util/ExceptionUtils.java   |  24 +
 .../apache/flink/util/InstantiationUtil.java    |  10 +
 14 files changed, 969 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/docs/dev/connectors/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 2ca1f9b..3fba7f0 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -209,6 +209,41 @@ This will buffer elements before sending them in bulk to the cluster. The `BulkP
 executes bulk requests one at a time, i.e. there will be no two concurrent
 flushes of the buffered actions in progress.
 
+### Elasticsearch Sinks and Fault Tolerance
+
+With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guarantees
+at-least-once delivery of action requests to Elasticsearch clusters. It does
+so by waiting for all pending action requests in the `BulkProcessor` to
+complete at checkpoint time. This effectively assures that all requests
+issued before the checkpoint was triggered have been successfully acknowledged
+by Elasticsearch before proceeding to process more records sent to the sink.
+
+More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{site.baseurl}}/internals/stream_checkpointing.html).
+
+To use fault-tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled in the execution environment:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+</div>
+</div>
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+<b>NOTE</b>: Users can disable flushing by calling
+<b>disableFlushOnCheckpoint()</b> on the created <b>ElasticsearchSink</b>. Be aware
+that this essentially means the sink will no longer provide any strong
+delivery guarantees, even with checkpointing enabled for the topology.
+</p>
+
 ### Communication using Embedded Node (only for Elasticsearch 1.x)
 
 For Elasticsearch versions 1.x, communication using an embedded node is
@@ -293,19 +328,20 @@ input.addSink(new ElasticsearchSink<>(
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
-        boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
-            // this example uses Apache Commons to search for nested exceptions
-
-            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+        void onFailure(ActionRequest action,
+                Throwable failure,
+                int restStatusCode,
+                RequestIndexer indexer) throws Throwable {
+
+            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                // full queue; re-add document for indexing
                indexer.add(action);
-                return false;
-            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                // malformed document; simply drop request without failing sink
-                return false;
            } else {
                // for all other failures, fail the sink
+                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure;
            }
        }
 }));
@@ -319,19 +355,21 @@ input.addSink(new ElasticsearchSink(
    config, transportAddresses,
    new ElasticsearchSinkFunction[String] {...},
    new ActionRequestFailureHandler {
-        override def onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
-            // this example uses Apache Commons to search for nested exceptions
+        @throws(classOf[Throwable])
+        override def onFailure(action: ActionRequest,
+                failure: Throwable,
+                restStatusCode: Int,
+                indexer: RequestIndexer) {
 
-            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+            if (ExceptionUtils.containsThrowable(failure, classOf[EsRejectedExecutionException])) {
                // full queue; re-add document for indexing
                indexer.add(action)
-                return false
-            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) {
+            } else if (ExceptionUtils.containsThrowable(failure, classOf[ElasticsearchParseException])) {
                // malformed document; simply drop request without failing sink
-                return false
            } else {
                // for all other failures, fail the sink
+                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure
            }
        }
 }))
@@ -349,7 +387,31 @@ Note that `onFailure` is called for failures that still occur only after the
 By default, the `BulkProcessor` retries up to a maximum of 8 attempts with
 an exponential backoff. For more information on the behaviour of the
 internal `BulkProcessor` and how to configure it, please see the following section.
- 
+
+By default, if no failure handler is provided, the sink uses a
+`NoOpFailureHandler` that simply fails for all kinds of exceptions. The
+connector also provides a `RetryRejectedExecutionFailureHandler` implementation
+that always re-adds requests that have failed due to queue capacity saturation.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
+on failures will lead to longer checkpoints, as the sink will also
+need to wait for the re-added requests to be flushed when checkpointing.
+For example, when using <b>RetryRejectedExecutionFailureHandler</b>, checkpoints
+will need to wait until Elasticsearch node queues have enough capacity for
+all the pending requests. This also means that if re-added requests never
+succeed, the checkpoint will never finish.
+</p>
+
+<p style="border-radius: 5px; padding: 5px" class="bg-warning">
+<b>Failure handling for Elasticsearch 1.x</b>: For Elasticsearch 1.x, it
+is not feasible to match the type of the failure, because the exact type
+cannot be retrieved through the older version's Java client APIs (the
+types will all be general <b>Exception</b>s, differing only in the
+failure message). In this case, it is recommended to match on the
+provided REST status code.
+</p>
+
 ### Configuring the Internal Bulk Processor
 
 The internal `BulkProcessor` can be further configured for its behaviour

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/pom.xml b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
index 81652c4..32327ff 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch-base/pom.xml
@@ -68,6 +68,14 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-java_2.10</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
index 45d04fc..abbdd72 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 
 /**
 * An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how failed
- * {@link ActionRequest ActionRequests} should be handled, ex. dropping them, reprocessing malformed documents, or
+ * {@link ActionRequest ActionRequests} should be handled, e.g. dropping them, reprocessing malformed documents, or
 * simply requesting them to be sent to Elasticsearch again if the failure is only temporary.
  *
  * <p>
@@ -34,19 +34,16 @@ import java.io.Serializable;
 *     private static class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler {
 *
 *             @Override
- *             boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
- *                     // this example uses Apache Commons to search for nested exceptions
- *
- *                     if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+ *             void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+ *                     if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
 *                             // full queue; re-add document for indexing
 *                             indexer.add(action);
- *                             return false;
- *                     } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) {
+ *                     } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
 *                             // malformed document; simply drop request without failing sink
- *                             return false;
 *                     } else {
- *                             // for all other failures, fail the sink
- *                             return true;
+ *                             // for all other failures, fail the sink;
+ *                             // here the failure is simply rethrown, but users can also choose to throw custom exceptions
+ *                             throw failure;
 *                     }
 *             }
 *     }
@@ -56,6 +53,11 @@ import java.io.Serializable;
 * <p>
 * The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests
 * with malformed documents, without failing the sink. For all other failures, the sink will fail.
+ *
+ * <p>
+ * Note: For Elasticsearch 1.x, it is not feasible to match the type of the failure because the exact type
+ * could not be retrieved through the older version Java client APIs (thus, the types will be general {@link Exception}s
+ * and only differ in the failure message). In this case, it is recommended to match on the provided REST status code.
 */
 public interface ActionRequestFailureHandler extends Serializable {
 
@@ -64,9 +66,12 @@ public interface ActionRequestFailureHandler extends Serializable {
         *
         * @param action the {@link ActionRequest} that failed due to the failure
         * @param failure the cause of failure
+        * @param restStatusCode the REST status code of the failure (-1 if none can be retrieved)
         * @param indexer request indexer to re-add the failed action, if intended to do so
-        * @return the implementation should return {@code true} if the sink should fail due to this failure, and {@code false} otherwise
+        *
+        * @throws Throwable if the sink should fail on this failure, the implementation should rethrow
+        *                   the exception or a custom one
        */
-       boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer);
+       void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;
 
 }
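The new throws-based contract above (no boolean result; failures are rethrown to fail the sink) can be sketched outside of the connector. Everything below is a hypothetical stand-in: the `String` action requests, `QueueFullException`, and the status codes are invented for illustration, not the connector's real types.

```java
import java.util.ArrayList;
import java.util.List;

public class FailureHandlerSketch {

    // stand-in for the connector's RequestIndexer; requests are modeled as plain strings
    interface RequestIndexer {
        void add(String... actionRequests);
    }

    // mirrors the new signature: void return, failures are rethrown instead of returning true
    interface ActionRequestFailureHandler {
        void onFailure(String action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;
    }

    static class QueueFullException extends RuntimeException {}

    // re-add on "queue full", drop on a (hypothetical) 400 parse error, rethrow everything else
    static final ActionRequestFailureHandler HANDLER = (action, failure, restStatusCode, indexer) -> {
        if (failure instanceof QueueFullException) {
            indexer.add(action);            // temporary problem: retry the request
        } else if (restStatusCode == 400) {
            // malformed document: drop silently without failing the sink
        } else {
            throw failure;                  // all other failures fail the sink
        }
    };

    public static void main(String[] args) throws Throwable {
        List<String> readded = new ArrayList<>();
        RequestIndexer indexer = (actions) -> { for (String a : actions) readded.add(a); };

        HANDLER.onFailure("doc-1", new QueueFullException(), 429, indexer);           // re-added
        HANDLER.onFailure("doc-2", new IllegalArgumentException("bad json"), 400, indexer); // dropped

        boolean rethrown = false;
        try {
            HANDLER.onFailure("doc-3", new RuntimeException("node down"), 500, indexer);
        } catch (Throwable t) {
            rethrown = true;
        }
        System.out.println(readded + " rethrown=" + rethrown); // prints "[doc-1] rethrown=true"
    }
}
```

Because the handler may throw the original `Throwable`, callers (like the sink's bulk listener) catch `Throwable` and record it, which is exactly what the `BulkProcessorListener` change later in this diff does.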

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
index d802550..838865a 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
@@ -21,6 +21,10 @@ package org.apache.flink.streaming.connectors.elasticsearch;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
 * {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
@@ -30,14 +34,21 @@ class BulkProcessorIndexer implements RequestIndexer {
        private static final long serialVersionUID = 6841162943062034253L;
 
        private final BulkProcessor bulkProcessor;
+       private final boolean flushOnCheckpoint;
+       private final AtomicLong numPendingRequestsRef;
 
-       BulkProcessorIndexer(BulkProcessor bulkProcessor) {
-               this.bulkProcessor = bulkProcessor;
+       BulkProcessorIndexer(BulkProcessor bulkProcessor, boolean flushOnCheckpoint, AtomicLong numPendingRequestsRef) {
+               this.bulkProcessor = checkNotNull(bulkProcessor);
+               this.flushOnCheckpoint = flushOnCheckpoint;
+               this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
        }
 
        @Override
        public void add(ActionRequest... actionRequests) {
                for (ActionRequest actionRequest : actionRequests) {
+                       if (flushOnCheckpoint) {
+                               numPendingRequestsRef.getAndIncrement();
+                       }
                        this.bulkProcessor.add(actionRequest);
                }
        }
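The bookkeeping added above (count a request before handing it to the bulk buffer, but only when flush-on-checkpoint is enabled) can be illustrated with a minimal stand-alone sketch. `PendingCounterSketch`, its `String` requests, and the `acknowledge` method are invented stand-ins for illustration, not the connector's API; only the counting pattern mirrors `BulkProcessorIndexer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class PendingCounterSketch {

    private final List<String> buffered = new ArrayList<>(); // stand-in for the BulkProcessor buffer
    private final boolean flushOnCheckpoint;
    private final AtomicLong numPendingRequests;

    PendingCounterSketch(boolean flushOnCheckpoint, AtomicLong numPendingRequests) {
        this.flushOnCheckpoint = flushOnCheckpoint;
        this.numPendingRequests = numPendingRequests;
    }

    // mirrors BulkProcessorIndexer.add(): count first, then buffer
    void add(String... actionRequests) {
        for (String request : actionRequests) {
            if (flushOnCheckpoint) {
                numPendingRequests.getAndIncrement();
            }
            buffered.add(request);
        }
    }

    // in the real sink, the bulk listener decrements once Elasticsearch acknowledges a request
    void acknowledge(String request) {
        if (buffered.remove(request) && flushOnCheckpoint) {
            numPendingRequests.getAndDecrement();
        }
    }

    public static void main(String[] args) {
        AtomicLong pending = new AtomicLong(0);
        PendingCounterSketch indexer = new PendingCounterSketch(true, pending);
        indexer.add("a", "b");
        System.out.println(pending.get()); // prints 2
        indexer.acknowledge("a");
        System.out.println(pending.get()); // prints 1
    }
}
```

Sharing the `AtomicLong` between the indexer and the checkpointing logic is what lets `snapshotState()` later wait for the counter to reach zero.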

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 2c29865..f6944b3 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -17,8 +17,12 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.util.InstantiationUtil;
 import org.elasticsearch.action.ActionRequest;
@@ -30,11 +34,13 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.rest.RestStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -56,7 +62,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * @param <T> Type of the elements handled by this sink
  */
-public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
+public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> implements CheckpointedFunction {
 
        private static final long serialVersionUID = -1007596293618451942L;
 
@@ -105,12 +111,12 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
                }
 
                public void setMaxRetryCount(int maxRetryCount) {
-                       checkArgument(maxRetryCount > 0);
+                       checkArgument(maxRetryCount >= 0);
                        this.maxRetryCount = maxRetryCount;
                }
 
                public void setDelayMillis(long delayMillis) {
-                       checkArgument(delayMillis > 0);
+                       checkArgument(delayMillis >= 0);
                        this.delayMillis = delayMillis;
                }
        }
@@ -133,6 +139,9 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
        /** User-provided handler for failed {@link ActionRequest ActionRequests}. */
        private final ActionRequestFailureHandler failureHandler;
 
+       /** If true, the producer will wait until all outstanding action requests have been sent to Elasticsearch. */
+       private boolean flushOnCheckpoint = true;
+
        /** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
        private transient BulkProcessorIndexer requestIndexer;
 
@@ -143,6 +152,17 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
        /** Call bridge for different version-specific Elasticsearch APIs. */
        private final ElasticsearchApiCallBridge callBridge;
 
+       /**
+        * Number of pending action requests not yet acknowledged by Elasticsearch.
+        * This value is maintained only if {@link ElasticsearchSinkBase#flushOnCheckpoint} is {@code true}.
+        *
+        * This is incremented whenever the user adds (or re-adds through the {@link ActionRequestFailureHandler}) requests
+        * to the {@link RequestIndexer}. It is decremented for each completed request of a bulk request, in
+        * {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, BulkResponse)} and
+        * {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, Throwable)}.
+        */
+       private AtomicLong numPendingRequests = new AtomicLong(0);
+
        /** Elasticsearch client created using the call bridge. */
        private transient Client client;
 
@@ -152,7 +172,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
        /**
         * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks and
         * the user considered it should fail the sink via the
-        * {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, RequestIndexer)} method.
+        * {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, int, RequestIndexer)} method.
         *
         * Errors will be checked and rethrown before processing each input element, and when the sink is closed.
         */
@@ -172,21 +192,13 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
                // otherwise, if they aren't serializable, users will merely get a non-informative error message
                // "ElasticsearchSinkBase is not serializable"
 
-               try {
-                       InstantiationUtil.serializeObject(elasticsearchSinkFunction);
-               } catch (Exception e) {
-                       throw new IllegalArgumentException(
-                               "The implementation of the provided ElasticsearchSinkFunction is not serializable. " +
-                               "The object probably contains or references non serializable fields.");
-               }
+               checkArgument(InstantiationUtil.isSerializable(elasticsearchSinkFunction),
+                       "The implementation of the provided ElasticsearchSinkFunction is not serializable. " +
+                               "The object probably contains or references non-serializable fields.");
 
-               try {
-                       InstantiationUtil.serializeObject(failureHandler);
-               } catch (Exception e) {
-                       throw new IllegalArgumentException(
-                               "The implementation of the provided ActionRequestFailureHandler is not serializable. " +
-                                       "The object probably contains or references non serializable fields.");
-               }
+               checkArgument(InstantiationUtil.isSerializable(failureHandler),
+                       "The implementation of the provided ActionRequestFailureHandler is not serializable. " +
+                               "The object probably contains or references non-serializable fields.");
 
                // extract and remove bulk processor related configuration from the user-provided config,
                // so that the resulting user config only contains configuration related to the Elasticsearch client.
@@ -244,47 +256,76 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
                this.userConfig = userConfig;
        }
 
+       /**
+        * Disable flushing on checkpoint. When disabled, the sink will not wait for all
+        * pending action requests to be acknowledged by Elasticsearch on checkpoints.
+        *
+        * NOTE: If flushing on checkpoint is disabled, the Flink Elasticsearch Sink does NOT
+        * provide any strong guarantees for at-least-once delivery of action requests.
+        */
+       public void disableFlushOnCheckpoint() {
+               this.flushOnCheckpoint = false;
+       }
+
        @Override
        public void open(Configuration parameters) throws Exception {
                client = callBridge.createClient(userConfig);
+               bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
+               requestIndexer = new BulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
+       }
 
-               BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
-                       client,
-                       new BulkProcessor.Listener() {
-                               @Override
-                               public void beforeBulk(long executionId, BulkRequest request) { }
-
-                               @Override
-                               public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-                                       if (response.hasFailures()) {
-                                               BulkItemResponse itemResponse;
-                                               Throwable failure;
-
-                                               for (int i = 0; i < response.getItems().length; i++) {
-                                                       itemResponse = response.getItems()[i];
-                                                       failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
-                                                       if (failure != null) {
-                                                               LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
-                                                               if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
-                                                                       failureThrowable.compareAndSet(null, failure);
-                                                               }
-                                                       }
-                                               }
-                                       }
-                               }
+       @Override
+       public void invoke(T value) throws Exception {
+               // if bulk processor callbacks have previously reported an error, we rethrow the error and fail the sink
+               checkErrorAndRethrow();
 
-                               @Override
-                               public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-                                       LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
+               elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
+       }
 
-                                       // whole bulk request failures are usually just temporary timeouts on
-                                       // the Elasticsearch side; simply retry all action requests in the bulk
-                                       for (ActionRequest action : request.requests()) {
-                                               requestIndexer.add(action);
-                                       }
-                               }
-                       }
-               );
+       @Override
+       public void initializeState(FunctionInitializationContext context) throws Exception {
+               // no initialization needed
+       }
+
+       @Override
+       public void snapshotState(FunctionSnapshotContext context) throws Exception {
+               checkErrorAndRethrow();
+
+               if (flushOnCheckpoint) {
+                       do {
+                               bulkProcessor.flush();
+                               checkErrorAndRethrow();
+                       } while (numPendingRequests.get() != 0);
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               if (bulkProcessor != null) {
+                       bulkProcessor.close();
+                       bulkProcessor = null;
+               }
+
+               if (client != null) {
+                       client.close();
+                       client = null;
+               }
+
+               callBridge.cleanup();
+
+               // make sure any errors from callbacks are rethrown
+               checkErrorAndRethrow();
+       }
+
+       /**
+        * Build the {@link BulkProcessor}.
+        *
+        * Note: this is exposed for testing purposes.
+        */
+       protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
+               checkNotNull(listener);
+
+               BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, listener);
 
                // This makes flush() blocking
                bulkProcessorBuilder.setConcurrentRequests(0);
@@ -304,40 +345,81 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> {
                // if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null
                callBridge.configureBulkProcessorBackoff(bulkProcessorBuilder, bulkProcessorFlushBackoffPolicy);
 
-               bulkProcessor = bulkProcessorBuilder.build();
-               requestIndexer = new BulkProcessorIndexer(bulkProcessor);
+               return bulkProcessorBuilder.build();
        }
 
-       @Override
-       public void invoke(T value) throws Exception {
-               // if bulk processor callbacks have previously reported an error, we rethrow the error and fail the sink
-               checkErrorAndRethrow();
-
-               elasticsearchSinkFunction.process(value, getRuntimeContext(), requestIndexer);
+       private void checkErrorAndRethrow() {
+               Throwable cause = failureThrowable.get();
+               if (cause != null) {
+                       throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
+               }
        }
 
-       @Override
-       public void close() throws Exception {
-               if (bulkProcessor != null) {
-                       bulkProcessor.close();
-                       bulkProcessor = null;
-               }
+       private class BulkProcessorListener implements BulkProcessor.Listener {
+               @Override
+               public void beforeBulk(long executionId, BulkRequest request) { }
+
+               @Override
+               public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                       if (response.hasFailures()) {
+                               BulkItemResponse itemResponse;
+                               Throwable failure;
+                               RestStatus restStatus;
+
+                               try {
+                                       for (int i = 0; i < response.getItems().length; i++) {
+                                               itemResponse = response.getItems()[i];
+                                               failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
+                                               if (failure != null) {
+                                                       LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+                                                       restStatus = itemResponse.getFailure().getStatus();
+                                                       if (restStatus == null) {
+                                                               failureHandler.onFailure(request.requests().get(i), failure, -1, requestIndexer);
+                                                       } else {
+                                                               failureHandler.onFailure(request.requests().get(i), failure, restStatus.getStatus(), requestIndexer);
+                                                       }
+                                               }
+                                       }
+                               } catch (Throwable t) {
+                                       // fail the sink and skip the rest of the items
+                                       // if the failure handler decides to throw an exception
+                                       failureThrowable.compareAndSet(null, t);
+                               }
+                       }
 
-               if (client != null) {
-                       client.close();
-                       client = null;
+                       if (flushOnCheckpoint) {
+                               numPendingRequests.getAndAdd(-request.numberOfActions());
+                       }
                }
 
-               callBridge.cleanup();
+               @Override
+               public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+                       LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
 
-               // make sure any errors from callbacks are rethrown
-               checkErrorAndRethrow();
+                       try {
+                               for (ActionRequest action : request.requests()) {
+                                       failureHandler.onFailure(action, failure, -1, requestIndexer);
+                               }
+                       } catch (Throwable t) {
+                               // fail the sink and skip the rest of the items
+                               // if the failure handler decides to throw an exception
+                               failureThrowable.compareAndSet(null, t);
+                       }
+
+                       if (flushOnCheckpoint) {
+                               numPendingRequests.getAndAdd(-request.numberOfActions());
+                       }
+               }
        }
 
-       private void checkErrorAndRethrow() {
-               Throwable cause = failureThrowable.get();
-               if (cause != null) {
-                       throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
+       @VisibleForTesting
+       long getNumPendingRequests() {
+               if (flushOnCheckpoint) {
+                       return numPendingRequests.get();
+               } else {
+                       throw new UnsupportedOperationException(
+                               "The number of pending requests is not maintained when flushing on checkpoint is disabled.");
                }
        }
 }
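The `BulkProcessorListener` above decrements `numPendingRequests` in both `afterBulk` callbacks, and the sink's `snapshotState` (not shown in this hunk) loops on `bulkProcessor.flush()` until the counter reaches zero. As a rough illustration of that at-least-once bookkeeping, here is a standalone sketch in plain Java; the class and method names (`PendingRequestTracker`, `waitUntilNoPendingRequests`) are illustrative stand-ins, not the connector's actual API.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for the sink's at-least-once bookkeeping: every
// request added bumps the counter, every completed bulk (success or
// failure) decrements it, and a checkpoint flushes until nothing is pending.
class PendingRequestTracker {

    private final AtomicLong numPendingRequests = new AtomicLong(0);

    // called when a request is handed to the bulk processor (cf. invoke())
    void onRequestAdded() {
        numPendingRequests.incrementAndGet();
    }

    // called from the afterBulk() callbacks, mirroring
    // numPendingRequests.getAndAdd(-request.numberOfActions())
    void onBulkCompleted(int numberOfActions) {
        numPendingRequests.getAndAdd(-numberOfActions);
    }

    // mirrors snapshotState(): flush repeatedly, because a failure handler
    // may re-add failed requests and bump the counter back up
    void waitUntilNoPendingRequests(Runnable flush) {
        while (numPendingRequests.get() != 0) {
            flush.run();
        }
    }

    long getNumPendingRequests() {
        return numPendingRequests.get();
    }
}
```

The key design point mirrored here is that the counter, not the bulk processor, decides when a checkpoint may complete, which is what makes re-added requests checkpoint-safe.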

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
deleted file mode 100644
index 09173a2..0000000
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpActionRequestFailureHandler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch.util;
-
-import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
-import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.elasticsearch.action.ActionRequest;
-
-/**
- * An {@link ActionRequestFailureHandler} that simply fails the sink on any failures.
- */
-public class NoOpActionRequestFailureHandler implements ActionRequestFailureHandler {
-
-       private static final long serialVersionUID = 737941343410827885L;
-
-       @Override
-       public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
-               // simply fail the sink
-               return true;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
new file mode 100644
index 0000000..b19ea08
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/NoOpFailureHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.ActionRequest;
+
+/**
+ * An {@link ActionRequestFailureHandler} that simply fails the sink on any failures.
+ */
+public class NoOpFailureHandler implements ActionRequestFailureHandler {
+
+       private static final long serialVersionUID = 737941343410827885L;
+
+       @Override
+       public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+               // simply fail the sink
+               throw failure;
+       }
+
+}
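Comparing the two files above shows the contract change this commit makes: `onFailure` no longer returns a boolean but is `void`, takes the REST status code, and signals failure by rethrowing. The sketch below illustrates the three choices a handler now has (swallow, re-add via the indexer, or rethrow) using simplified stand-in types so it compiles without Elasticsearch on the classpath; `FailureHandler`, `RequestIndexer`, and the 400-status policy are illustrative assumptions, not the connector's real `ActionRequestFailureHandler` API.

```java
// Simplified stand-ins for ActionRequestFailureHandler / RequestIndexer,
// mirroring the new void-plus-throws contract introduced by this commit.
class FailureHandlerSketch {

    interface RequestIndexer {
        void add(String request); // stands in for add(ActionRequest...)
    }

    interface FailureHandler {
        void onFailure(String action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;
    }

    // Example policy: drop requests rejected as malformed (HTTP 400);
    // otherwise behave like NoOpFailureHandler and fail the sink by rethrowing.
    static class DropMalformedHandler implements FailureHandler {
        @Override
        public void onFailure(String action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
            if (restStatusCode == 400) {
                return; // swallow: the record is dropped, the sink keeps running
            }
            throw failure; // everything else still fails the sink
        }
    }
}
```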

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
new file mode 100644
index 0000000..fabdcbc
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.util;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.ExceptionUtils;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full),
+ * and fails for all other failures.
+ */
+public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
+
+       private static final long serialVersionUID = -7423562912824511906L;
+
+       @Override
+       public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+               if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+                       indexer.add(action);
+               } else {
+                       // rethrow all other failures
+                       throw failure;
+               }
+       }
+
+}
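This handler leans on the `ExceptionUtils.containsThrowable` helper this commit adds to `flink-core` to spot an `EsRejectedExecutionException` anywhere in the cause chain. A plausible standalone version of such a cause-chain check looks like the following; this is a sketch of the idea, and Flink's actual implementation and signature may differ.

```java
// Standalone sketch of a cause-chain search in the spirit of
// ExceptionUtils.containsThrowable (the real Flink code may differ).
class ExceptionChains {

    // returns true if searchType (or a subtype) appears anywhere
    // in the cause chain of the given throwable
    static boolean containsThrowable(Throwable throwable, Class<?> searchType) {
        if (throwable == null || searchType == null) {
            return false;
        }
        Throwable t = throwable;
        while (t != null) {
            if (searchType.isAssignableFrom(t.getClass())) {
                return true;
            }
            // guard against self-referential causes to avoid an infinite loop
            t = (t.getCause() == t) ? null : t.getCause();
        }
        return false;
    }
}
```

Walking the whole chain matters here because the client typically wraps the node-side `EsRejectedExecutionException` in transport-level exceptions before it reaches the failure handler.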

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
new file mode 100644
index 0000000..b9df5c6
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Suite of tests for {@link ElasticsearchSinkBase}.
+ */
+public class ElasticsearchSinkBaseTest {
+
+       /** Tests that any item failure in the listener callbacks is rethrown on an immediately following invoke call. */
+       @Test
+       public void testItemFailureRethrownOnInvoke() throws Throwable {
+               final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+                       new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+               final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+               testHarness.open();
+
+               // setup the next bulk request, and its mock item failures
+               sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
+               testHarness.processElement(new StreamRecord<>("msg"));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+               // manually execute the next bulk request
+               sink.manualBulkRequestWithAllPendingRequests();
+
+               try {
+                       testHarness.processElement(new StreamRecord<>("next msg"));
+               } catch (Exception e) {
+                       // the invoke should have failed with the failure
+                       Assert.assertTrue(e.getCause().getMessage().contains("artificial failure for record"));
+
+                       // test succeeded
+                       return;
+               }
+
+               Assert.fail();
+       }
+
+       /** Tests that any item failure in the listener callbacks is rethrown on an immediately following checkpoint. */
+       @Test
+       public void testItemFailureRethrownOnCheckpoint() throws Throwable {
+               final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+                       new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+               final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+               testHarness.open();
+
+               // setup the next bulk request, and its mock item failures
+               sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
+               testHarness.processElement(new StreamRecord<>("msg"));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+               // manually execute the next bulk request
+               sink.manualBulkRequestWithAllPendingRequests();
+
+               try {
+                       testHarness.snapshot(1L, 1000L);
+               } catch (Exception e) {
+                       // the snapshot should have failed with the failure
+                       Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for record"));
+
+                       // test succeeded
+                       return;
+               }
+
+               Assert.fail();
+       }
+
+       /**
+        * Tests that any item failure in the listener callbacks due to flushing on an immediately following checkpoint
+        * is rethrown; we set a timeout because the test will not finish if the logic is broken.
+        */
+       @Test(timeout=5000)
+       public void testItemFailureRethrownOnCheckpointAfterFlush() throws Throwable {
+               final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+                       new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+               final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+               testHarness.open();
+
+               // setup the next bulk request, and its mock item failures
+
+               List<Exception> mockResponsesList = new ArrayList<>(2);
+               mockResponsesList.add(null); // the first request in a bulk will succeed
+               mockResponsesList.add(new Exception("artificial failure for record")); // the second request in a bulk will fail
+               sink.setMockItemFailuresListForNextBulkItemResponses(mockResponsesList);
+
+               testHarness.processElement(new StreamRecord<>("msg-1"));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+               // manually execute the next bulk request (1 request only, thus should succeed)
+               sink.manualBulkRequestWithAllPendingRequests();
+
+               // setup the requests to be flushed in the snapshot
+               testHarness.processElement(new StreamRecord<>("msg-2"));
+               testHarness.processElement(new StreamRecord<>("msg-3"));
+               verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class));
+
+               CheckedThread snapshotThread = new CheckedThread() {
+                       @Override
+                       public void go() throws Exception {
+                               testHarness.snapshot(1L, 1000L);
+                       }
+               };
+               snapshotThread.start();
+
+               // the snapshot should eventually be blocked before snapshot triggers flushing
+               while (snapshotThread.getState() != Thread.State.WAITING) {
+                       Thread.sleep(10);
+               }
+
+               // let the snapshot-triggered flush continue (2 records in the bulk, so the 2nd one should fail)
+               sink.continueFlush();
+
+               try {
+                       snapshotThread.sync();
+               } catch (Exception e) {
+                       // the snapshot should have failed with the failure from the 2nd request
+                       Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for record"));
+
+                       // test succeeded
+                       return;
+               }
+
+               Assert.fail();
+       }
+
+       /** Tests that any bulk failure in the listener callbacks is rethrown on an immediately following invoke call. */
+       @Test
+       public void testBulkFailureRethrownOnInvoke() throws Throwable {
+               final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+                       new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+               final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+               testHarness.open();
+
+               // setup the next bulk request, and let the whole bulk request fail
+               sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
+               testHarness.processElement(new StreamRecord<>("msg"));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+               // manually execute the next bulk request
+               sink.manualBulkRequestWithAllPendingRequests();
+
+               try {
+                       testHarness.processElement(new StreamRecord<>("next msg"));
+               } catch (Exception e) {
+                       // the invoke should have failed with the bulk request failure
+                       Assert.assertTrue(e.getCause().getMessage().contains("artificial failure for bulk request"));
+
+                       // test succeeded
+                       return;
+               }
+
+               Assert.fail();
+       }
+
+       /** Tests that any bulk failure in the listener callbacks is rethrown on an immediately following checkpoint. */
+       @Test
+       public void testBulkFailureRethrownOnCheckpoint() throws Throwable {
+               final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+                       new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+               final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+               testHarness.open();
+
+               // setup the next bulk request, and let the whole bulk request fail
+               sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
+               testHarness.processElement(new StreamRecord<>("msg"));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+               // manually execute the next bulk request
+               sink.manualBulkRequestWithAllPendingRequests();
+
+               try {
+                       testHarness.snapshot(1L, 1000L);
+               } catch (Exception e) {
+                       // the snapshot should have failed with the bulk request failure
+                       Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for bulk request"));
+
+                       // test succeeded
+                       return;
+               }
+
+               Assert.fail();
+       }
+
+       /**
+        * Tests that any bulk failure in the listener callbacks due to flushing on an immediately following checkpoint
+        * is rethrown; we set a timeout because the test will not finish if the logic is broken.
+        */
+       @Test(timeout=5000)
+       public void testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable {
+               final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+                       new HashMap<String, String>(), new SimpleSinkFunction<String>(), new NoOpFailureHandler());
+
+               final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+               testHarness.open();
+
+               // setup the next bulk request, and let bulk request succeed
+               sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception) null));
+               testHarness.processElement(new StreamRecord<>("msg-1"));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+               // manually execute the next bulk request
+               sink.manualBulkRequestWithAllPendingRequests();
+
+               // setup the requests to be flushed in the snapshot
+               testHarness.processElement(new StreamRecord<>("msg-2"));
+               testHarness.processElement(new StreamRecord<>("msg-3"));
+               verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class));
+
+               CheckedThread snapshotThread = new CheckedThread() {
+                       @Override
+                       public void go() throws Exception {
+                               testHarness.snapshot(1L, 1000L);
+                       }
+               };
+               snapshotThread.start();
+
+               // the snapshot should eventually be blocked before snapshot triggers flushing
+               while (snapshotThread.getState() != Thread.State.WAITING) {
+                       Thread.sleep(10);
+               }
+
+               // for the snapshot-triggered flush, we let the bulk request fail completely
+               sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
+
+               // let the snapshot-triggered flush continue (bulk request should fail completely)
+               sink.continueFlush();
+
+               try {
+                       snapshotThread.sync();
+               } catch (Exception e) {
+                       // the snapshot should have failed with the bulk request failure
+                       Assert.assertTrue(e.getCause().getCause().getMessage().contains("artificial failure for bulk request"));
+
+                       // test succeeded
+                       return;
+               }
+
+               Assert.fail();
+       }
+
+       /**
+        * Tests that the sink correctly waits for pending requests (including re-added requests) on checkpoints;
+        * we set a timeout because the test will not finish if the logic is broken.
+        */
+       @Test(timeout=5000)
+       public void testAtLeastOnceSink() throws Throwable {
+               final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+                               new HashMap<String, String>(),
+                               new SimpleSinkFunction<String>(),
+                               new DummyRetryFailureHandler()); // use a failure handler that simply re-adds requests
+
+               final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+               testHarness.open();
+
+               // setup the next bulk request, and its mock item failures;
+               // it contains 1 request, which will fail and be re-added to the next bulk request
+               sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
+               testHarness.processElement(new StreamRecord<>("msg"));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+               CheckedThread snapshotThread = new CheckedThread() {
+                       @Override
+                       public void go() throws Exception {
+                               testHarness.snapshot(1L, 1000L);
+                       }
+               };
+               snapshotThread.start();
+
+               // the snapshot should eventually be blocked before snapshot triggers flushing
+               while (snapshotThread.getState() != Thread.State.WAITING) {
+                       Thread.sleep(10);
+               }
+
+               sink.continueFlush();
+
+               // since the previous flush should have resulted in a request re-add from the failure handler,
+               // we should have flushed again, and eventually be blocked before snapshot triggers the 2nd flush
+               while (snapshotThread.getState() != Thread.State.WAITING) {
+                       Thread.sleep(10);
+               }
+
+               // current number of pending requests should be 1 due to the re-add
+               Assert.assertEquals(1, sink.getNumPendingRequests());
+
+               // this time, let the bulk request succeed, so no more requests are re-added
+               sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception) null));
+
+               sink.continueFlush();
+
+               // the snapshot should finish with no exceptions
+               snapshotThread.sync();
+
+               testHarness.close();
+       }
+
+       /**
+        * This test is meant to assure that testAtLeastOnceSink is valid by testing that if flushing is disabled,
+        * the snapshot method does indeed finish without waiting for pending requests;
+        * we set a timeout because the test will not finish if the logic is broken.
+        */
+       @Test(timeout=5000)
+       public void testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Exception {
+               final DummyElasticsearchSink<String> sink = new DummyElasticsearchSink<>(
+                       new HashMap<String, String>(), new SimpleSinkFunction<String>(), new DummyRetryFailureHandler());
+               sink.disableFlushOnCheckpoint(); // disable flushing
+
+               final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+               testHarness.open();
+
+               // setup the next bulk request, and its mock item failures
+               sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
+               testHarness.processElement(new StreamRecord<>("msg-1"));
+               verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class));
+
+               // the snapshot should not block even though we haven't flushed the bulk request
+               testHarness.snapshot(1L, 1000L);
+
+               testHarness.close();
+       }
+
+       private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+
+               private static final long serialVersionUID = 5051907841570096991L;
+
+               private transient BulkProcessor mockBulkProcessor;
+               private transient BulkRequest nextBulkRequest = new BulkRequest();
+               private transient MultiShotLatch flushLatch = new MultiShotLatch();
+
+               private List<? extends Throwable> mockItemFailuresList;
+               private Throwable nextBulkFailure;
+
+               public DummyElasticsearchSink(
+                               Map<String, String> userConfig,
+                               ElasticsearchSinkFunction<T> sinkFunction,
+                               ActionRequestFailureHandler failureHandler) {
+                       super(new DummyElasticsearchApiCallBridge(), userConfig, sinkFunction, failureHandler);
+               }
+
+               /**
+                * This method is used to mimic a scheduled bulk request; we need to do this
+                * manually because we are mocking the BulkProcessor.
+                */
+               public void manualBulkRequestWithAllPendingRequests() {
+                       flushLatch.trigger(); // let the flush continue
+                       mockBulkProcessor.flush();
+               }
+
+               /**
+                * On non-manual flushes, i.e. when flush is called in the snapshot method implementation,
+                * users need to explicitly call this to allow the flush to continue. This is useful
+                * to make sure that specific requests get added to the next bulk request for flushing.
+                */
+               public void continueFlush() {
+                       flushLatch.trigger();
+               }
+
+               /**
+                * Set the list of mock failures to use for the next bulk of item responses. A {@code null}
+                * entry means the response is successful; a non-null entry means it failed.
+                *
+                * The list order corresponds to the order of the requests in the bulk, i.e. the first
+                * request uses the response at index 0, the second request uses the response at index 1, etc.
+                */
+               public void setMockItemFailuresListForNextBulkItemResponses(List<? extends Throwable> mockItemFailuresList) {
+                       this.mockItemFailuresList = mockItemFailuresList;
+               }
+
+               /**
+                * Let the next bulk request fail completely with the provided throwable.
+                * If this is set, the failures list provided with setMockItemFailuresListForNextBulkItemResponses is not respected.
+                */
+               public void setFailNextBulkRequestCompletely(Throwable failure) {
+                       this.nextBulkFailure = failure;
+               }
+
+               public BulkProcessor getMockBulkProcessor() {
+                       return mockBulkProcessor;
+               }
+
+               /**
+                * Override the bulk processor build process to provide a mock implementation,
+                * but reuse the listener implementation in our mock to test that the listener logic
+                * works correctly with the request flushing logic.
+                */
+               @Override
+               protected BulkProcessor buildBulkProcessor(final BulkProcessor.Listener listener) {
+                       this.mockBulkProcessor = mock(BulkProcessor.class);
+
+                       when(mockBulkProcessor.add(any(ActionRequest.class))).thenAnswer(new Answer<Object>() {
+                               @Override
+                               public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                                       // intercept the request and add it to our mock bulk request
+                                       nextBulkRequest.add(invocationOnMock.getArgumentAt(0, ActionRequest.class));
+
+                                       return null;
+                               }
+                       });
+
+                       doAnswer(new Answer() {
+                               @Override
+                               public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                                       while (nextBulkRequest.numberOfActions() > 0) {
+                                               // wait until we are allowed to continue with the flushing
+                                               flushLatch.await();
+
+                                               // create a copy of the accumulated mock requests, so that
+                                               // re-added requests from the failure handler are included in the next bulk
+                                               BulkRequest currentBulkRequest = nextBulkRequest;
+                                               nextBulkRequest = new BulkRequest();
+
+                                               listener.beforeBulk(123L, currentBulkRequest);
+
+                                               if (nextBulkFailure == null) {
+                                                       BulkItemResponse[] mockResponses = new BulkItemResponse[currentBulkRequest.requests().size()];
+                                                       for (int i = 0; i < currentBulkRequest.requests().size(); i++) {
+                                                               Throwable mockItemFailure = mockItemFailuresList.get(i);
+
+                                                               if (mockItemFailure == null) {
+                                                                       // the mock response for the item is success
+                                                                       mockResponses[i] = new BulkItemResponse(i, "opType", mock(ActionResponse.class));
+                                                               } else {
+                                                                       // the mock response for the item is failure
+                                                                       mockResponses[i] = new BulkItemResponse(i, "opType", new BulkItemResponse.Failure("index", "type", "id", mockItemFailure));
+                                                               }
+                                                       }
+
+                                                       listener.afterBulk(123L, currentBulkRequest, new BulkResponse(mockResponses, 1000L));
+                                               } else {
+                                                       listener.afterBulk(123L, currentBulkRequest, nextBulkFailure);
+                                               }
+                                       }
+
+                                       return null;
+                               }
+                       }).when(mockBulkProcessor).flush();
+
+                       return mockBulkProcessor;
+               }
+       }
+
+       private static class DummyElasticsearchApiCallBridge implements ElasticsearchApiCallBridge {
+
+               private static final long serialVersionUID = -4272760730959041699L;
+
+               @Override
+               public Client createClient(Map<String, String> clientConfig) {
+                       return mock(Client.class);
+               }
+
+               @Nullable
+               @Override
+               public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+                       if (bulkItemResponse.isFailed()) {
+                               return new Exception(bulkItemResponse.getFailure().getMessage());
+                       } else {
+                               return null;
+                       }
+               }
+
+               @Override
+               public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+                       // no need for this in the test cases here
+               }
+
+               @Override
+               public void cleanup() {
+                       // nothing to cleanup
+               }
+       }
+
+       private static class SimpleSinkFunction<String> implements ElasticsearchSinkFunction<String> {
+
+               private static final long serialVersionUID = -176739293659135148L;
+
+               @Override
+               public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+                       Map<java.lang.String, Object> json = new HashMap<>();
+                       json.put("data", element);
+
+                       indexer.add(
+                               Requests.indexRequest()
+                                       .index("index")
+                                       .type("type")
+                                       .id("id")
+                                       .source(json)
+                       );
+               }
+       }
+
+       private static class DummyRetryFailureHandler implements ActionRequestFailureHandler {
+
+               private static final long serialVersionUID = 5400023700099200745L;
+
+               @Override
+               public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
+                       indexer.add(action);
+               }
+       }
+}
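For readers following the test logic above: the at-least-once guarantee rests on two pieces of state — a pending-request counter that the checkpoint flush loop drains, and a failure handler that re-adds failed requests. The following self-contained Java sketch (hypothetical class and method names, not the real Flink or Elasticsearch types) illustrates that interplay under those assumptions:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Illustrative sketch (not the real Flink classes) of the at-least-once
 * pattern the test exercises: failed requests are re-added to the pending
 * set, and a checkpoint "flush" loops until no requests remain pending.
 */
public class AtLeastOncePendingSketch {

	private final Queue<String> pending = new ArrayDeque<>();
	private final AtomicLong numPendingRequests = new AtomicLong();
	private int failuresToSimulate;

	public AtLeastOncePendingSketch(int failuresToSimulate) {
		this.failuresToSimulate = failuresToSimulate;
	}

	public void add(String request) {
		pending.add(request);
		numPendingRequests.incrementAndGet();
	}

	/** Mimics one bulk flush: a failed item is re-added, a successful one is acked. */
	private void bulkFlushOnce() {
		int n = pending.size();
		for (int i = 0; i < n; i++) {
			String request = pending.poll();
			if (failuresToSimulate > 0) {
				failuresToSimulate--;
				pending.add(request); // the failure handler re-adds the request
			} else {
				numPendingRequests.decrementAndGet(); // the item was acknowledged
			}
		}
	}

	/** Mimics snapshotState() with flush-on-checkpoint: block until everything is acked. */
	public void flushUntilEmpty() {
		while (numPendingRequests.get() > 0) {
			bulkFlushOnce();
		}
	}

	public long getNumPendingRequests() {
		return numPendingRequests.get();
	}
}
```

Here `flushUntilEmpty` plays the role the snapshot method plays in `testAtLeastOnceSink`: it only returns once every request, including re-added ones, has been acknowledged — which is exactly why `testDoesNotWaitForPendingRequestsIfFlushingDisabled` needs a timeout guard.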

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index 375d739..2298986 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.connectors.elasticsearch;
 
-import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.index.IndexRequest;
@@ -106,7 +106,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
         * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
         */
        public ElasticsearchSink(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-               this(userConfig, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+               this(userConfig, elasticsearchSinkFunction, new NoOpFailureHandler());
        }
 
        /**
@@ -117,7 +117,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
         * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
         */
        public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-               this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+               this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index 2210f63..6d771d4 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.elasticsearch2;
 
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
-import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.transport.TransportClient;
@@ -89,7 +89,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
                List<InetSocketAddress> transportAddresses,
                org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
 
-               this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+               this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
index 175b4fa..61023c2 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.elasticsearch5;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.client.transport.TransportClient;
@@ -75,7 +75,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
                List<InetSocketAddress> transportAddresses,
                ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
 
-               this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpActionRequestFailureHandler());
+               this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 69c2692..fea25ff 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -261,6 +261,30 @@ public final class ExceptionUtils {
                }
        }
 
+       /**
+        * Checks whether a throwable chain contains a specific type of exception.
+        *
+        * @param throwable the throwable chain to check.
+        * @param searchType the type of exception to search for in the chain.
+        * @return True, if the searched type is nested in the throwable, false otherwise.
+        */
+       public static boolean containsThrowable(Throwable throwable, Class searchType) {
+               if (throwable == null || searchType == null) {
+                       return false;
+               }
+
+               Throwable t = throwable;
+               while (t != null) {
+                       if (searchType.isAssignableFrom(t.getClass())) {
+                               return true;
+                       } else {
+                               t = t.getCause();
+                       }
+               }
+
+               return false;
+       }
+
        // ------------------------------------------------------------------------
 
        /** Private constructor to prevent instantiation. */
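The new `containsThrowable` utility simply walks the `getCause()` chain and checks type assignability at each step. Below is a standalone copy for illustration (it uses `Class<?>` rather than the raw `Class` of the patch; the real method lives in `org.apache.flink.util.ExceptionUtils`):

```java
// Standalone copy of the containsThrowable logic from the patch above,
// shown for illustration outside of the Flink code base.
public class ContainsThrowableDemo {

	public static boolean containsThrowable(Throwable throwable, Class<?> searchType) {
		if (throwable == null || searchType == null) {
			return false;
		}

		// walk the cause chain until a match is found or the chain ends
		Throwable t = throwable;
		while (t != null) {
			if (searchType.isAssignableFrom(t.getClass())) {
				return true;
			} else {
				t = t.getCause();
			}
		}

		return false;
	}
}
```

A cause-chain search like this is what a retry-on-rejection failure handler (such as the `RetryRejectedExecutionFailureHandler` added in this commit) needs, since the interesting exception may arrive wrapped inside others.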

http://git-wip-us.apache.org/repos/asf/flink/blob/2437da6e/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 219bf2a..d4a031c 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -325,6 +325,16 @@ public final class InstantiationUtil {
                oos.writeObject(o);
        }
 
+       public static boolean isSerializable(Object o) {
+               try {
+                       serializeObject(o);
+               } catch (IOException e) {
+                       return false;
+               }
+
+               return true;
+       }
+
        /**
         * Clones the given serializable object using Java serialization.
         *

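The new `isSerializable` utility defines serializability operationally: an object is serializable exactly when Java serialization of it does not throw. A standalone sketch of the same check (the real method delegates to `InstantiationUtil.serializeObject`; the stream names here are illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

// Standalone sketch of the isSerializable check added above.
public class IsSerializableDemo {

	public static boolean isSerializable(Object o) {
		try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
			// a NotSerializableException (a subclass of IOException) means
			// the object cannot be serialized
			oos.writeObject(o);
		} catch (IOException e) {
			return false;
		}

		return true;
	}
}
```

A check like this lets the sink fail fast at construction time when a user-provided `ActionRequestFailureHandler` is not serializable, instead of failing later at task deployment.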