http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
new file mode 100644
index 0000000..2e06160
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class FlinkKafkaProducerBaseTest {
+
+       /**
+        * Tests that the constructor eagerly checks bootstrap servers are set 
in config
+        */
+       @Test(expected = IllegalArgumentException.class)
+       public void testInstantiationFailsWhenBootstrapServersMissing() throws Exception {
+               // an empty configuration, i.e. one without any bootstrap servers
+               final Properties configWithoutServers = new Properties();
+               // constructing the producer from it must fail with an IllegalArgumentException
+               new DummyFlinkKafkaProducer<>(configWithoutServers, null);
+       }
+
+       /**
+        * Tests that the constructor defaults the key and value serializers in the config to the byte array serializer if they are not set.
+        */
+       @Test
+       public void testKeyValueDeserializersSetIfMissing() throws Exception {
+               Properties props = new Properties();
+               props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345");
+               // the constructor should add the missing key and value serializers to props
+               new DummyFlinkKafkaProducer<>(props, null);
+
+               assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+               assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+               Assert.assertEquals(ByteArraySerializer.class.getCanonicalName(), props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+               Assert.assertEquals(ByteArraySerializer.class.getCanonicalName(), props.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+       }
+
+       /**
+        * Tests that partitions list is determinate and correctly provided to 
custom partitioner
+        */
+       @Test
+       public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
+               KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
+               RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
+               when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0); // first argument expected by open() below
+               when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1); // second argument expected by open() below
+
+               DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+                       FakeStandardProducerConfig.get(), mockPartitioner);
+               producer.setRuntimeContext(mockRuntimeContext);
+
+               producer.open(new Configuration());
+
+               // the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
+               // which should be sorted before it is provided to the custom partitioner's open() method
+               int[] correctPartitionList = {0, 1, 2, 3};
+               verify(mockPartitioner).open(0, 1, correctPartitionList);
+       }
+
+       /**
+        * Test ensuring that the producer is not dropping buffered records.
+        * A timeout is set because the test will not finish if the logic is broken.
+        */
+       @Test(timeout=5000)
+       public void testAtLeastOnceProducer() throws Throwable {
+               runAtLeastOnceTest(true);
+       }
+
+       /**
+        * Ensures that the at-least-once producing test fails with an AssertionError if flushing on checkpoint is disabled.
+        */
+       @Test(expected = AssertionError.class, timeout=5000)
+       public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws 
Throwable {
+               runAtLeastOnceTest(false);
+       }
+
+       /**
+        * Feeds 100 records to the producer, then snapshots while a second thread confirms them; the snapshot must not return before all records are confirmed.
+        * @param flushOnCheckpoint value passed to the producer's setFlushOnCheckpoint()
+        */
+       private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
+               final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
+               final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(FakeStandardProducerConfig.get(), null, snapshottingFinished);
+               producer.setFlushOnCheckpoint(flushOnCheckpoint);
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+               testHarness.open();
+
+               for (int i = 0; i < 100; i++) {
+                       testHarness.processElement(new StreamRecord<>("msg-" + i));
+               }
+
+               // start a thread confirming all pending records
+               final Tuple1<Throwable> runnableError = new Tuple1<>(null);
+               final Thread threadA = Thread.currentThread();
+
+               Runnable confirmer = new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       MockProducer mp = producer.getProducerInstance();
+                                       List<Callback> pending = mp.getPending();
+                                       // We cannot directly detect whether snapshot() would block forever.
+                                       // Instead, wait for a while (or until notified by the main thread) and then verify
+                                       // below that snapshot() has not finished and that no record has been confirmed.
+                                       synchronized (threadA) {
+                                               threadA.wait(500L);
+                                       }
+                                       // we now check that no records have been confirmed yet
+                                       Assert.assertEquals(100, pending.size());
+                                       Assert.assertFalse("Snapshot method returned before all records were confirmed", snapshottingFinished.get());
+
+                                       // now confirm all pending records
+                                       for (Callback c: pending) {
+                                               c.onCompletion(null, null);
+                                       }
+                                       pending.clear();
+                               } catch(Throwable t) {
+                                       runnableError.f0 = t;
+                               }
+                       }
+               };
+               Thread threadB = new Thread(confirmer);
+               threadB.start();
+
+               // with flushing enabled, this should block until thread B has confirmed all pending records:
+               testHarness.snapshot(0, 0);
+
+               synchronized (threadA) {
+                       threadA.notifyAll(); // just in case, to let the test fail faster
+               }
+               Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
+               Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
+               while (deadline.hasTimeLeft() && threadB.isAlive()) {
+                       threadB.join(500);
+               }
+               Assert.assertFalse("Thread B is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
+               if (runnableError.f0 != null) {
+                       throw runnableError.f0;
+               }
+
+               testHarness.close();
+       }
+
+
+       // 
------------------------------------------------------------------------
+
+       private static class DummyFlinkKafkaProducer<T> extends 
FlinkKafkaProducerBase<T> {
+               private static final long serialVersionUID = 1L;
+
+               private transient MockProducer prod;
+               private AtomicBoolean snapshottingFinished;
+
+               @SuppressWarnings("unchecked")
+               public DummyFlinkKafkaProducer(Properties producerConfig, 
KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) {
+                       super("dummy-topic", (KeyedSerializationSchema< T >) 
mock(KeyedSerializationSchema.class), producerConfig, partitioner);
+                       this.snapshottingFinished = snapshottingFinished;
+               }
+
+               // constructor variant for test irrelated to snapshotting
+               @SuppressWarnings("unchecked")
+               public DummyFlinkKafkaProducer(Properties producerConfig, 
KafkaPartitioner partitioner) {
+                       super("dummy-topic", (KeyedSerializationSchema< T >) 
mock(KeyedSerializationSchema.class), producerConfig, partitioner);
+                       this.snapshottingFinished = new AtomicBoolean(true);
+               }
+
+               @Override
+               protected <K, V> KafkaProducer<K, V> 
getKafkaProducer(Properties props) {
+                       this.prod = new MockProducer();
+                       return this.prod;
+               }
+
+               @Override
+               public void snapshotState(FunctionSnapshotContext ctx) throws 
Exception {
+                       // call the actual snapshot state
+                       super.snapshotState(ctx);
+                       // notify test that snapshotting has been done
+                       snapshottingFinished.set(true);
+               }
+
+               @Override
+               protected void flush() {
+                       this.prod.flush();
+               }
+
+               public MockProducer getProducerInstance() {
+                       return this.prod;
+               }
+       }
+
+       /** KafkaProducer stand-in: send(record, callback) only stores the callback, and flush() polls until the stored callbacks are gone. */
+       private static class MockProducer<K, V> extends KafkaProducer<K, V> {
+               List<Callback> pendingCallbacks = new ArrayList<>(); // NOTE(review): not thread-safe, yet the test also reads and clears it from a second thread
+
+               public MockProducer() {
+                       super(FakeStandardProducerConfig.get());
+               }
+
+               @Override
+               public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
+                       throw new UnsupportedOperationException("Unexpected");
+               }
+
+               @Override
+               public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+                       pendingCallbacks.add(callback);
+                       return null;
+               }
+
+               @Override
+               public List<PartitionInfo> partitionsFor(String topic) {
+                       List<PartitionInfo> list = new ArrayList<>(); // deliberately filled with an out-of-order partition list
+                       list.add(new PartitionInfo(topic, 3, null, null, null));
+                       list.add(new PartitionInfo(topic, 1, null, null, null));
+                       list.add(new PartitionInfo(topic, 0, null, null, null));
+                       list.add(new PartitionInfo(topic, 2, null, null, null));
+                       return list;
+               }
+
+               @Override
+               public Map<MetricName, ? extends Metric> metrics() {
+                       return null;
+               }
+
+               public List<Callback> getPending() {
+                       return this.pendingCallbacks;
+               }
+
+               public void flush() {
+                       while (pendingCallbacks.size() > 0) { // poll every 10 ms until the pending list has been emptied
+                               try {
+                                       Thread.sleep(10);
+                               } catch (InterruptedException e) {
+                                       Thread.currentThread().interrupt(); // restore the interrupt status for the caller
+                                       throw new RuntimeException("Unable to flush producer, task was interrupted", e);
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
new file mode 100644
index 0000000..1882a7e
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class JSONDeserializationSchemaTest {
+       @Test
+       public void testDeserialize() throws IOException {
+               // build a JSON object with an int field "key" and a text field "value", then serialize it
+               final ObjectMapper jackson = new ObjectMapper();
+               final ObjectNode original = jackson.createObjectNode();
+               original.put("key", 4);
+               original.put("value", "world");
+               final byte[] jsonBytes = jackson.writeValueAsBytes(original);
+               // feed the bytes to the schema under test and check that both fields come back
+               final ObjectNode roundTripped = new JSONDeserializationSchema().deserialize(jsonBytes);
+               Assert.assertEquals(4, roundTripped.get("key").asInt());
+               Assert.assertEquals("world", roundTripped.get("value").asText());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
new file mode 100644
index 0000000..86d3105
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import 
org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class JSONKeyValueDeserializationSchemaTest {
+       @Test
+       public void testDeserializeWithoutMetadata() throws IOException {
+               ObjectMapper mapper = new ObjectMapper();
+               ObjectNode initialKey = mapper.createObjectNode();
+               initialKey.put("index", 4);
+               byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+
+               ObjectNode initialValue = mapper.createObjectNode();
+               initialValue.put("word", "world");
+               byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+
+               JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
+               ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0);
+
+               // the schema was created with 'false', so the result is expected to carry no "metadata" node
+               Assert.assertNull(deserializedValue.get("metadata"));
+               Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt());
+               Assert.assertEquals("world", deserializedValue.get("value").get("word").asText());
+       }
+
+       @Test
+       public void testDeserializeWithMetadata() throws IOException {
+               ObjectMapper mapper = new ObjectMapper();
+               ObjectNode initialKey = mapper.createObjectNode();
+               initialKey.put("index", 4);
+               byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+
+               ObjectNode initialValue = mapper.createObjectNode();
+               initialValue.put("word", "world");
+               byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+
+               JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(true);
+               ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "topic#1", 3, 4);
+               // created with 'true': the topic, partition (3) and offset (4) passed above must show up under "metadata"
+               Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt());
+               Assert.assertEquals("world", deserializedValue.get("value").get("word").asText());
+               Assert.assertEquals("topic#1", deserializedValue.get("metadata").get("topic").asText());
+               Assert.assertEquals(4, deserializedValue.get("metadata").get("offset").asInt());
+               Assert.assertEquals(3, deserializedValue.get("metadata").get("partition").asInt());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
new file mode 100644
index 0000000..68225e2
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.table.Row;
+import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class JsonRowDeserializationSchemaTest {
+
+       /**
+        * Deserializes a JSON object with a long, a string and a byte[] field and checks every field of the resulting row.
+        */
+       @Test
+       public void testDeserialization() throws Exception {
+               final long expectedId = 1238123899121L;
+               final String expectedName = "asdlkjasjkdla998y1122";
+               final byte[] expectedBytes = new byte[1024];
+               ThreadLocalRandom.current().nextBytes(expectedBytes);
+
+               final ObjectMapper mapper = new ObjectMapper();
+
+               // the JSON document that is fed to the schema
+               final ObjectNode json = mapper.createObjectNode();
+               json.put("id", expectedId);
+               json.put("name", expectedName);
+               json.put("bytes", expectedBytes);
+
+               final byte[] payload = mapper.writeValueAsBytes(json);
+
+               final JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema(
+                               new String[] { "id", "name", "bytes" },
+                               new Class<?>[] { Long.class, String.class, byte[].class });
+
+               final Row result = schema.deserialize(payload);
+
+               assertEquals(3, result.productArity());
+               assertEquals(expectedId, result.productElement(0));
+               assertEquals(expectedName, result.productElement(1));
+               assertArrayEquals(expectedBytes, (byte[]) result.productElement(2));
+       }
+
+       /**
+        * Deserializes JSON that lacks the requested field, first leniently and then with failOnMissingField enabled.
+        */
+       @Test
+       public void testMissingNode() throws Exception {
+               final ObjectMapper mapper = new ObjectMapper();
+
+               // a JSON document that only carries an "id" field
+               final ObjectNode json = mapper.createObjectNode();
+               json.put("id", 123123123);
+               final byte[] payload = mapper.writeValueAsBytes(json);
+
+               final JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema(
+                               new String[] { "name" },
+                               new Class<?>[] { String.class });
+
+               final Row lenientResult = schema.deserialize(payload);
+
+               assertEquals(1, lenientResult.productArity());
+               assertNull("Missing field not null", lenientResult.productElement(0));
+
+               schema.setFailOnMissingField(true);
+
+               try {
+                       schema.deserialize(payload);
+                       fail("Did not throw expected Exception");
+               } catch (IOException expected) {
+                       assertTrue(expected.getCause() instanceof IllegalStateException);
+               }
+       }
+
+       /**
+        * Checks that the constructor rejects field name and field type arrays of different lengths.
+        */
+       @Test
+       public void testNumberOfFieldNamesAndTypesMismatch() throws Exception {
+               try {
+                       new JsonRowDeserializationSchema(
+                                       new String[] { "one", "two", "three" },
+                                       new Class<?>[] { Long.class });
+                       fail("Did not throw expected Exception");
+               } catch (IllegalArgumentException expected) {
+                       // expected: three names, but only one type
+               }
+
+               try {
+                       new JsonRowDeserializationSchema(
+                                       new String[] { "one" },
+                                       new Class<?>[] { Long.class, String.class });
+                       fail("Did not throw expected Exception");
+               } catch (IllegalArgumentException expected) {
+                       // expected: one name, but two types
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
new file mode 100644
index 0000000..92af15d
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class JsonRowSerializationSchemaTest {
+       @Test
+       public void testRowSerialization() throws IOException {
+               String[] fieldNames = new String[] {"f1", "f2", "f3"};
+               Class<?>[] fieldTypes = new Class<?>[] {Integer.class, Boolean.class, String.class};
+               Row row = new Row(3);
+               row.setField(0, 1);
+               row.setField(1, true);
+               row.setField(2, "str");
+
+               Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, row);
+               assertEqualRows(row, resultRow);
+       }
+
+       @Test
+       public void testSerializationOfTwoRows() throws IOException {
+               String[] fieldNames = new String[] {"f1", "f2", "f3"};
+               Class<?>[] fieldTypes = new Class<?>[] {Integer.class, Boolean.class, String.class};
+               Row row1 = new Row(3);
+               row1.setField(0, 1);
+               row1.setField(1, true);
+               row1.setField(2, "str");
+
+               JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
+               JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes);
+
+               byte[] bytes = serializationSchema.serialize(row1);
+               assertEqualRows(row1, deserializationSchema.deserialize(bytes));
+               // the very same schema instances are reused for a second, different row
+               Row row2 = new Row(3);
+               row2.setField(0, 10);
+               row2.setField(1, false);
+               row2.setField(2, "newStr");
+
+               bytes = serializationSchema.serialize(row2);
+               assertEqualRows(row2, deserializationSchema.deserialize(bytes));
+       }
+
+       @Test(expected = NullPointerException.class)
+       public void testInputValidation() {
+               new JsonRowSerializationSchema(null);
+       }
+
+       @Test(expected = IllegalStateException.class)
+       public void testSerializeRowWithInvalidNumberOfFields() {
+               String[] fieldNames = new String[] {"f1", "f2", "f3"};
+               Row row = new Row(1);
+               row.setField(0, 1);
+               // the row has a single field although three field names are configured
+               JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
+               serializationSchema.serialize(row);
+       }
+
+       /** Serializes the row with a fresh serialization schema and reads it back with a fresh deserialization schema. */
+       private Row serializeAndDeserialize(String[] fieldNames, Class<?>[] fieldTypes, Row row) throws IOException {
+               JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
+               JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes);
+               byte[] bytes = serializationSchema.serialize(row);
+               return deserializationSchema.deserialize(bytes);
+       }
+
+       /** Asserts that both rows have the same arity and equal elements at every position. */
+       private void assertEqualRows(Row expectedRow, Row resultRow) {
+               assertEquals("Deserialized row should have expected number of fields",
+                       expectedRow.productArity(), resultRow.productArity());
+               for (int i = 0; i < expectedRow.productArity(); i++) {
+                       assertEquals(String.format("Field number %d should be as in the original row", i),
+                               expectedRow.productElement(i), resultRow.productElement(i));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
new file mode 100644
index 0000000..9beed22
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that the partition assignment is deterministic and stable.
+ */
+public class KafkaConsumerPartitionAssignmentTest {
+
+       @Test
+       public void testPartitionsEqualConsumers() {
+               try {
+                       List<KafkaTopicPartition> inPartitions = Arrays.asList(
+                                       new KafkaTopicPartition("test-topic", 
4),
+                                       new KafkaTopicPartition("test-topic", 
52),
+                                       new KafkaTopicPartition("test-topic", 
17),
+                                       new KafkaTopicPartition("test-topic", 
1));
+
+                       for (int i = 0; i < inPartitions.size(); i++) {
+                               List<KafkaTopicPartition> parts = 
+                                               
FlinkKafkaConsumerBase.assignPartitions(inPartitions, inPartitions.size(), i);
+
+                               assertNotNull(parts);
+                               assertEquals(1, parts.size());
+                               assertTrue(contains(inPartitions, 
parts.get(0).getPartition()));
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       private boolean contains(List<KafkaTopicPartition> inPartitions, int 
partition) {
+               for (KafkaTopicPartition ktp : inPartitions) {
+                       if (ktp.getPartition() == partition) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       @Test
+       public void testMultiplePartitionsPerConsumers() {
+               try {
+                       final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 
31, 127, 14};
+
+                       final List<KafkaTopicPartition> partitions = new 
ArrayList<>();
+                       final Set<KafkaTopicPartition> allPartitions = new 
HashSet<>();
+
+                       for (int p : partitionIDs) {
+                               KafkaTopicPartition part = new 
KafkaTopicPartition("test-topic", p);
+                               partitions.add(part);
+                               allPartitions.add(part);
+                       }
+
+                       final int numConsumers = 3;
+                       final int minPartitionsPerConsumer = partitions.size() 
/ numConsumers;
+                       final int maxPartitionsPerConsumer = partitions.size() 
/ numConsumers + 1;
+
+                       for (int i = 0; i < numConsumers; i++) {
+                               List<KafkaTopicPartition> parts = 
+                                               
FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
+
+                               assertNotNull(parts);
+                               assertTrue(parts.size() >= 
minPartitionsPerConsumer);
+                               assertTrue(parts.size() <= 
maxPartitionsPerConsumer);
+
+                               for (KafkaTopicPartition p : parts) {
+                                       // check that the element was actually 
contained
+                                       assertTrue(allPartitions.remove(p));
+                               }
+                       }
+
+                       // all partitions must have been assigned
+                       assertTrue(allPartitions.isEmpty());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testPartitionsFewerThanConsumers() {
+               try {
+                       List<KafkaTopicPartition> inPartitions = Arrays.asList(
+                                       new KafkaTopicPartition("test-topic", 
4),
+                                       new KafkaTopicPartition("test-topic", 
52),
+                                       new KafkaTopicPartition("test-topic", 
17),
+                                       new KafkaTopicPartition("test-topic", 
1));
+
+                       final Set<KafkaTopicPartition> allPartitions = new 
HashSet<>();
+                       allPartitions.addAll(inPartitions);
+
+                       final int numConsumers = 2 * inPartitions.size() + 3;
+
+                       for (int i = 0; i < numConsumers; i++) {
+                               List<KafkaTopicPartition> parts = 
FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
+
+                               assertNotNull(parts);
+                               assertTrue(parts.size() <= 1);
+
+                               for (KafkaTopicPartition p : parts) {
+                                       // check that the element was actually 
contained
+                                       assertTrue(allPartitions.remove(p));
+                               }
+                       }
+
+                       // all partitions must have been assigned
+                       assertTrue(allPartitions.isEmpty());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testAssignEmptyPartitions() {
+               try {
+                       List<KafkaTopicPartition> ep = new ArrayList<>();
+                       List<KafkaTopicPartition> parts1 = 
FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
+                       assertNotNull(parts1);
+                       assertTrue(parts1.isEmpty());
+
+                       List<KafkaTopicPartition> parts2 = 
FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
+                       assertNotNull(parts2);
+                       assertTrue(parts2.isEmpty());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testGrowingPartitionsRemainsStable() {
+               try {
+                       final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 
42, 31, 127, 14};
+                       List<KafkaTopicPartition> newPartitions = new 
ArrayList<>();
+
+                       for (int p : newPartitionIDs) {
+                               KafkaTopicPartition part = new 
KafkaTopicPartition("test-topic", p);
+                               newPartitions.add(part);
+                       }
+
+                       List<KafkaTopicPartition> initialPartitions = 
newPartitions.subList(0, 7);
+
+                       final Set<KafkaTopicPartition> allNewPartitions = new 
HashSet<>(newPartitions);
+                       final Set<KafkaTopicPartition> allInitialPartitions = 
new HashSet<>(initialPartitions);
+
+                       final int numConsumers = 3;
+                       final int minInitialPartitionsPerConsumer = 
initialPartitions.size() / numConsumers;
+                       final int maxInitialPartitionsPerConsumer = 
initialPartitions.size() / numConsumers + 1;
+                       final int minNewPartitionsPerConsumer = 
newPartitions.size() / numConsumers;
+                       final int maxNewPartitionsPerConsumer = 
newPartitions.size() / numConsumers + 1;
+
+                       List<KafkaTopicPartition> parts1 = 
FlinkKafkaConsumerBase.assignPartitions(
+                                       initialPartitions, numConsumers, 0);
+                       List<KafkaTopicPartition> parts2 = 
FlinkKafkaConsumerBase.assignPartitions(
+                                       initialPartitions, numConsumers, 1);
+                       List<KafkaTopicPartition> parts3 = 
FlinkKafkaConsumerBase.assignPartitions(
+                                       initialPartitions, numConsumers, 2);
+
+                       assertNotNull(parts1);
+                       assertNotNull(parts2);
+                       assertNotNull(parts3);
+
+                       assertTrue(parts1.size() >= 
minInitialPartitionsPerConsumer);
+                       assertTrue(parts1.size() <= 
maxInitialPartitionsPerConsumer);
+                       assertTrue(parts2.size() >= 
minInitialPartitionsPerConsumer);
+                       assertTrue(parts2.size() <= 
maxInitialPartitionsPerConsumer);
+                       assertTrue(parts3.size() >= 
minInitialPartitionsPerConsumer);
+                       assertTrue(parts3.size() <= 
maxInitialPartitionsPerConsumer);
+
+                       for (KafkaTopicPartition p : parts1) {
+                               // check that the element was actually contained
+                               assertTrue(allInitialPartitions.remove(p));
+                       }
+                       for (KafkaTopicPartition p : parts2) {
+                               // check that the element was actually contained
+                               assertTrue(allInitialPartitions.remove(p));
+                       }
+                       for (KafkaTopicPartition p : parts3) {
+                               // check that the element was actually contained
+                               assertTrue(allInitialPartitions.remove(p));
+                       }
+
+                       // all partitions must have been assigned
+                       assertTrue(allInitialPartitions.isEmpty());
+
+                       // grow the set of partitions and distribute anew
+
+                       List<KafkaTopicPartition> parts1new = 
FlinkKafkaConsumerBase.assignPartitions(
+                                       newPartitions, numConsumers, 0);
+                       List<KafkaTopicPartition> parts2new = 
FlinkKafkaConsumerBase.assignPartitions(
+                                       newPartitions, numConsumers, 1);
+                       List<KafkaTopicPartition> parts3new = 
FlinkKafkaConsumerBase.assignPartitions(
+                                       newPartitions, numConsumers, 2);
+
+                       // new partitions must include all old partitions
+
+                       assertTrue(parts1new.size() > parts1.size());
+                       assertTrue(parts2new.size() > parts2.size());
+                       assertTrue(parts3new.size() > parts3.size());
+
+                       assertTrue(parts1new.containsAll(parts1));
+                       assertTrue(parts2new.containsAll(parts2));
+                       assertTrue(parts3new.containsAll(parts3));
+
+                       assertTrue(parts1new.size() >= 
minNewPartitionsPerConsumer);
+                       assertTrue(parts1new.size() <= 
maxNewPartitionsPerConsumer);
+                       assertTrue(parts2new.size() >= 
minNewPartitionsPerConsumer);
+                       assertTrue(parts2new.size() <= 
maxNewPartitionsPerConsumer);
+                       assertTrue(parts3new.size() >= 
minNewPartitionsPerConsumer);
+                       assertTrue(parts3new.size() <= 
maxNewPartitionsPerConsumer);
+
+                       for (KafkaTopicPartition p : parts1new) {
+                               // check that the element was actually contained
+                               assertTrue(allNewPartitions.remove(p));
+                       }
+                       for (KafkaTopicPartition p : parts2new) {
+                               // check that the element was actually contained
+                               assertTrue(allNewPartitions.remove(p));
+                       }
+                       for (KafkaTopicPartition p : parts3new) {
+                               // check that the element was actually contained
+                               assertTrue(allNewPartitions.remove(p));
+                       }
+
+                       // all partitions must have been assigned
+                       assertTrue(allNewPartitions.isEmpty());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+}

Reply via email to