Repository: flink
Updated Branches:
  refs/heads/master a42eafb40 -> 1f9c0cf85


[FLINK-4155] [kafka] Move partition list fetching to open() for Kafka producers

The partition list fetched from Kafka in open() is sorted by partition id
so that subtasks will have the same list across failures. To compensate for
the original use of the KafkaProducer instantiation in the constructor to
eagerly ensure that required producer configs are provided, we now check
that at least the bootstrap servers are set.

This change also includes refactoring of AtLeastOnceProducerTest for a more
complete suite of tests on FlinkKafkaProducerBase.

This closes #2681.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f9c0cf8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f9c0cf8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f9c0cf8

Branch: refs/heads/master
Commit: 1f9c0cf8522481c1a1007d98d90b30baff5c18ca
Parents: a42eafb
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Fri Oct 21 14:23:58 2016 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Nov 22 23:50:25 2016 +0800

----------------------------------------------------------------------
 .../kafka/Kafka08JsonTableSinkTest.java         |   4 +-
 .../connectors/kafka/KafkaProducerTest.java     |  11 +-
 .../kafka/Kafka09JsonTableSinkTest.java         |   4 +-
 .../connectors/kafka/KafkaProducerTest.java     |   9 +-
 .../kafka/FlinkKafkaProducerBase.java           |  42 +--
 .../kafka/AtLeastOnceProducerTest.java          | 231 ---------------
 .../kafka/FlinkKafkaProducerBaseTest.java       | 288 +++++++++++++++++++
 .../kafka/KafkaTableSinkTestBase.java           |  16 +-
 .../testutils/FakeStandardProducerConfig.java   |  34 +++
 9 files changed, 374 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 446e1d7..6d0b140 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -41,8 +41,8 @@ public class Kafka08JsonTableSinkTest extends 
KafkaTableSinkTestBase {
 
        @Override
        @SuppressWarnings("unchecked")
-       protected Class<SerializationSchema<Row>> getSerializationSchema() {
-               return (Class) JsonRowSerializationSchema.class;
+       protected SerializationSchema<Row> getSerializationSchema() {
+               return new JsonRowSerializationSchema(FIELD_NAMES);
        }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 7efa94e..91fc286 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import 
org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.TestLogger;
 
@@ -37,8 +38,7 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.Arrays;
-import java.util.Properties;
+import java.util.Collections;
 import java.util.concurrent.Future;
 
 
@@ -60,7 +60,8 @@ public class KafkaProducerTest extends TestLogger {
                        
                        // partition setup
                        
when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
-                                       Arrays.asList(new 
PartitionInfo("mock_topic", 42, null, null, null)));
+                               // returning a unmodifiable list to mimic 
KafkaProducer#partitionsFor() behaviour
+                               Collections.singletonList(new 
PartitionInfo("mock_topic", 42, null, null, null)));
 
                        // failure when trying to send an element
                        when(kafkaProducerMock.send(any(ProducerRecord.class), 
any(Callback.class)))
@@ -79,7 +80,7 @@ public class KafkaProducerTest extends TestLogger {
                        // (1) producer that propagates errors
 
                        FlinkKafkaProducer08<String> producerPropagating = new 
FlinkKafkaProducer08<>(
-                                       "mock_topic", new SimpleStringSchema(), 
new Properties(), null);
+                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), null);
 
                        OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
                                        new 
OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating));
@@ -102,7 +103,7 @@ public class KafkaProducerTest extends TestLogger {
                        // (2) producer that only logs errors
 
                        FlinkKafkaProducer08<String> producerLogging = new 
FlinkKafkaProducer08<>(
-                                       "mock_topic", new SimpleStringSchema(), 
new Properties(), null);
+                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), null);
                        producerLogging.setLogFailuresOnly(true);
 
                        testHarness = new 
OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index 068640d..45f70ac 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -41,8 +41,8 @@ public class Kafka09JsonTableSinkTest extends 
KafkaTableSinkTestBase {
 
        @Override
        @SuppressWarnings("unchecked")
-       protected Class<SerializationSchema<Row>> getSerializationSchema() {
-               return (Class) JsonRowSerializationSchema.class;
+       protected SerializationSchema<Row> getSerializationSchema() {
+               return new JsonRowSerializationSchema(FIELD_NAMES);
        }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 31691d5..18b2aec 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import 
org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.TestLogger;
 
@@ -40,7 +41,6 @@ import 
org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Collections;
-import java.util.Properties;
 import java.util.concurrent.Future;
 
 import static org.junit.Assert.assertNotNull;
@@ -65,7 +65,8 @@ public class KafkaProducerTest extends TestLogger {
                        
                        // partition setup
                        
when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
-                                       Collections.singletonList(new 
PartitionInfo("mock_topic", 42, null, null, null)));
+                               // returning a unmodifiable list to mimic 
KafkaProducer#partitionsFor() behaviour
+                               Collections.singletonList(new 
PartitionInfo("mock_topic", 42, null, null, null)));
 
                        // failure when trying to send an element
                        when(kafkaProducerMock.send(any(ProducerRecord.class), 
any(Callback.class)))
@@ -84,7 +85,7 @@ public class KafkaProducerTest extends TestLogger {
                        // (1) producer that propagates errors
 
                        FlinkKafkaProducer09<String> producerPropagating = new 
FlinkKafkaProducer09<>(
-                                       "mock_topic", new SimpleStringSchema(), 
new Properties(), null);
+                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), null);
 
                        OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
                                        new 
OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));
@@ -105,7 +106,7 @@ public class KafkaProducerTest extends TestLogger {
                        // (2) producer that only logs errors
 
                        FlinkKafkaProducer09<String> producerLogging = new 
FlinkKafkaProducer09<>(
-                                       "mock_topic", new SimpleStringSchema(), 
new Properties(), null);
+                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), null);
                        producerLogging.setLogFailuresOnly(true);
 
                        testHarness = new 
OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index bede064..33289f8 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -34,7 +34,6 @@ import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -46,8 +45,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.ArrayList;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Collections;
+import java.util.Comparator;
 
 import static java.util.Objects.requireNonNull;
 
@@ -76,7 +78,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
         * Array with the partition ids of the given defaultTopicId
         * The size of this array is the number of partitions
         */
