Repository: flink
Updated Branches:
  refs/heads/master ffaf10d22 -> 7206b0ed2


[FLINK-4027] Flush FlinkKafkaProducer on checkpoints

This closes #2108

This closes #2058 because it is an invalid pull request.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7206b0ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7206b0ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7206b0ed

Branch: refs/heads/master
Commit: 7206b0ed2adb10c94e1ffd3dbe851250b44edcf4
Parents: ffaf10d
Author: Robert Metzger <rmetz...@apache.org>
Authored: Wed Jun 15 17:50:38 2016 +0200
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Mon Jul 4 11:55:09 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer08.java  |  18 ++
 .../kafka/KafkaTestEnvironmentImpl.java         |   4 +-
 .../connectors/kafka/FlinkKafkaProducer09.java  |   7 +
 .../kafka/KafkaTestEnvironmentImpl.java         |   4 +-
 .../kafka/FlinkKafkaProducerBase.java           | 120 ++++++++--
 .../kafka/AtLeastOnceProducerTest.java          | 218 +++++++++++++++++++
 .../connectors/kafka/KafkaConsumerTestBase.java |   3 +-
 7 files changed, 358 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 4975f9a..e509d2f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -125,4 +125,22 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
+	@Override
+	protected void flush() {
+		// The Kafka 0.8 producer doesn't support flushing, so we wait here
+		// until all pending records are confirmed.
+		//noinspection SynchronizeOnNonFinalField
+		synchronized (pendingRecordsLock) {
+			while (pendingRecords > 0) {
+				try {
+					pendingRecordsLock.wait();
+				} catch (InterruptedException e) {
+					// this can be interrupted when the Task has been cancelled.
+					// By throwing an exception, we ensure that this checkpoint doesn't get confirmed.
+					throw new RuntimeException("Flushing got interrupted while checkpointing", e);
+				}
+			}
+		}
+	}
+
 }
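Editor's note: the 0.8 override above is a hand-rolled countdown monitor, since the 0.8 client has no flush() call. Below is a minimal standalone sketch of the same wait/notify pattern; the class and method names (PendingTracker, recordSent, recordAcked, awaitAllAcked) are illustrative only and not part of this commit.

public class PendingTracker {

	private final Object lock = new Object();
	private long pendingRecords = 0;

	/** Called on the producing thread for every record handed to the client. */
	public void recordSent() {
		synchronized (lock) {
			pendingRecords++;
		}
	}

	/** Called from the producer's async callback once a record is confirmed. */
	public void recordAcked() {
		synchronized (lock) {
			if (--pendingRecords == 0) {
				lock.notifyAll();
			}
		}
	}

	/** Blocks until every sent record has been acknowledged. */
	public void awaitAllAcked() throws InterruptedException {
		synchronized (lock) {
			while (pendingRecords > 0) {
				lock.wait();
			}
		}
	}
}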

http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 77d41ac..75ca9ed 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -97,7 +97,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	@Override
 	public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
-		return new FlinkKafkaProducer08<T>(topic, serSchema, props, partitioner);
+		FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<T>(topic, serSchema, props, partitioner);
+		prod.setFlushOnCheckpoint(true);
+		return prod;
 	}
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 6f7f687..eb3440a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -127,4 +127,11 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
+
+	@Override
+	protected void flush() {
+		if (this.producer != null) {
+			producer.flush();
+		}
+	}
 }
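Editor's note: the 0.9 override can stay this small because, unlike the 0.8 client, the 0.9+ producer API exposes KafkaProducer#flush(), which blocks until all previously sent records have completed. A minimal sketch of that call in isolation; the broker address and topic name are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class FlushExample {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
		props.setProperty("key.serializer", ByteArraySerializer.class.getName());
		props.setProperty("value.serializer", ByteArraySerializer.class.getName());

		try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
			producer.send(new ProducerRecord<byte[], byte[]>("some-topic", "payload".getBytes()));
			// flush() blocks until every record sent so far has completed,
			// which is why the 0.9 connector can delegate to it directly.
			producer.flush();
		}
	}
}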

http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index b5df6e0..0dbe865 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -92,7 +92,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	@Override
	public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
-		return new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
+		FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
+		prod.setFlushOnCheckpoint(true);
+		return prod;
 	}
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 6005dff..a9d4917 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -20,7 +20,9 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -28,6 +30,7 @@ import org.apache.flink.util.NetUtils;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -39,6 +42,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -50,11 +54,14 @@ import static java.util.Objects.requireNonNull;
 /**
  * Flink Sink to produce data into a Kafka topic.
  *
- * Please note that this producer does not have any reliability guarantees.
+ * Please note that this producer provides at-least-once reliability guarantees
+ * when checkpoints are enabled and setFlushOnCheckpoint(true) is set.
+ * Otherwise, the producer doesn't provide any reliability guarantees.
  *
  * @param <IN> Type of the messages to write into Kafka.
  */
-public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> {
+@SuppressWarnings("SynchronizeOnNonFinalField")
+public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements Checkpointed<Serializable> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
 
@@ -101,6 +108,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> {
 	 * Flag indicating whether to accept failures (and log them), or to fail on failures
 	 */
 	protected boolean logFailuresOnly;
+
+	/**
+	 * If true, the producer will wait until all outstanding records have been sent to the broker.
+	 */
+	private boolean flushOnCheckpoint = false;
 	
 	// -------------------------------- Runtime fields ------------------------------------------
 
@@ -113,6 +125,16 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> {
 	/** Errors encountered in the async producer are stored here */
 	protected transient volatile Exception asyncException;
 
+	/**
+	 * Number of unacknowledged records.
+	 */
+	protected long pendingRecords = 0;
+
+	/**
+	 * Lock for accessing the pending records.
+	 */
+	protected transient Object pendingRecordsLock;
+
 
        /**
         * The main constructor for creating a FlinkKafkaProducer.
@@ -150,7 +172,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> {
 
 		// create a local KafkaProducer to get the list of partitions.
 		// this will also ensure locally that all required ProducerConfig values are set.
-		try (KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig)) {
+		try (Producer<Void, IN> getPartitionsProd = getKafkaProducer(this.producerConfig)) {
 			List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(defaultTopicId);
 
 			this.partitions = new int[partitionsList.size()];
@@ -178,6 +200,24 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> {
 		this.logFailuresOnly = logFailuresOnly;
 	}
 
+	/**
+	 * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
+	 * to be acknowledged by the Kafka producer on a checkpoint.
+	 * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
+	 *
+	 * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
+	 */
+	public void setFlushOnCheckpoint(boolean flush) {
+		this.flushOnCheckpoint = flush;
+	}
+
+	/**
+	 * Used for testing only.
+	 */
+	protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
+		return new KafkaProducer<>(props);
+	}
+
 	// ----------------------------------- Utilities --------------------------
        
        /**
@@ -185,10 +225,10 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> {
 	 */
 	@Override
 	public void open(Configuration configuration) {
-		producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig);
+		producer = getKafkaProducer(this.producerConfig);
 
 		RuntimeContext ctx = getRuntimeContext();
-		if(partitioner != null) {
+		if (partitioner != null) {
 			partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
 		}
 
@@ -196,32 +236,40 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> {
 				ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), defaultTopicId);
 
 		// register Kafka metrics to Flink accumulators
-		if(!Boolean.valueOf(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
+		if (!Boolean.valueOf(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
 			Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
 
-			if(metrics == null) {
+			if (metrics == null) {
 				// MapR's Kafka implementation returns null here.
 				LOG.info("Producer implementation does not support metrics");
 			} else {
-				for(Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
+				for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
 					String name = producerId + "-producer-" + metric.getKey().name();
 					DefaultKafkaMetricAccumulator kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor(metric.getValue());
 					// best effort: we only add the accumulator if available.
-					if(kafkaAccumulator != null) {
+					if (kafkaAccumulator != null) {
 						getRuntimeContext().addAccumulator(name, kafkaAccumulator);
 					}
 				}
 			}
 		}
 
+		if (flushOnCheckpoint && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
+			LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
+			flushOnCheckpoint = false;
+		}
+		if (flushOnCheckpoint) {
+			pendingRecordsLock = new Object();
+		}
+
 		if (logFailuresOnly) {
 			callback = new Callback() {
-
 				@Override
 				public void onCompletion(RecordMetadata metadata, Exception e) {
 					if (e != null) {
 						LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
 					}
+					acknowledgeMessage();
 				}
 			};
 		}
@@ -232,6 +280,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> {
 					if (exception != null && asyncException == null) {
 						asyncException = exception;
 					}
+					acknowledgeMessage();
 				}
 			};
 		}
@@ -251,17 +300,21 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> {
 		byte[] serializedKey = schema.serializeKey(next);
 		byte[] serializedValue = schema.serializeValue(next);
 		String targetTopic = schema.getTargetTopic(next);
-		if(targetTopic == null) {
+		if (targetTopic == null) {
 			targetTopic = defaultTopicId;
 		}
 
 		ProducerRecord<byte[], byte[]> record;
-		if(partitioner == null) {
+		if (partitioner == null) {
 			record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
 		} else {
 			record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);
 		}
-
+		if (flushOnCheckpoint) {
+			synchronized (pendingRecordsLock) {
+				pendingRecords++;
+			}
+		}
 		producer.send(record, callback);
 	}
 
@@ -276,6 +329,47 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> {
 		checkErroneous();
 	}
 
+	// ------------------- Logic for handling checkpoint flushing -------------------------- //
+
+	private void acknowledgeMessage() {
+		if (!flushOnCheckpoint) {
+			// the logic is disabled
+			return;
+		}
+		synchronized (pendingRecordsLock) {
+			pendingRecords--;
+			if (pendingRecords == 0) {
+				pendingRecordsLock.notifyAll();
+			}
+		}
+	}
+
+	/**
+	 * Flush pending records.
+	 */
+	protected abstract void flush();
+
+	@Override
+	public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
+		if (flushOnCheckpoint) {
+			// flushing is activated: we need to wait until pendingRecords is 0
+			flush();
+			synchronized (pendingRecordsLock) {
+				if (pendingRecords != 0) {
+					throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords);
+				}
+				// pending records count is 0. We can now confirm the checkpoint.
+			}
+		}
+		// return empty state
+		return null;
+	}
+
+	@Override
+	public void restoreState(Serializable state) {
+		// nothing to do here
+	}
+
 
 	// ----------------------------------- Utilities --------------------------
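Editor's note: taken together, the base-class changes make at-least-once an opt-in feature. A minimal sketch of a job that enables it follows; the topic name, broker address, and checkpoint interval are placeholders, not taken from this commit.

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class AtLeastOnceSinkJob {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// flushing only takes effect with checkpointing enabled; otherwise
		// open() logs a warning and disables it (see the diff above)
		env.enableCheckpointing(5000);

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

		FlinkKafkaProducer09<String> producer =
				new FlinkKafkaProducer09<>("some-topic", new SimpleStringSchema(), props);
		producer.setFlushOnCheckpoint(true); // opt in to at-least-once

		env.fromElements("a", "b", "c").addSink(producer);
		env.execute("at-least-once sink example");
	}
}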
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
new file mode 100644
index 0000000..b02593c
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Test ensuring that the producer is not dropping buffered records
+ */
+@SuppressWarnings("unchecked")
+public class AtLeastOnceProducerTest {
+
+	// we set a timeout because the test will not finish if the logic is broken
+       @Test(timeout=5000)
+       public void testAtLeastOnceProducer() throws Throwable {
+               runTest(true);
+       }
+
+	// This test ensures that the actual test fails if flushing is disabled
+       @Test(expected = AssertionError.class, timeout=5000)
+       public void ensureTestFails() throws Throwable {
+               runTest(false);
+       }
+
+	private void runTest(boolean flushOnCheckpoint) throws Throwable {
+		Properties props = new Properties();
+		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
+		final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic",
+				new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, snapshottingFinished);
+		producer.setFlushOnCheckpoint(flushOnCheckpoint);
+		producer.setRuntimeContext(new MockRuntimeContext(0, 1));
+
+               producer.open(new Configuration());
+
+               for (int i = 0; i < 100; i++) {
+                       producer.invoke("msg-" + i);
+               }
+               // start a thread confirming all pending records
+               final Tuple1<Throwable> runnableError = new Tuple1<>(null);
+               final Thread threadA = Thread.currentThread();
+
+		Runnable confirmer = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					MockProducer mp = producer.getProducerInstance();
+					List<Callback> pending = mp.getPending();
+
+					// we need to find out if the snapshot() method blocks forever.
+					// Since that is not directly testable, we wait for a while and
+					// then verify that no records have been confirmed in the meantime.
+					synchronized (threadA) {
+						threadA.wait(500L);
+					}
+					// we now check that no records have been confirmed yet
+					Assert.assertEquals(100, pending.size());
+					Assert.assertFalse("Snapshot method returned before all records were confirmed",
+							snapshottingFinished.get());
+
+					// now confirm all pending records
+					for (Callback c: pending) {
+						c.onCompletion(null, null);
+					}
+					pending.clear();
+				} catch (Throwable t) {
+					runnableError.f0 = t;
+				}
+			}
+		};
+		Thread threadB = new Thread(confirmer);
+		threadB.start();
+		// this should block:
+		producer.snapshotState(0, 0);
+		synchronized (threadA) {
+			threadA.notifyAll(); // just in case, to let the test fail faster
+		}
+		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
+		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
+		while (deadline.hasTimeLeft() && threadB.isAlive()) {
+			threadB.join(500);
+		}
+		Assert.assertFalse("Thread B is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
+		if (runnableError.f0 != null) {
+			throw runnableError.f0;
+		}
+
+		producer.close();
+	}
+
+
+	private static class TestingKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
+		private MockProducer prod;
+		private AtomicBoolean snapshottingFinished;
+
+		public TestingKafkaProducer(String defaultTopicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, AtomicBoolean snapshottingFinished) {
+			super(defaultTopicId, serializationSchema, producerConfig, null);
+			this.snapshottingFinished = snapshottingFinished;
+		}
+
+		@Override
+		protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
+			this.prod = new MockProducer();
+			return this.prod;
+		}
+
+		@Override
+		public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
+			// call the actual snapshot state
+			Serializable ret = super.snapshotState(checkpointId, checkpointTimestamp);
+			// notify the test that snapshotting has been done
+			snapshottingFinished.set(true);
+			return ret;
+		}
+
+		@Override
+		protected void flush() {
+			this.prod.flush();
+		}
+
+		public MockProducer getProducerInstance() {
+			return this.prod;
+		}
+	}
+
+	private static class MockProducer<K, V> extends KafkaProducer<K, V> {
+		List<Callback> pendingCallbacks = new ArrayList<>();
+
+		private static Properties getFakeProperties() {
+			Properties p = new Properties();
+			p.setProperty("bootstrap.servers", "localhost:12345");
+			p.setProperty("key.serializer", ByteArraySerializer.class.getName());
+			p.setProperty("value.serializer", ByteArraySerializer.class.getName());
+			return p;
+		}
+
+		public MockProducer() {
+			super(getFakeProperties());
+		}
+
+		@Override
+		public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
+			throw new UnsupportedOperationException("Unexpected");
+		}
+
+		@Override
+		public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+			pendingCallbacks.add(callback);
+			return null;
+		}
+
+		@Override
+		public List<PartitionInfo> partitionsFor(String topic) {
+			List<PartitionInfo> list = new ArrayList<>();
+			list.add(new PartitionInfo(topic, 0, null, null, null));
+			return list;
+		}
+
+		@Override
+		public Map<MetricName, ? extends Metric> metrics() {
+			return null;
+		}
+
+		public List<Callback> getPending() {
+			return this.pendingCallbacks;
+		}
+
+		public void flush() {
+			while (pendingCallbacks.size() > 0) {
+				try {
+					Thread.sleep(10);
+				} catch (InterruptedException e) {
+					throw new RuntimeException("Unable to flush producer, task was interrupted", e);
+				}
+			}
+		}
+	}
+}
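Editor's note: the heart of the test above is asserting that snapshotState() stays blocked until the pending callbacks fire. Below is a generic sketch of that "assert a call stays blocked" pattern; the class and helper names (BlockingCheck, staysBlocked) are hypothetical and not part of this commit.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BlockingCheck {

	/**
	 * Returns true if 'action' has not returned after 'millis' ms -- a crude
	 * but common way to check that a call does not complete prematurely.
	 */
	static boolean staysBlocked(final Runnable action, long millis) throws InterruptedException {
		final CountDownLatch done = new CountDownLatch(1);
		Thread t = new Thread(new Runnable() {
			@Override
			public void run() {
				action.run();
				done.countDown();
			}
		});
		t.setDaemon(true); // don't keep the JVM alive if the call never returns
		t.start();
		return !done.await(millis, TimeUnit.MILLISECONDS);
	}
}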

http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 220f061..d49b48b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -268,7 +268,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		});
 		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 		producerProperties.setProperty("retries", "3");
-		stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null));
+		FlinkKafkaProducerBase<Tuple2<Long, String>> prod = kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null);
+		stream.addSink(prod);
 
                // ----------- add consumer dataflow ----------
 
