http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
deleted file mode 100644
index 51bc8d1..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-
-import java.io.IOException;
-
-/**
- * A serialization and deserialization schema for Key Value Pairs that uses 
Flink's serialization stack to
- * transform typed from and to byte arrays.
- * 
- * @param <K> The key type to be serialized.
- * @param <V> The value type to be serialized.
- */
-public class TypeInformationKeyValueSerializationSchema<K, V> implements 
KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> 
{
-
-       private static final long serialVersionUID = -5359448468131559102L;
-
-       /** The serializer for the key */
-       private final TypeSerializer<K> keySerializer;
-
-       /** The serializer for the value */
-       private final TypeSerializer<V> valueSerializer;
-
-       /** reusable input deserialization buffer */
-       private final DataInputDeserializer inputDeserializer;
-       
-       /** reusable output serialization buffer for the key */
-       private transient DataOutputSerializer keyOutputSerializer;
-
-       /** reusable output serialization buffer for the value */
-       private transient DataOutputSerializer valueOutputSerializer;
-       
-       
-       /** The type information, to be returned by {@link #getProducedType()}. 
It is
-        * transient, because it is not serializable. Note that this means that 
the type information
-        * is not available at runtime, but only prior to the first 
serialization / deserialization */
-       private final transient TypeInformation<Tuple2<K, V>> typeInfo;
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a new de-/serialization schema for the given types.
-        *
-        * @param keyTypeInfo The type information for the key type 
de-/serialized by this schema.
-        * @param valueTypeInfo The type information for the value type 
de-/serialized by this schema.
-        * @param ec The execution config, which is used to parametrize the 
type serializers.
-        */
-       public TypeInformationKeyValueSerializationSchema(TypeInformation<K> 
keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) {
-               this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
-               this.keySerializer = keyTypeInfo.createSerializer(ec);
-               this.valueSerializer = valueTypeInfo.createSerializer(ec);
-               this.inputDeserializer = new DataInputDeserializer();
-       }
-
-       /**
-        * Creates a new de-/serialization schema for the given types. This 
constructor accepts the types
-        * as classes and internally constructs the type information from the 
classes.
-        * 
-        * <p>If the types are parametrized and cannot be fully defined via 
classes, use the constructor
-        * that accepts {@link TypeInformation} instead.
-        * 
-        * @param keyClass The class of the key de-/serialized by this schema.
-        * @param valueClass The class of the value de-/serialized by this 
schema.
-        * @param config The execution config, which is used to parametrize the 
type serializers.
-        */
-       public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, 
Class<V> valueClass, ExecutionConfig config) {
-               this(TypeExtractor.createTypeInfo(keyClass), 
TypeExtractor.createTypeInfo(valueClass), config);
-       }
-
-       // 
------------------------------------------------------------------------
-
-
-       @Override
-       public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, 
String topic, int partition, long offset) throws IOException {
-               K key = null;
-               V value = null;
-               
-               if (messageKey != null) {
-                       inputDeserializer.setBuffer(messageKey, 0, 
messageKey.length);
-                       key = keySerializer.deserialize(inputDeserializer);
-               }
-               if (message != null) {
-                       inputDeserializer.setBuffer(message, 0, message.length);
-                       value = valueSerializer.deserialize(inputDeserializer);
-               }
-               return new Tuple2<>(key, value);
-       }
-
-       /**
-        * This schema never considers an element to signal end-of-stream, so 
this method returns always false.
-        * @param nextElement The element to test for the end-of-stream signal.
-        * @return Returns false.
-        */
-       @Override
-       public boolean isEndOfStream(Tuple2<K,V> nextElement) {
-               return false;
-       }
-
-
-       @Override
-       public byte[] serializeKey(Tuple2<K, V> element) {
-               if (element.f0 == null) {
-                       return null;
-               } else {
-                       // key is not null. serialize it:
-                       if (keyOutputSerializer == null) {
-                               keyOutputSerializer = new 
DataOutputSerializer(16);
-                       }
-                       try {
-                               keySerializer.serialize(element.f0, 
keyOutputSerializer);
-                       }
-                       catch (IOException e) {
-                               throw new RuntimeException("Unable to serialize 
record", e);
-                       }
-                       // check if key byte array size changed
-                       byte[] res = keyOutputSerializer.getByteArray();
-                       if (res.length != keyOutputSerializer.length()) {
-                               byte[] n = new 
byte[keyOutputSerializer.length()];
-                               System.arraycopy(res, 0, n, 0, 
keyOutputSerializer.length());
-                               res = n;
-                       }
-                       keyOutputSerializer.clear();
-                       return res;
-               }
-       }
-
-       @Override
-       public byte[] serializeValue(Tuple2<K, V> element) {
-               // if the value is null, its serialized value is null as well.
-               if (element.f1 == null) {
-                       return null;
-               }
-
-               if (valueOutputSerializer == null) {
-                       valueOutputSerializer = new DataOutputSerializer(16);
-               }
-
-               try {
-                       valueSerializer.serialize(element.f1, 
valueOutputSerializer);
-               }
-               catch (IOException e) {
-                       throw new RuntimeException("Unable to serialize 
record", e);
-               }
-
-               byte[] res = valueOutputSerializer.getByteArray();
-               if (res.length != valueOutputSerializer.length()) {
-                       byte[] n = new byte[valueOutputSerializer.length()];
-                       System.arraycopy(res, 0, n, 0, 
valueOutputSerializer.length());
-                       res = n;
-               }
-               valueOutputSerializer.clear();
-               return res;
-       }
-
-       @Override
-       public String getTargetTopic(Tuple2<K, V> element) {
-               return null; // we are never overriding the topic
-       }
-
-
-       @Override
-       public TypeInformation<Tuple2<K,V>> getProducedType() {
-               if (typeInfo != null) {
-                       return typeInfo;
-               }
-               else {
-                       throw new IllegalStateException(
-                                       "The type information is not available 
after this class has been serialized and distributed.");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
deleted file mode 100644
index b96ba30..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.commons.collections.map.LinkedMap;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class FlinkKafkaConsumerBaseTest {
-
-       /**
-        * Tests that not both types of timestamp extractors / watermark 
generators can be used.
-        */
-       @Test
-       public void testEitherWatermarkExtractor() {
-               try {
-                       new 
DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>)
 null);
-                       fail();
-               } catch (NullPointerException ignored) {}
-
-               try {
-                       new 
DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>)
 null);
-                       fail();
-               } catch (NullPointerException ignored) {}
-               
-               @SuppressWarnings("unchecked")
-               final AssignerWithPeriodicWatermarks<String> periodicAssigner = 
mock(AssignerWithPeriodicWatermarks.class);
-               @SuppressWarnings("unchecked")
-               final AssignerWithPunctuatedWatermarks<String> 
punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
-               
-               DummyFlinkKafkaConsumer<String> c1 = new 
DummyFlinkKafkaConsumer<>();
-               c1.assignTimestampsAndWatermarks(periodicAssigner);
-               try {
-                       c1.assignTimestampsAndWatermarks(punctuatedAssigner);
-                       fail();
-               } catch (IllegalStateException ignored) {}
-
-               DummyFlinkKafkaConsumer<String> c2 = new 
DummyFlinkKafkaConsumer<>();
-               c2.assignTimestampsAndWatermarks(punctuatedAssigner);
-               try {
-                       c2.assignTimestampsAndWatermarks(periodicAssigner);
-                       fail();
-               } catch (IllegalStateException ignored) {}
-       }
-
-       /**
-        * Tests that no checkpoints happen when the fetcher is not running.
-        */
-       @Test
-       public void ignoreCheckpointWhenNotRunning() throws Exception {
-               @SuppressWarnings("unchecked")
-               final AbstractFetcher<String, ?> fetcher = 
mock(AbstractFetcher.class);
-
-               FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, 
new LinkedMap(), false);
-               OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
-               TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = 
new TestingListState<>();
-               
when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
-
-               consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(1, 1));
-
-               assertFalse(listState.get().iterator().hasNext());
-               consumer.notifyCheckpointComplete(66L);
-       }
-
-       /**
-        * Tests that no checkpoints happen when the fetcher is not running.
-        */
-       @Test
-       public void checkRestoredCheckpointWhenFetcherNotReady() throws 
Exception {
-               OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
-
-               TestingListState<Serializable> listState = new 
TestingListState<>();
-               listState.add(Tuple2.of(new KafkaTopicPartition("abc", 13), 
16768L));
-               listState.add(Tuple2.of(new KafkaTopicPartition("def", 7), 
987654321L));
-
-               FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new 
LinkedMap(), true);
-
-               
when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
-
-               StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
-
-               
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
-               when(initializationContext.isRestored()).thenReturn(true);
-
-               consumer.initializeState(initializationContext);
-
-               consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(17, 17));
-
-               // ensure that the list was cleared and refilled. while this is 
an implementation detail, we use it here
-               // to figure out that snapshotState() actually did something.
-               Assert.assertTrue(listState.isClearCalled());
-
-               Set<Serializable> expected = new HashSet<>();
-
-               for (Serializable serializable : listState.get()) {
-                       expected.add(serializable);
-               }
-
-               int counter = 0;
-
-               for (Serializable serializable : listState.get()) {
-                       assertTrue(expected.contains(serializable));
-                       counter++;
-               }
-
-               assertEquals(expected.size(), counter);
-       }
-
-       /**
-        * Tests that no checkpoints happen when the fetcher is not running.
-        */
-       @Test
-       public void checkRestoredNullCheckpointWhenFetcherNotReady() throws 
Exception {
-               FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new 
LinkedMap(), true);
-
-               OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
-               TestingListState<Serializable> listState = new 
TestingListState<>();
-               
when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
-
-               StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
-
-               
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
-               when(initializationContext.isRestored()).thenReturn(false);
-
-               consumer.initializeState(initializationContext);
-
-               consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(17, 17));
-
-               assertFalse(listState.get().iterator().hasNext());
-       }
-
-       /**
-        * Tests that on snapshots, states and offsets to commit to Kafka are 
correct
-        */
-       @Test
-       public void checkUseFetcherWhenNoCheckpoint() throws Exception {
-
-               FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new 
LinkedMap(), true);
-               List<KafkaTopicPartition> partitionList = new ArrayList<>(1);
-               partitionList.add(new KafkaTopicPartition("test", 0));
-               consumer.setSubscribedPartitions(partitionList);
-
-               OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
-               TestingListState<Serializable> listState = new 
TestingListState<>();
-               
when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
-
-               StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
-
-               
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
-
-               // make the context signal that there is no restored state, 
then validate that
-               when(initializationContext.isRestored()).thenReturn(false);
-               consumer.initializeState(initializationContext);
-               consumer.run(mock(SourceFunction.SourceContext.class));
-       }
-
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testSnapshotState() throws Exception {
-
-               // 
--------------------------------------------------------------------
-               //   prepare fake states
-               // 
--------------------------------------------------------------------
-
-               final HashMap<KafkaTopicPartition, Long> state1 = new 
HashMap<>();
-               state1.put(new KafkaTopicPartition("abc", 13), 16768L);
-               state1.put(new KafkaTopicPartition("def", 7), 987654321L);
-
-               final HashMap<KafkaTopicPartition, Long> state2 = new 
HashMap<>();
-               state2.put(new KafkaTopicPartition("abc", 13), 16770L);
-               state2.put(new KafkaTopicPartition("def", 7), 987654329L);
-
-               final HashMap<KafkaTopicPartition, Long> state3 = new 
HashMap<>();
-               state3.put(new KafkaTopicPartition("abc", 13), 16780L);
-               state3.put(new KafkaTopicPartition("def", 7), 987654377L);
-
-               // 
--------------------------------------------------------------------
-               
-               final AbstractFetcher<String, ?> fetcher = 
mock(AbstractFetcher.class);
-               when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, 
state3);
-                       
-               final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-       
-               FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, 
pendingOffsetsToCommit, true);
-               assertEquals(0, pendingOffsetsToCommit.size());
-
-               OperatorStateStore backend = mock(OperatorStateStore.class);
-
-               TestingListState<Serializable> listState = new 
TestingListState<>();
-
-               
when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
-
-               StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
-
-               
when(initializationContext.getOperatorStateStore()).thenReturn(backend);
-               when(initializationContext.isRestored()).thenReturn(false, 
true, true, true);
-
-               consumer.initializeState(initializationContext);
-
-               // checkpoint 1
-               consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(138, 138));
-
-               HashMap<KafkaTopicPartition, Long> snapshot1 = new HashMap<>();
-
-               for (Serializable serializable : listState.get()) {
-                       Tuple2<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) 
serializable;
-                       snapshot1.put(kafkaTopicPartitionLongTuple2.f0, 
kafkaTopicPartitionLongTuple2.f1);
-               }
-
-               assertEquals(state1, snapshot1);
-               assertEquals(1, pendingOffsetsToCommit.size());
-               assertEquals(state1, pendingOffsetsToCommit.get(138L));
-
-               // checkpoint 2
-               consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(140, 140));
-
-               HashMap<KafkaTopicPartition, Long> snapshot2 = new HashMap<>();
-
-               for (Serializable serializable : listState.get()) {
-                       Tuple2<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) 
serializable;
-                       snapshot2.put(kafkaTopicPartitionLongTuple2.f0, 
kafkaTopicPartitionLongTuple2.f1);
-               }
-
-               assertEquals(state2, snapshot2);
-               assertEquals(2, pendingOffsetsToCommit.size());
-               assertEquals(state2, pendingOffsetsToCommit.get(140L));
-               
-               // ack checkpoint 1
-               consumer.notifyCheckpointComplete(138L);
-               assertEquals(1, pendingOffsetsToCommit.size());
-               assertTrue(pendingOffsetsToCommit.containsKey(140L));
-
-               // checkpoint 3
-               consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(141, 141));
-
-               HashMap<KafkaTopicPartition, Long> snapshot3 = new HashMap<>();
-
-               for (Serializable serializable : listState.get()) {
-                       Tuple2<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) 
serializable;
-                       snapshot3.put(kafkaTopicPartitionLongTuple2.f0, 
kafkaTopicPartitionLongTuple2.f1);
-               }
-
-               assertEquals(state3, snapshot3);
-               assertEquals(2, pendingOffsetsToCommit.size());
-               assertEquals(state3, pendingOffsetsToCommit.get(141L));
-               
-               // ack checkpoint 3, subsumes number 2
-               consumer.notifyCheckpointComplete(141L);
-               assertEquals(0, pendingOffsetsToCommit.size());
-
-
-               consumer.notifyCheckpointComplete(666); // invalid checkpoint
-               assertEquals(0, pendingOffsetsToCommit.size());
-
-               OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
-               listState = new TestingListState<>();
-               
when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
-
-               // create 500 snapshots
-               for (int i = 100; i < 600; i++) {
-                       consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(i, i));
-                       listState.clear();
-               }
-               
assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, 
pendingOffsetsToCommit.size());
-
-               // commit only the second last
-               consumer.notifyCheckpointComplete(598);
-               assertEquals(1, pendingOffsetsToCommit.size());
-
-               // access invalid checkpoint
-               consumer.notifyCheckpointComplete(590);
-
-               // and the last
-               consumer.notifyCheckpointComplete(599);
-               assertEquals(0, pendingOffsetsToCommit.size());
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static <T> FlinkKafkaConsumerBase<T> getConsumer(
-                       AbstractFetcher<T, ?> fetcher, LinkedMap 
pendingOffsetsToCommit, boolean running) throws Exception
-       {
-               FlinkKafkaConsumerBase<T> consumer = new 
DummyFlinkKafkaConsumer<>();
-
-               Field fetcherField = 
FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
-               fetcherField.setAccessible(true);
-               fetcherField.set(consumer, fetcher);
-
-               Field mapField = 
FlinkKafkaConsumerBase.class.getDeclaredField("pendingOffsetsToCommit");
-               mapField.setAccessible(true);
-               mapField.set(consumer, pendingOffsetsToCommit);
-
-               Field runningField = 
FlinkKafkaConsumerBase.class.getDeclaredField("running");
-               runningField.setAccessible(true);
-               runningField.set(consumer, running);
-
-               return consumer;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static class DummyFlinkKafkaConsumer<T> extends 
FlinkKafkaConsumerBase<T> {
-               private static final long serialVersionUID = 1L;
-
-               @SuppressWarnings("unchecked")
-               public DummyFlinkKafkaConsumer() {
-                       super(Arrays.asList("dummy-topic"), 
(KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class));
-               }
-
-               @Override
-               protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> 
sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, 
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, 
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, 
StreamingRuntimeContext runtimeContext) throws Exception {
-                       AbstractFetcher<T, ?> fetcher = 
mock(AbstractFetcher.class);
-                       doAnswer(new Answer() {
-                               @Override
-                               public Object answer(InvocationOnMock 
invocationOnMock) throws Throwable {
-                                       Assert.fail("Trying to restore offsets 
even though there was no restore state.");
-                                       return null;
-                               }
-                       }).when(fetcher).restoreOffsets(any(HashMap.class));
-                       return fetcher;
-               }
-
-               @Override
-               protected List<KafkaTopicPartition> 
getKafkaPartitions(List<String> topics) {
-                       return Collections.emptyList();
-               }
-
-               @Override
-               public RuntimeContext getRuntimeContext() {
-                       return mock(StreamingRuntimeContext.class);
-               }
-       }
-
-       private static final class TestingListState<T> implements ListState<T> {
-
-               private final List<T> list = new ArrayList<>();
-               private boolean clearCalled = false;
-
-               @Override
-               public void clear() {
-                       list.clear();
-                       clearCalled = true;
-               }
-
-               @Override
-               public Iterable<T> get() throws Exception {
-                       return list;
-               }
-
-               @Override
-               public void add(T value) throws Exception {
-                       list.add(value);
-               }
-
-               public List<T> getList() {
-                       return list;
-               }
-
-               public boolean isClearCalled() {
-                       return clearCalled;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
deleted file mode 100644
index 2e06160..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.Assert;
-import org.junit.Test;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class FlinkKafkaProducerBaseTest {
-
-       /**
-        * Tests that the constructor eagerly checks bootstrap servers are set 
in config
-        */
-       @Test(expected = IllegalArgumentException.class)
-       public void testInstantiationFailsWhenBootstrapServersMissing() throws 
Exception {
-               // no bootstrap servers set in props
-               Properties props = new Properties();
-               // should throw IllegalArgumentException
-               new DummyFlinkKafkaProducer<>(props, null);
-       }
-
-       /**
-        * Tests that constructor defaults to key value serializers in config 
to byte array deserializers if not set
-        */
-       @Test
-       public void testKeyValueDeserializersSetIfMissing() throws Exception {
-               Properties props = new Properties();
-               props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:12345");
-               // should set missing key value deserializers
-               new DummyFlinkKafkaProducer<>(props, null);
-
-               
assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
-               
assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
-               
assertTrue(props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName()));
-               
assertTrue(props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName()));
-       }
-
-       /**
-        * Tests that partitions list is determinate and correctly provided to 
custom partitioner
-        */
-       @Test
-       public void testPartitionerOpenedWithDeterminatePartitionList() throws 
Exception {
-               KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
-               RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
-               when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
-               
when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
-
-               DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
-                       FakeStandardProducerConfig.get(), mockPartitioner);
-               producer.setRuntimeContext(mockRuntimeContext);
-
-               producer.open(new Configuration());
-
-               // the internal mock KafkaProducer will return an out-of-order 
list of 4 partitions,
-               // which should be sorted before provided to the custom 
partitioner's open() method
-               int[] correctPartitionList = {0, 1, 2, 3};
-               verify(mockPartitioner).open(0, 1, correctPartitionList);
-       }
-
-       /**
-        * Test ensuring that the producer is not dropping buffered records.;
-        * we set a timeout because the test will not finish if the logic is 
broken
-        */
-       @Test(timeout=5000)
-       public void testAtLeastOnceProducer() throws Throwable {
-               runAtLeastOnceTest(true);
-       }
-
-       /**
-        * Ensures that the at least once producing test fails if the flushing 
is disabled
-        */
-       @Test(expected = AssertionError.class, timeout=5000)
-       public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws 
Throwable {
-               runAtLeastOnceTest(false);
-       }
-
-       private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws 
Throwable {
-               final AtomicBoolean snapshottingFinished = new 
AtomicBoolean(false);
-               final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
-                       FakeStandardProducerConfig.get(), null, 
snapshottingFinished);
-               producer.setFlushOnCheckpoint(flushOnCheckpoint);
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness =
-                               new OneInputStreamOperatorTestHarness<>(new 
StreamSink(producer));
-
-               testHarness.open();
-
-               for (int i = 0; i < 100; i++) {
-                       testHarness.processElement(new StreamRecord<>("msg-" + 
i));
-               }
-
-               // start a thread confirming all pending records
-               final Tuple1<Throwable> runnableError = new Tuple1<>(null);
-               final Thread threadA = Thread.currentThread();
-
-               Runnable confirmer = new Runnable() {
-                       @Override
-                       public void run() {
-                               try {
-                                       MockProducer mp = 
producer.getProducerInstance();
-                                       List<Callback> pending = 
mp.getPending();
-
-                                       // we need to find out if the 
snapshot() method blocks forever
-                                       // this is not possible. If snapshot() 
is running, it will
-                                       // start removing elements from the 
pending list.
-                                       synchronized (threadA) {
-                                               threadA.wait(500L);
-                                       }
-                                       // we now check that no records have 
been confirmed yet
-                                       Assert.assertEquals(100, 
pending.size());
-                                       Assert.assertFalse("Snapshot method 
returned before all records were confirmed",
-                                               snapshottingFinished.get());
-
-                                       // now confirm all checkpoints
-                                       for (Callback c: pending) {
-                                               c.onCompletion(null, null);
-                                       }
-                                       pending.clear();
-                               } catch(Throwable t) {
-                                       runnableError.f0 = t;
-                               }
-                       }
-               };
-               Thread threadB = new Thread(confirmer);
-               threadB.start();
-
-               // this should block:
-               testHarness.snapshot(0, 0);
-
-               synchronized (threadA) {
-                       threadA.notifyAll(); // just in case, to let the test 
fail faster
-               }
-               Assert.assertEquals(0, 
producer.getProducerInstance().getPending().size());
-               Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
-               while (deadline.hasTimeLeft() && threadB.isAlive()) {
-                       threadB.join(500);
-               }
-               Assert.assertFalse("Thread A is expected to be finished at this 
point. If not, the test is prone to fail", threadB.isAlive());
-               if (runnableError.f0 != null) {
-                       throw runnableError.f0;
-               }
-
-               testHarness.close();
-       }
-
-
-       // 
------------------------------------------------------------------------
-
-       private static class DummyFlinkKafkaProducer<T> extends 
FlinkKafkaProducerBase<T> {
-               private static final long serialVersionUID = 1L;
-
-               private transient MockProducer prod;
-               private AtomicBoolean snapshottingFinished;
-
-               @SuppressWarnings("unchecked")
-               public DummyFlinkKafkaProducer(Properties producerConfig, 
KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) {
-                       super("dummy-topic", (KeyedSerializationSchema< T >) 
mock(KeyedSerializationSchema.class), producerConfig, partitioner);
-                       this.snapshottingFinished = snapshottingFinished;
-               }
-
-               // constructor variant for test irrelated to snapshotting
-               @SuppressWarnings("unchecked")
-               public DummyFlinkKafkaProducer(Properties producerConfig, 
KafkaPartitioner partitioner) {
-                       super("dummy-topic", (KeyedSerializationSchema< T >) 
mock(KeyedSerializationSchema.class), producerConfig, partitioner);
-                       this.snapshottingFinished = new AtomicBoolean(true);
-               }
-
-               @Override
-               protected <K, V> KafkaProducer<K, V> 
getKafkaProducer(Properties props) {
-                       this.prod = new MockProducer();
-                       return this.prod;
-               }
-
-               @Override
-               public void snapshotState(FunctionSnapshotContext ctx) throws 
Exception {
-                       // call the actual snapshot state
-                       super.snapshotState(ctx);
-                       // notify test that snapshotting has been done
-                       snapshottingFinished.set(true);
-               }
-
-               @Override
-               protected void flush() {
-                       this.prod.flush();
-               }
-
-               public MockProducer getProducerInstance() {
-                       return this.prod;
-               }
-       }
-
-       private static class MockProducer<K, V> extends KafkaProducer<K, V> {
-               List<Callback> pendingCallbacks = new ArrayList<>();
-
-               public MockProducer() {
-                       super(FakeStandardProducerConfig.get());
-               }
-
-               @Override
-               public Future<RecordMetadata> send(ProducerRecord<K, V> record) 
{
-                       throw new UnsupportedOperationException("Unexpected");
-               }
-
-               @Override
-               public Future<RecordMetadata> send(ProducerRecord<K, V> record, 
Callback callback) {
-                       pendingCallbacks.add(callback);
-                       return null;
-               }
-
-               @Override
-               public List<PartitionInfo> partitionsFor(String topic) {
-                       List<PartitionInfo> list = new ArrayList<>();
-                       // deliberately return an out-of-order partition list
-                       list.add(new PartitionInfo(topic, 3, null, null, null));
-                       list.add(new PartitionInfo(topic, 1, null, null, null));
-                       list.add(new PartitionInfo(topic, 0, null, null, null));
-                       list.add(new PartitionInfo(topic, 2, null, null, null));
-                       return list;
-               }
-
-               @Override
-               public Map<MetricName, ? extends Metric> metrics() {
-                       return null;
-               }
-
-
-               public List<Callback> getPending() {
-                       return this.pendingCallbacks;
-               }
-
-               public void flush() {
-                       while (pendingCallbacks.size() > 0) {
-                               try {
-                                       Thread.sleep(10);
-                               } catch (InterruptedException e) {
-                                       throw new RuntimeException("Unable to 
flush producer, task was interrupted");
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
deleted file mode 100644
index 1882a7e..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-
-public class JSONDeserializationSchemaTest {
-       @Test
-       public void testDeserialize() throws IOException {
-               ObjectMapper mapper = new ObjectMapper();
-               ObjectNode initialValue = mapper.createObjectNode();
-               initialValue.put("key", 4).put("value", "world");
-               byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
-
-               JSONDeserializationSchema schema = new 
JSONDeserializationSchema();
-               ObjectNode deserializedValue = 
schema.deserialize(serializedValue);
-
-               Assert.assertEquals(4, deserializedValue.get("key").asInt());
-               Assert.assertEquals("world", 
deserializedValue.get("value").asText());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
deleted file mode 100644
index 86d3105..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import 
org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-
-public class JSONKeyValueDeserializationSchemaTest {
-       @Test
-       public void testDeserializeWithoutMetadata() throws IOException {
-               ObjectMapper mapper = new ObjectMapper();
-               ObjectNode initialKey = mapper.createObjectNode();
-               initialKey.put("index", 4);
-               byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
-
-               ObjectNode initialValue = mapper.createObjectNode();
-               initialValue.put("word", "world");
-               byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
-
-               JSONKeyValueDeserializationSchema schema = new 
JSONKeyValueDeserializationSchema(false);
-               ObjectNode deserializedValue = 
schema.deserialize(serializedKey, serializedValue, "", 0, 0);
-
-
-               Assert.assertTrue(deserializedValue.get("metadata") == null);
-               Assert.assertEquals(4, 
deserializedValue.get("key").get("index").asInt());
-               Assert.assertEquals("world", 
deserializedValue.get("value").get("word").asText());
-       }
-
-       @Test
-       public void testDeserializeWithMetadata() throws IOException {
-               ObjectMapper mapper = new ObjectMapper();
-               ObjectNode initialKey = mapper.createObjectNode();
-               initialKey.put("index", 4);
-               byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
-
-               ObjectNode initialValue = mapper.createObjectNode();
-               initialValue.put("word", "world");
-               byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
-
-               JSONKeyValueDeserializationSchema schema = new 
JSONKeyValueDeserializationSchema(true);
-               ObjectNode deserializedValue = 
schema.deserialize(serializedKey, serializedValue, "topic#1", 3, 4);
-
-               Assert.assertEquals(4, 
deserializedValue.get("key").get("index").asInt());
-               Assert.assertEquals("world", 
deserializedValue.get("value").get("word").asText());
-               Assert.assertEquals("topic#1", 
deserializedValue.get("metadata").get("topic").asText());
-               Assert.assertEquals(4, 
deserializedValue.get("metadata").get("offset").asInt());
-               Assert.assertEquals(3, 
deserializedValue.get("metadata").get("partition").asInt());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
deleted file mode 100644
index 68225e2..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.table.Row;
-import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class JsonRowDeserializationSchemaTest {
-
-       /**
-        * Tests simple deserialization.
-        */
-       @Test
-       public void testDeserialization() throws Exception {
-               long id = 1238123899121L;
-               String name = "asdlkjasjkdla998y1122";
-               byte[] bytes = new byte[1024];
-               ThreadLocalRandom.current().nextBytes(bytes);
-
-               ObjectMapper objectMapper = new ObjectMapper();
-
-               // Root
-               ObjectNode root = objectMapper.createObjectNode();
-               root.put("id", id);
-               root.put("name", name);
-               root.put("bytes", bytes);
-
-               byte[] serializedJson = objectMapper.writeValueAsBytes(root);
-
-               JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema(
-                               new String[] { "id", "name", "bytes" },
-                               new Class<?>[] { Long.class, String.class, 
byte[].class });
-
-               Row deserialized = 
deserializationSchema.deserialize(serializedJson);
-
-               assertEquals(3, deserialized.productArity());
-               assertEquals(id, deserialized.productElement(0));
-               assertEquals(name, deserialized.productElement(1));
-               assertArrayEquals(bytes, (byte[]) 
deserialized.productElement(2));
-       }
-
-       /**
-        * Tests deserialization with non-existing field name.
-        */
-       @Test
-       public void testMissingNode() throws Exception {
-               ObjectMapper objectMapper = new ObjectMapper();
-
-               // Root
-               ObjectNode root = objectMapper.createObjectNode();
-               root.put("id", 123123123);
-               byte[] serializedJson = objectMapper.writeValueAsBytes(root);
-
-               JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema(
-                               new String[] { "name" },
-                               new Class<?>[] { String.class });
-
-               Row row = deserializationSchema.deserialize(serializedJson);
-
-               assertEquals(1, row.productArity());
-               assertNull("Missing field not null", row.productElement(0));
-
-               deserializationSchema.setFailOnMissingField(true);
-
-               try {
-                       deserializationSchema.deserialize(serializedJson);
-                       fail("Did not throw expected Exception");
-               } catch (IOException e) {
-                       assertTrue(e.getCause() instanceof 
IllegalStateException);
-               }
-       }
-
-       /**
-        * Tests that number of field names and types has to match.
-        */
-       @Test
-       public void testNumberOfFieldNamesAndTypesMismatch() throws Exception {
-               try {
-                       new JsonRowDeserializationSchema(
-                                       new String[] { "one", "two", "three" },
-                                       new Class<?>[] { Long.class });
-                       fail("Did not throw expected Exception");
-               } catch (IllegalArgumentException ignored) {
-                       // Expected
-               }
-
-               try {
-                       new JsonRowDeserializationSchema(
-                                       new String[] { "one" },
-                                       new Class<?>[] { Long.class, 
String.class });
-                       fail("Did not throw expected Exception");
-               } catch (IllegalArgumentException ignored) {
-                       // Expected
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
deleted file mode 100644
index 92af15d..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
-public class JsonRowSerializationSchemaTest {
-       @Test
-       public void testRowSerialization() throws IOException {
-               String[] fieldNames = new String[] {"f1", "f2", "f3"};
-               Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, 
String.class};
-               Row row = new Row(3);
-               row.setField(0, 1);
-               row.setField(1, true);
-               row.setField(2, "str");
-
-               Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, 
row);
-               assertEqualRows(row, resultRow);
-       }
-
-       @Test
-       public void testSerializationOfTwoRows() throws IOException {
-               String[] fieldNames = new String[] {"f1", "f2", "f3"};
-               Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, 
String.class};
-               Row row1 = new Row(3);
-               row1.setField(0, 1);
-               row1.setField(1, true);
-               row1.setField(2, "str");
-
-               JsonRowSerializationSchema serializationSchema = new 
JsonRowSerializationSchema(fieldNames);
-               JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema(fieldNames, fieldTypes);
-
-               byte[] bytes = serializationSchema.serialize(row1);
-               assertEqualRows(row1, deserializationSchema.deserialize(bytes));
-
-               Row row2 = new Row(3);
-               row2.setField(0, 10);
-               row2.setField(1, false);
-               row2.setField(2, "newStr");
-
-               bytes = serializationSchema.serialize(row2);
-               assertEqualRows(row2, deserializationSchema.deserialize(bytes));
-       }
-
-       @Test(expected = NullPointerException.class)
-       public void testInputValidation() {
-               new JsonRowSerializationSchema(null);
-       }
-
-       @Test(expected = IllegalStateException.class)
-       public void testSerializeRowWithInvalidNumberOfFields() {
-               String[] fieldNames = new String[] {"f1", "f2", "f3"};
-               Row row = new Row(1);
-               row.setField(0, 1);
-
-               JsonRowSerializationSchema serializationSchema = new 
JsonRowSerializationSchema(fieldNames);
-               serializationSchema.serialize(row);
-       }
-
-       private Row serializeAndDeserialize(String[] fieldNames, Class[] 
fieldTypes, Row row) throws IOException {
-               JsonRowSerializationSchema serializationSchema = new 
JsonRowSerializationSchema(fieldNames);
-               JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema(fieldNames, fieldTypes);
-
-               byte[] bytes = serializationSchema.serialize(row);
-               return deserializationSchema.deserialize(bytes);
-       }
-
-       private void assertEqualRows(Row expectedRow, Row resultRow) {
-               assertEquals("Deserialized row should have expected number of 
fields",
-                       expectedRow.productArity(), resultRow.productArity());
-               for (int i = 0; i < expectedRow.productArity(); i++) {
-                       assertEquals(String.format("Field number %d should be 
as in the original row", i),
-                               expectedRow.productElement(i), 
resultRow.productElement(i));
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
deleted file mode 100644
index 9beed22..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests that the partition assignment is deterministic and stable.
- */
-public class KafkaConsumerPartitionAssignmentTest {
-
-       @Test
-       public void testPartitionsEqualConsumers() {
-               try {
-                       List<KafkaTopicPartition> inPartitions = Arrays.asList(
-                                       new KafkaTopicPartition("test-topic", 
4),
-                                       new KafkaTopicPartition("test-topic", 
52),
-                                       new KafkaTopicPartition("test-topic", 
17),
-                                       new KafkaTopicPartition("test-topic", 
1));
-
-                       for (int i = 0; i < inPartitions.size(); i++) {
-                               List<KafkaTopicPartition> parts = 
-                                               
FlinkKafkaConsumerBase.assignPartitions(inPartitions, inPartitions.size(), i);
-
-                               assertNotNull(parts);
-                               assertEquals(1, parts.size());
-                               assertTrue(contains(inPartitions, 
parts.get(0).getPartition()));
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       private boolean contains(List<KafkaTopicPartition> inPartitions, int 
partition) {
-               for (KafkaTopicPartition ktp : inPartitions) {
-                       if (ktp.getPartition() == partition) {
-                               return true;
-                       }
-               }
-               return false;
-       }
-
-       @Test
-       public void testMultiplePartitionsPerConsumers() {
-               try {
-                       final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 
31, 127, 14};
-
-                       final List<KafkaTopicPartition> partitions = new 
ArrayList<>();
-                       final Set<KafkaTopicPartition> allPartitions = new 
HashSet<>();
-
-                       for (int p : partitionIDs) {
-                               KafkaTopicPartition part = new 
KafkaTopicPartition("test-topic", p);
-                               partitions.add(part);
-                               allPartitions.add(part);
-                       }
-
-                       final int numConsumers = 3;
-                       final int minPartitionsPerConsumer = partitions.size() 
/ numConsumers;
-                       final int maxPartitionsPerConsumer = partitions.size() 
/ numConsumers + 1;
-
-                       for (int i = 0; i < numConsumers; i++) {
-                               List<KafkaTopicPartition> parts = 
-                                               
FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
-
-                               assertNotNull(parts);
-                               assertTrue(parts.size() >= 
minPartitionsPerConsumer);
-                               assertTrue(parts.size() <= 
maxPartitionsPerConsumer);
-
-                               for (KafkaTopicPartition p : parts) {
-                                       // check that the element was actually 
contained
-                                       assertTrue(allPartitions.remove(p));
-                               }
-                       }
-
-                       // all partitions must have been assigned
-                       assertTrue(allPartitions.isEmpty());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testPartitionsFewerThanConsumers() {
-               try {
-                       List<KafkaTopicPartition> inPartitions = Arrays.asList(
-                                       new KafkaTopicPartition("test-topic", 
4),
-                                       new KafkaTopicPartition("test-topic", 
52),
-                                       new KafkaTopicPartition("test-topic", 
17),
-                                       new KafkaTopicPartition("test-topic", 
1));
-
-                       final Set<KafkaTopicPartition> allPartitions = new 
HashSet<>();
-                       allPartitions.addAll(inPartitions);
-
-                       final int numConsumers = 2 * inPartitions.size() + 3;
-
-                       for (int i = 0; i < numConsumers; i++) {
-                               List<KafkaTopicPartition> parts = 
FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
-
-                               assertNotNull(parts);
-                               assertTrue(parts.size() <= 1);
-
-                               for (KafkaTopicPartition p : parts) {
-                                       // check that the element was actually 
contained
-                                       assertTrue(allPartitions.remove(p));
-                               }
-                       }
-
-                       // all partitions must have been assigned
-                       assertTrue(allPartitions.isEmpty());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testAssignEmptyPartitions() {
-               try {
-                       List<KafkaTopicPartition> ep = new ArrayList<>();
-                       List<KafkaTopicPartition> parts1 = 
FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
-                       assertNotNull(parts1);
-                       assertTrue(parts1.isEmpty());
-
-                       List<KafkaTopicPartition> parts2 = 
FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
-                       assertNotNull(parts2);
-                       assertTrue(parts2.isEmpty());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testGrowingPartitionsRemainsStable() {
-               try {
-                       final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 
42, 31, 127, 14};
-                       List<KafkaTopicPartition> newPartitions = new 
ArrayList<>();
-
-                       for (int p : newPartitionIDs) {
-                               KafkaTopicPartition part = new 
KafkaTopicPartition("test-topic", p);
-                               newPartitions.add(part);
-                       }
-
-                       List<KafkaTopicPartition> initialPartitions = 
newPartitions.subList(0, 7);
-
-                       final Set<KafkaTopicPartition> allNewPartitions = new 
HashSet<>(newPartitions);
-                       final Set<KafkaTopicPartition> allInitialPartitions = 
new HashSet<>(initialPartitions);
-
-                       final int numConsumers = 3;
-                       final int minInitialPartitionsPerConsumer = 
initialPartitions.size() / numConsumers;
-                       final int maxInitialPartitionsPerConsumer = 
initialPartitions.size() / numConsumers + 1;
-                       final int minNewPartitionsPerConsumer = 
newPartitions.size() / numConsumers;
-                       final int maxNewPartitionsPerConsumer = 
newPartitions.size() / numConsumers + 1;
-
-                       List<KafkaTopicPartition> parts1 = 
FlinkKafkaConsumerBase.assignPartitions(
-                                       initialPartitions, numConsumers, 0);
-                       List<KafkaTopicPartition> parts2 = 
FlinkKafkaConsumerBase.assignPartitions(
-                                       initialPartitions, numConsumers, 1);
-                       List<KafkaTopicPartition> parts3 = 
FlinkKafkaConsumerBase.assignPartitions(
-                                       initialPartitions, numConsumers, 2);
-
-                       assertNotNull(parts1);
-                       assertNotNull(parts2);
-                       assertNotNull(parts3);
-
-                       assertTrue(parts1.size() >= 
minInitialPartitionsPerConsumer);
-                       assertTrue(parts1.size() <= 
maxInitialPartitionsPerConsumer);
-                       assertTrue(parts2.size() >= 
minInitialPartitionsPerConsumer);
-                       assertTrue(parts2.size() <= 
maxInitialPartitionsPerConsumer);
-                       assertTrue(parts3.size() >= 
minInitialPartitionsPerConsumer);
-                       assertTrue(parts3.size() <= 
maxInitialPartitionsPerConsumer);
-
-                       for (KafkaTopicPartition p : parts1) {
-                               // check that the element was actually contained
-                               assertTrue(allInitialPartitions.remove(p));
-                       }
-                       for (KafkaTopicPartition p : parts2) {
-                               // check that the element was actually contained
-                               assertTrue(allInitialPartitions.remove(p));
-                       }
-                       for (KafkaTopicPartition p : parts3) {
-                               // check that the element was actually contained
-                               assertTrue(allInitialPartitions.remove(p));
-                       }
-
-                       // all partitions must have been assigned
-                       assertTrue(allInitialPartitions.isEmpty());
-
-                       // grow the set of partitions and distribute anew
-
-                       List<KafkaTopicPartition> parts1new = 
FlinkKafkaConsumerBase.assignPartitions(
-                                       newPartitions, numConsumers, 0);
-                       List<KafkaTopicPartition> parts2new = 
FlinkKafkaConsumerBase.assignPartitions(
-                                       newPartitions, numConsumers, 1);
-                       List<KafkaTopicPartition> parts3new = 
FlinkKafkaConsumerBase.assignPartitions(
-                                       newPartitions, numConsumers, 2);
-
-                       // new partitions must include all old partitions
-
-                       assertTrue(parts1new.size() > parts1.size());
-                       assertTrue(parts2new.size() > parts2.size());
-                       assertTrue(parts3new.size() > parts3.size());
-
-                       assertTrue(parts1new.containsAll(parts1));
-                       assertTrue(parts2new.containsAll(parts2));
-                       assertTrue(parts3new.containsAll(parts3));
-
-                       assertTrue(parts1new.size() >= 
minNewPartitionsPerConsumer);
-                       assertTrue(parts1new.size() <= 
maxNewPartitionsPerConsumer);
-                       assertTrue(parts2new.size() >= 
minNewPartitionsPerConsumer);
-                       assertTrue(parts2new.size() <= 
maxNewPartitionsPerConsumer);
-                       assertTrue(parts3new.size() >= 
minNewPartitionsPerConsumer);
-                       assertTrue(parts3new.size() <= 
maxNewPartitionsPerConsumer);
-
-                       for (KafkaTopicPartition p : parts1new) {
-                               // check that the element was actually contained
-                               assertTrue(allNewPartitions.remove(p));
-                       }
-                       for (KafkaTopicPartition p : parts2new) {
-                               // check that the element was actually contained
-                               assertTrue(allNewPartitions.remove(p));
-                       }
-                       for (KafkaTopicPartition p : parts3new) {
-                               // check that the element was actually contained
-                               assertTrue(allNewPartitions.remove(p));
-                       }
-
-                       // all partitions must have been assigned
-                       assertTrue(allNewPartitions.isEmpty());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-}

Reply via email to