-       protected final int[] partitions;
+       protected int[] partitions;
 
        /**
         * User defined properties for the Producer
@@ -148,30 +150,22 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
                this.schema = serializationSchema;
                this.producerConfig = producerConfig;
 
-               // set the producer configuration properties.
-               if 
(!producerConfig.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+               // set the producer configuration properties for kafka record 
key value serializers.
+               if 
(!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
                        
this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getCanonicalName());
                } else {
                        LOG.warn("Overwriting the '{}' is not recommended", 
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                }
 
-               if 
(!producerConfig.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+               if 
(!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
                        
this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getCanonicalName());
                } else {
                        LOG.warn("Overwriting the '{}' is not recommended", 
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                }
 
-
-               // create a local KafkaProducer to get the list of partitions.
-               // this will also ensure locally that all required 
ProducerConfig values are set.
-               try (Producer<Void, IN> getPartitionsProd = 
getKafkaProducer(this.producerConfig)) {
-                       List<PartitionInfo> partitionsList = 
getPartitionsProd.partitionsFor(defaultTopicId);
-
-                       this.partitions = new int[partitionsList.size()];
-                       for (int i = 0; i < partitions.length; i++) {
-                               partitions[i] = 
partitionsList.get(i).partition();
-                       }
-                       getPartitionsProd.close();
+               // eagerly ensure that bootstrap servers are set.
+               if 
(!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+                       throw new 
IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be 
supplied in the producer config properties.");
                }
 
                this.partitioner = customPartitioner;
@@ -218,6 +212,22 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
        public void open(Configuration configuration) {
                producer = getKafkaProducer(this.producerConfig);
 
+               // the fetched list is immutable, so we're creating a mutable 
copy in order to sort it
+               List<PartitionInfo> partitionsList = new 
ArrayList<>(producer.partitionsFor(defaultTopicId));
+
+               // sort the partitions by partition id to make sure the fetched 
partition list is the same across subtasks
+               Collections.sort(partitionsList, new 
Comparator<PartitionInfo>() {
+                       @Override
+                       public int compare(PartitionInfo o1, PartitionInfo o2) {
+                               return Integer.compare(o1.partition(), 
o2.partition());
+                       }
+               });
+
+               partitions = new int[partitionsList.size()];
+               for (int i = 0; i < partitions.length; i++) {
+                       partitions[i] = partitionsList.get(i).partition();
+               }
+
                RuntimeContext ctx = getRuntimeContext();
                if (partitioner != null) {
                        partitioner.open(ctx.getIndexOfThisSubtask(), 
ctx.getNumberOfParallelSubtasks(), partitions);

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
deleted file mode 100644
index 6d92f9b..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.Assert;
-import org.junit.Test;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Test ensuring that the producer is not dropping buffered records
- */
-@SuppressWarnings("unchecked")
-public class AtLeastOnceProducerTest {
-
-       // we set a timeout because the test will not finish if the logic is 
broken
-       @Test(timeout=5000)
-       public void testAtLeastOnceProducer() throws Throwable {
-               runTest(true);
-       }
-
-       // This test ensures that the actual test fails if the flushing is 
disabled
-       @Test(expected = AssertionError.class, timeout=5000)
-       public void ensureTestFails() throws Throwable {
-               runTest(false);
-       }
-
-       private void runTest(boolean flushOnCheckpoint) throws Throwable {
-               Properties props = new Properties();
-               final AtomicBoolean snapshottingFinished = new 
AtomicBoolean(false);
-
-               final TestingKafkaProducer<String> producer = new 
TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()), props,
-                               snapshottingFinished);
-
-               producer.setFlushOnCheckpoint(flushOnCheckpoint);
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness =
-                               new OneInputStreamOperatorTestHarness<>(new 
StreamSink(producer));
-
-               testHarness.open();
-
-               for (int i = 0; i < 100; i++) {
-                       testHarness.processElement(new StreamRecord<>("msg-" + 
i));
-               }
-
-               // start a thread confirming all pending records
-               final Tuple1<Throwable> runnableError = new Tuple1<>(null);
-               final Thread threadA = Thread.currentThread();
-
-               Runnable confirmer = new Runnable() {
-                       @Override
-                       public void run() {
-                               try {
-                                       MockProducer mp = 
producer.getProducerInstance();
-                                       List<Callback> pending = 
mp.getPending();
-
-                                       // we need to find out if the 
snapshot() method blocks forever
-                                       // this is not possible. If snapshot() 
is running, it will
-                                       // start removing elements from the 
pending list.
-                                       synchronized (threadA) {
-                                               threadA.wait(500L);
-                                       }
-                                       // we now check that no records have 
been confirmed yet
-                                       Assert.assertEquals(100, 
pending.size());
-                                       Assert.assertFalse("Snapshot method 
returned before all records were confirmed",
-                                                       
snapshottingFinished.get());
-
-                                       // now confirm all checkpoints
-                                       for (Callback c: pending) {
-                                               c.onCompletion(null, null);
-                                       }
-                                       pending.clear();
-                               } catch(Throwable t) {
-                                       runnableError.f0 = t;
-                               }
-                       }
-               };
-               Thread threadB = new Thread(confirmer);
-               threadB.start();
-
-               // this should block:
-               testHarness.snapshot(0, 0);
-
-               synchronized (threadA) {
-                       threadA.notifyAll(); // just in case, to let the test 
fail faster
-               }
-               Assert.assertEquals(0, 
producer.getProducerInstance().getPending().size());
-               Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
-               while (deadline.hasTimeLeft() && threadB.isAlive()) {
-                       threadB.join(500);
-               }
-               Assert.assertFalse("Thread A is expected to be finished at this 
point. If not, the test is prone to fail", threadB.isAlive());
-               if (runnableError.f0 != null) {
-                       throw runnableError.f0;
-               }
-
-               testHarness.close();
-       }
-
-
-       private static class TestingKafkaProducer<T> extends 
FlinkKafkaProducerBase<T> {
-               private static final long serialVersionUID = 1L;
-
-               private transient MockProducer prod;
-               private AtomicBoolean snapshottingFinished;
-
-               public TestingKafkaProducer(String defaultTopicId, 
KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, 
AtomicBoolean snapshottingFinished) {
-                       super(defaultTopicId, serializationSchema, 
producerConfig, null);
-                       this.snapshottingFinished = snapshottingFinished;
-               }
-
-               @Override
-               protected <K, V> KafkaProducer<K, V> 
getKafkaProducer(Properties props) {
-                       this.prod = new MockProducer();
-                       return this.prod;
-               }
-
-               @Override
-               public void snapshotState(FunctionSnapshotContext ctx) throws 
Exception {
-                       // call the actual snapshot state
-                       super.snapshotState(ctx);
-                       // notify test that snapshotting has been done
-                       snapshottingFinished.set(true);
-               }
-
-               @Override
-               protected void flush() {
-                       this.prod.flush();
-               }
-
-               public MockProducer getProducerInstance() {
-                       return this.prod;
-               }
-       }
-
-       private static class MockProducer<K, V> extends KafkaProducer<K, V> {
-               List<Callback> pendingCallbacks = new ArrayList<>();
-
-               private static Properties getFakeProperties() {
-                       Properties p = new Properties();
-                       p.setProperty("bootstrap.servers", "localhost:12345");
-                       p.setProperty("key.serializer", 
ByteArraySerializer.class.getName());
-                       p.setProperty("value.serializer", 
ByteArraySerializer.class.getName());
-                       return p;
-               }
-               public MockProducer() {
-                       super(getFakeProperties());
-               }
-
-               @Override
-               public Future<RecordMetadata> send(ProducerRecord<K, V> record) 
{
-                       throw new UnsupportedOperationException("Unexpected");
-               }
-
-               @Override
-               public Future<RecordMetadata> send(ProducerRecord<K, V> record, 
Callback callback) {
-                       pendingCallbacks.add(callback);
-                       return null;
-               }
-
-               @Override
-               public List<PartitionInfo> partitionsFor(String topic) {
-                       List<PartitionInfo> list = new ArrayList<>();
-                       list.add(new PartitionInfo(topic, 0, null, null, null));
-                       return list;
-               }
-
-               @Override
-               public Map<MetricName, ? extends Metric> metrics() {
-                       return null;
-               }
-
-
-               public List<Callback> getPending() {
-                       return this.pendingCallbacks;
-               }
-
-               public void flush() {
-                       while (pendingCallbacks.size() > 0) {
-                               try {
-                                       Thread.sleep(10);
-                               } catch (InterruptedException e) {
-                                       throw new RuntimeException("Unable to 
flush producer, task was interrupted");
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
new file mode 100644
index 0000000..2e06160
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class FlinkKafkaProducerBaseTest {
+
+       /**
+        * Tests that the constructor eagerly checks bootstrap servers are set 
in config
+        */
+       @Test(expected = IllegalArgumentException.class)
+       public void testInstantiationFailsWhenBootstrapServersMissing() throws 
Exception {
+               // no bootstrap servers set in props
+               Properties props = new Properties();
+               // should throw IllegalArgumentException
+               new DummyFlinkKafkaProducer<>(props, null);
+       }
+
+       /**
+        * Tests that the constructor defaults the key and value serializers in the
+        * config to byte array serializers if they are not set.
+        *
+        * <p>Only the bootstrap servers are provided; after instantiation both
+        * serializer entries must be present and point to {@link ByteArraySerializer}.
+        */
+       @Test
+       public void testKeyValueDeserializersSetIfMissing() throws Exception {
+               Properties props = new Properties();
+               props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345");
+               // should set missing key value serializers
+               new DummyFlinkKafkaProducer<>(props, null);
+
+               assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+               assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+               assertTrue(props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName()));
+               // check the value serializer as well (the key serializer was previously asserted twice)
+               assertTrue(props.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName()));
+       }
+
+       /**
+        * Tests that the partition list is deterministic and correctly provided to the custom partitioner
+        */
+       @Test
+       public void testPartitionerOpenedWithDeterminatePartitionList() throws 
Exception {
+               KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
+               RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
+               when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
+               
when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
+
+               DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
+                       FakeStandardProducerConfig.get(), mockPartitioner);
+               producer.setRuntimeContext(mockRuntimeContext);
+
+               producer.open(new Configuration());
+
+               // the internal mock KafkaProducer will return an out-of-order 
list of 4 partitions,
+               // which should be sorted before provided to the custom 
partitioner's open() method
+               int[] correctPartitionList = {0, 1, 2, 3};
+               verify(mockPartitioner).open(0, 1, correctPartitionList);
+       }
+
+       /**
+        * Test ensuring that the producer is not dropping buffered records;
+        * we set a timeout because the test will not finish if the logic is broken
+        */
+       @Test(timeout=5000)
+       public void testAtLeastOnceProducer() throws Throwable {
+               runAtLeastOnceTest(true);
+       }
+
+       /**
+        * Ensures that the at least once producing test fails if the flushing 
is disabled
+        */
+       @Test(expected = AssertionError.class, timeout=5000)
+       public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws 
Throwable {
+               runAtLeastOnceTest(false);
+       }
+
+       /**
+        * Pushes 100 records through a sink wrapping the dummy producer and then takes a
+        * snapshot, while a second thread (thread B) checks that no record has been confirmed
+        * and that snapshotting has not finished before it confirms all pending callbacks.
+        *
+        * @param flushOnCheckpoint value passed to {@code setFlushOnCheckpoint} on the producer
+        * @throws Throwable any error raised by the harness or captured in the confirmer thread
+        */
+       private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
+               final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
+               final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+                       FakeStandardProducerConfig.get(), null, snapshottingFinished);
+               producer.setFlushOnCheckpoint(flushOnCheckpoint);
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness =
+                               new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
+
+               testHarness.open();
+
+               for (int i = 0; i < 100; i++) {
+                       testHarness.processElement(new StreamRecord<>("msg-" + i));
+               }
+
+               // start a thread confirming all pending records
+               final Tuple1<Throwable> runnableError = new Tuple1<>(null);
+               final Thread threadA = Thread.currentThread();
+
+               Runnable confirmer = new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       MockProducer mp = producer.getProducerInstance();
+                                       List<Callback> pending = mp.getPending();
+
+                                       // We cannot observe directly whether snapshot() is blocking.
+                                       // Instead we wait for a while and then verify below that the
+                                       // pending list is untouched and snapshotting has not finished.
+                                       synchronized (threadA) {
+                                               threadA.wait(500L);
+                                       }
+                                       // we now check that no records have been confirmed yet
+                                       Assert.assertEquals(100, pending.size());
+                                       Assert.assertFalse("Snapshot method returned before all records were confirmed",
+                                               snapshottingFinished.get());
+
+                                       // now confirm all pending records
+                                       for (Callback c: pending) {
+                                               c.onCompletion(null, null);
+                                       }
+                                       pending.clear();
+                               } catch(Throwable t) {
+                                       // hand the failure over to the test thread, which rethrows it
+                                       runnableError.f0 = t;
+                               }
+                       }
+               };
+               Thread threadB = new Thread(confirmer);
+               threadB.start();
+
+               // this should block:
+               testHarness.snapshot(0, 0);
+
+               synchronized (threadA) {
+                       threadA.notifyAll(); // just in case, to let the test fail faster
+               }
+               Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
+               Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
+               while (deadline.hasTimeLeft() && threadB.isAlive()) {
+                       threadB.join(500);
+               }
+               // the thread being checked is the confirmer thread (thread B), not the test thread
+               Assert.assertFalse("Thread B is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
+               if (runnableError.f0 != null) {
+                       throw runnableError.f0;
+               }
+
+               testHarness.close();
+       }
+
+
+       // 
------------------------------------------------------------------------
+
+       private static class DummyFlinkKafkaProducer<T> extends 
FlinkKafkaProducerBase<T> {
+               private static final long serialVersionUID = 1L;
+
+               private transient MockProducer prod;
+               private AtomicBoolean snapshottingFinished;
+
+               @SuppressWarnings("unchecked")
+               public DummyFlinkKafkaProducer(Properties producerConfig, 
KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) {
+                       super("dummy-topic", (KeyedSerializationSchema< T >) 
mock(KeyedSerializationSchema.class), producerConfig, partitioner);
+                       this.snapshottingFinished = snapshottingFinished;
+               }
+
+               // constructor variant for tests unrelated to snapshotting
+               @SuppressWarnings("unchecked")
+               public DummyFlinkKafkaProducer(Properties producerConfig, 
KafkaPartitioner partitioner) {
+                       super("dummy-topic", (KeyedSerializationSchema< T >) 
mock(KeyedSerializationSchema.class), producerConfig, partitioner);
+                       this.snapshottingFinished = new AtomicBoolean(true);
+               }
+
+               @Override
+               protected <K, V> KafkaProducer<K, V> 
getKafkaProducer(Properties props) {
+                       this.prod = new MockProducer();
+                       return this.prod;
+               }
+
+               @Override
+               public void snapshotState(FunctionSnapshotContext ctx) throws 
Exception {
+                       // call the actual snapshot state
+                       super.snapshotState(ctx);
+                       // notify test that snapshotting has been done
+                       snapshottingFinished.set(true);
+               }
+
+               @Override
+               protected void flush() {
+                       this.prod.flush();
+               }
+
+               public MockProducer getProducerInstance() {
+                       return this.prod;
+               }
+       }
+
+       private static class MockProducer<K, V> extends KafkaProducer<K, V> {
+               List<Callback> pendingCallbacks = new ArrayList<>();
+
+               public MockProducer() {
+                       super(FakeStandardProducerConfig.get());
+               }
+
+               @Override
+               public Future<RecordMetadata> send(ProducerRecord<K, V> record) 
{
+                       throw new UnsupportedOperationException("Unexpected");
+               }
+
+               @Override
+               public Future<RecordMetadata> send(ProducerRecord<K, V> record, 
Callback callback) {
+                       pendingCallbacks.add(callback);
+                       return null;
+               }
+
+               @Override
+               public List<PartitionInfo> partitionsFor(String topic) {
+                       List<PartitionInfo> list = new ArrayList<>();
+                       // deliberately return an out-of-order partition list
+                       list.add(new PartitionInfo(topic, 3, null, null, null));
+                       list.add(new PartitionInfo(topic, 1, null, null, null));
+                       list.add(new PartitionInfo(topic, 0, null, null, null));
+                       list.add(new PartitionInfo(topic, 2, null, null, null));
+                       return list;
+               }
+
+               @Override
+               public Map<MetricName, ? extends Metric> metrics() {
+                       return null;
+               }
+
+
+               public List<Callback> getPending() {
+                       return this.pendingCallbacks;
+               }
+
+               /**
+                * Blocks, polling every 10 ms, until the list of pending callbacks is empty.
+                *
+                * @throws RuntimeException if the waiting thread is interrupted; the interrupt
+                *         flag is restored and the original exception is attached as the cause
+                */
+               public void flush() {
+                       while (pendingCallbacks.size() > 0) {
+                               try {
+                                       Thread.sleep(10);
+                               } catch (InterruptedException e) {
+                                       // restore the interrupt status so callers can still observe it
+                                       Thread.currentThread().interrupt();
+                                       throw new RuntimeException("Unable to flush producer, task was interrupted", e);
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index baddab1..ae0af52 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.table.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.junit.Test;
 
@@ -41,18 +42,23 @@ import static org.mockito.Mockito.verify;
 public abstract class KafkaTableSinkTestBase {
 
        private static final String TOPIC = "testTopic";
-       private static final String[] FIELD_NAMES = new String[] {"field1", 
"field2"};
+       protected static final String[] FIELD_NAMES = new String[] {"field1", 
"field2"};
        private static final TypeInformation[] FIELD_TYPES = 
TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
        private static final KafkaPartitioner<Row> PARTITIONER = new 
CustomPartitioner();
        private static final Properties PROPERTIES = createSinkProperties();
-       // we have to mock FlinkKafkaProducerBase as it cannot be instantiated 
without Kafka
        @SuppressWarnings("unchecked")
-       private static final FlinkKafkaProducerBase<Row> PRODUCER = 
mock(FlinkKafkaProducerBase.class);
+       private final FlinkKafkaProducerBase<Row> PRODUCER = new 
FlinkKafkaProducerBase<Row>(
+               TOPIC, new 
KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, 
PARTITIONER) {
+
+               @Override
+               protected void flush() {}
+       };
 
        @Test
        @SuppressWarnings("unchecked")
        public void testKafkaTableSink() throws Exception {
                DataStream dataStream = mock(DataStream.class);
+
                KafkaTableSink kafkaTableSink = spy(createTableSink());
                kafkaTableSink.emitDataStream(dataStream);
 
@@ -61,7 +67,7 @@ public abstract class KafkaTableSinkTestBase {
                verify(kafkaTableSink).createKafkaProducer(
                        eq(TOPIC),
                        eq(PROPERTIES),
-                       any(getSerializationSchema()),
+                       any(getSerializationSchema().getClass()),
                        eq(PARTITIONER));
        }
 
@@ -79,7 +85,7 @@ public abstract class KafkaTableSinkTestBase {
        protected abstract KafkaTableSink createTableSink(String topic, 
Properties properties,
                        KafkaPartitioner<Row> partitioner, 
FlinkKafkaProducerBase<Row> kafkaProducer);
 
-       protected abstract Class<SerializationSchema<Row>> 
getSerializationSchema();
+       protected abstract SerializationSchema<Row> getSerializationSchema();
 
        private KafkaTableSink createTableSink() {
                return createTableSink(TOPIC, PROPERTIES, PARTITIONER, 
PRODUCER);

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9c0cf8/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
new file mode 100644
index 0000000..055326d
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.util.Properties;
+
+public class FakeStandardProducerConfig {
+
+       public static Properties get() {
+               Properties p = new Properties();
+               p.setProperty("bootstrap.servers", "localhost:12345");
+               p.setProperty("key.serializer", 
ByteArraySerializer.class.getName());
+               p.setProperty("value.serializer", 
ByteArraySerializer.class.getName());
+               return p;
+       }
+
+}

Reply via email to