http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
deleted file mode 100644
index cf39606..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.util.SerializedValue;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Base class for all fetchers, which implement the connections to Kafka 
brokers and
- * pull records from Kafka partitions.
- * 
- * <p>This fetcher base class implements the logic around emitting records and 
tracking offsets,
- * as well as around the optional timestamp assignment and watermark 
generation. 
- * 
- * @param <T> The type of elements deserialized from Kafka's byte records, and 
emitted into
- *            the Flink data streams.
- * @param <KPH> The type of topic/partition identifier used by Kafka in the 
specific version.
- */
-public abstract class AbstractFetcher<T, KPH> {
-       
-       protected static final int NO_TIMESTAMPS_WATERMARKS = 0;
-       protected static final int PERIODIC_WATERMARKS = 1;
-       protected static final int PUNCTUATED_WATERMARKS = 2;
-       
-       // 
------------------------------------------------------------------------
-       
-       /** The source context to emit records and watermarks to */
-       protected final SourceContext<T> sourceContext;
-
-       /** The lock that guarantees that record emission and state updates are 
atomic,
-        * from the view of taking a checkpoint */
-       protected final Object checkpointLock;
-
-       /** All partitions (and their state) that this fetcher is subscribed to 
*/
-       private final KafkaTopicPartitionState<KPH>[] allPartitions;
-
-       /** The mode describing whether the fetcher also generates timestamps 
and watermarks */
-       protected final int timestampWatermarkMode;
-
-       /** Flag whether to register metrics for the fetcher */
-       protected final boolean useMetrics;
-
-       /** Only relevant for punctuated watermarks: The current cross 
partition watermark */
-       private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
-
-       // 
------------------------------------------------------------------------
-       
-       protected AbstractFetcher(
-                       SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> assignedPartitions,
-                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
-                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
-                       ProcessingTimeService processingTimeProvider,
-                       long autoWatermarkInterval,
-                       ClassLoader userCodeClassLoader,
-                       boolean useMetrics) throws Exception
-       {
-               this.sourceContext = checkNotNull(sourceContext);
-               this.checkpointLock = sourceContext.getCheckpointLock();
-               this.useMetrics = useMetrics;
-               
-               // figure out what we watermark mode we will be using
-               
-               if (watermarksPeriodic == null) {
-                       if (watermarksPunctuated == null) {
-                               // simple case, no watermarks involved
-                               timestampWatermarkMode = 
NO_TIMESTAMPS_WATERMARKS;
-                       } else {
-                               timestampWatermarkMode = PUNCTUATED_WATERMARKS;
-                       }
-               } else {
-                       if (watermarksPunctuated == null) {
-                               timestampWatermarkMode = PERIODIC_WATERMARKS;
-                       } else {
-                               throw new IllegalArgumentException("Cannot have 
both periodic and punctuated watermarks");
-                       }
-               }
-               
-               // create our partition state according to the 
timestamp/watermark mode 
-               this.allPartitions = initializePartitions(
-                               assignedPartitions,
-                               timestampWatermarkMode,
-                               watermarksPeriodic, watermarksPunctuated,
-                               userCodeClassLoader);
-               
-               // if we have periodic watermarks, kick off the interval 
scheduler
-               if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-                       KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] 
parts = 
-                                       
(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
-                       
-                       PeriodicWatermarkEmitter periodicEmitter = 
-                                       new PeriodicWatermarkEmitter(parts, 
sourceContext, processingTimeProvider, autoWatermarkInterval);
-                       periodicEmitter.start();
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Properties
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Gets all partitions (with partition state) that this fetcher is 
subscribed to.
-        *
-        * @return All subscribed partitions.
-        */
-       protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
-               return allPartitions;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Core fetcher work methods
-       // 
------------------------------------------------------------------------
-
-       public abstract void runFetchLoop() throws Exception;
-       
-       public abstract void cancel();
-
-       // 
------------------------------------------------------------------------
-       //  Kafka version specifics
-       // 
------------------------------------------------------------------------
-       
-       /**
-        * Creates the Kafka version specific representation of the given
-        * topic partition.
-        * 
-        * @param partition The Flink representation of the Kafka topic 
partition.
-        * @return The specific Kafka representation of the Kafka topic 
partition.
-        */
-       public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition 
partition);
-
-       /**
-        * Commits the given partition offsets to the Kafka brokers (or to 
ZooKeeper for
-        * older Kafka versions). The given offsets are the internal 
checkpointed offsets, representing
-        * the last processed record of each partition. Version-specific 
implementations of this method
-        * need to hold the contract that the given offsets must be incremented 
by 1 before
-        * committing them, so that committed offsets to Kafka represent "the 
next record to process".
-        * 
-        * @param offsets The offsets to commit to Kafka (implementations must 
increment offsets by 1 before committing).
-        * @throws Exception This method forwards exceptions.
-        */
-       public abstract void 
commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws 
Exception;
-       
-       // 
------------------------------------------------------------------------
-       //  snapshot and restore the state
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Takes a snapshot of the partition offsets.
-        * 
-        * <p>Important: This method mus be called under the checkpoint lock.
-        * 
-        * @return A map from partition to current offset.
-        */
-       public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
-               // this method assumes that the checkpoint lock is held
-               assert Thread.holdsLock(checkpointLock);
-
-               HashMap<KafkaTopicPartition, Long> state = new 
HashMap<>(allPartitions.length);
-               for (KafkaTopicPartitionState<?> partition : 
subscribedPartitions()) {
-                       state.put(partition.getKafkaTopicPartition(), 
partition.getOffset());
-               }
-               return state;
-       }
-
-       /**
-        * Restores the partition offsets.
-        * 
-        * @param snapshotState The offsets for the partitions 
-        */
-       public void restoreOffsets(HashMap<KafkaTopicPartition, Long> 
snapshotState) {
-               for (KafkaTopicPartitionState<?> partition : allPartitions) {
-                       Long offset = 
snapshotState.get(partition.getKafkaTopicPartition());
-                       if (offset != null) {
-                               partition.setOffset(offset);
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  emitting records
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Emits a record without attaching an existing timestamp to it.
-        * 
-        * <p>Implementation Note: This method is kept brief to be JIT inlining 
friendly.
-        * That makes the fast path efficient, the extended paths are called as 
separate methods.
-        * 
-        * @param record The record to emit
-        * @param partitionState The state of the Kafka partition from which 
the record was fetched
-        * @param offset The offset of the record
-        */
-       protected void emitRecord(T record, KafkaTopicPartitionState<KPH> 
partitionState, long offset) throws Exception {
-               if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
-                       // fast path logic, in case there are no watermarks
-
-                       // emit the record, using the checkpoint lock to 
guarantee
-                       // atomicity of record emission and offset state update
-                       synchronized (checkpointLock) {
-                               sourceContext.collect(record);
-                               partitionState.setOffset(offset);
-                       }
-               }
-               else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-                       emitRecordWithTimestampAndPeriodicWatermark(record, 
partitionState, offset, Long.MIN_VALUE);
-               }
-               else {
-                       emitRecordWithTimestampAndPunctuatedWatermark(record, 
partitionState, offset, Long.MIN_VALUE);
-               }
-       }
-
-       /**
-        * Emits a record attaching a timestamp to it.
-        *
-        * <p>Implementation Note: This method is kept brief to be JIT inlining 
friendly.
-        * That makes the fast path efficient, the extended paths are called as 
separate methods.
-        *
-        * @param record The record to emit
-        * @param partitionState The state of the Kafka partition from which 
the record was fetched
-        * @param offset The offset of the record
-        */
-       protected void emitRecordWithTimestamp(
-                       T record, KafkaTopicPartitionState<KPH> partitionState, 
long offset, long timestamp) throws Exception {
-
-               if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
-                       // fast path logic, in case there are no watermarks 
generated in the fetcher
-
-                       // emit the record, using the checkpoint lock to 
guarantee
-                       // atomicity of record emission and offset state update
-                       synchronized (checkpointLock) {
-                               sourceContext.collectWithTimestamp(record, 
timestamp);
-                               partitionState.setOffset(offset);
-                       }
-               }
-               else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-                       emitRecordWithTimestampAndPeriodicWatermark(record, 
partitionState, offset, timestamp);
-               }
-               else {
-                       emitRecordWithTimestampAndPunctuatedWatermark(record, 
partitionState, offset, timestamp);
-               }
-       }
-
-       /**
-        * Record emission, if a timestamp will be attached from an assigner 
that is
-        * also a periodic watermark generator.
-        */
-       protected void emitRecordWithTimestampAndPeriodicWatermark(
-                       T record, KafkaTopicPartitionState<KPH> partitionState, 
long offset, long kafkaEventTimestamp)
-       {
-               @SuppressWarnings("unchecked")
-               final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> 
withWatermarksState =
-                               
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
-
-               // extract timestamp - this accesses/modifies the per-partition 
state inside the
-               // watermark generator instance, so we need to lock the access 
on the
-               // partition state. concurrent access can happen from the 
periodic emitter
-               final long timestamp;
-               //noinspection SynchronizationOnLocalVariableOrMethodParameter
-               synchronized (withWatermarksState) {
-                       timestamp = 
withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
-               }
-
-               // emit the record with timestamp, using the usual checkpoint 
lock to guarantee
-               // atomicity of record emission and offset state update 
-               synchronized (checkpointLock) {
-                       sourceContext.collectWithTimestamp(record, timestamp);
-                       partitionState.setOffset(offset);
-               }
-       }
-
-       /**
-        * Record emission, if a timestamp will be attached from an assigner 
that is
-        * also a punctuated watermark generator.
-        */
-       protected void emitRecordWithTimestampAndPunctuatedWatermark(
-                       T record, KafkaTopicPartitionState<KPH> partitionState, 
long offset, long kafkaEventTimestamp)
-       {
-               @SuppressWarnings("unchecked")
-               final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> 
withWatermarksState =
-                               
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
-
-               // only one thread ever works on accessing timestamps and 
watermarks
-               // from the punctuated extractor
-               final long timestamp = 
withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
-               final Watermark newWatermark = 
withWatermarksState.checkAndGetNewWatermark(record, timestamp);
-
-               // emit the record with timestamp, using the usual checkpoint 
lock to guarantee
-               // atomicity of record emission and offset state update 
-               synchronized (checkpointLock) {
-                       sourceContext.collectWithTimestamp(record, timestamp);
-                       partitionState.setOffset(offset);
-               }
-
-               // if we also have a new per-partition watermark, check if that 
is also a
-               // new cross-partition watermark
-               if (newWatermark != null) {
-                       updateMinPunctuatedWatermark(newWatermark);
-               }
-       }
-
-       /**
-        *Checks whether a new per-partition watermark is also a new 
cross-partition watermark.
-        */
-       private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
-               if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
-                       long newMin = Long.MAX_VALUE;
-
-                       for (KafkaTopicPartitionState<?> state : allPartitions) 
{
-                               @SuppressWarnings("unchecked")
-                               final 
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
-                                               
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
-                               
-                               newMin = Math.min(newMin, 
withWatermarksState.getCurrentPartitionWatermark());
-                       }
-
-                       // double-check locking pattern
-                       if (newMin > maxWatermarkSoFar) {
-                               synchronized (checkpointLock) {
-                                       if (newMin > maxWatermarkSoFar) {
-                                               maxWatermarkSoFar = newMin;
-                                               sourceContext.emitWatermark(new 
Watermark(newMin));
-                                       }
-                               }
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Utility method that takes the topic partitions and creates the topic 
partition state
-        * holders. If a watermark generator per partition exists, this will 
also initialize those.
-        */
-       private KafkaTopicPartitionState<KPH>[] initializePartitions(
-                       List<KafkaTopicPartition> assignedPartitions,
-                       int timestampWatermarkMode,
-                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
-                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
-                       ClassLoader userCodeClassLoader)
-               throws IOException, ClassNotFoundException
-       {
-               switch (timestampWatermarkMode) {
-                       
-                       case NO_TIMESTAMPS_WATERMARKS: {
-                               @SuppressWarnings("unchecked")
-                               KafkaTopicPartitionState<KPH>[] partitions =
-                                               
(KafkaTopicPartitionState<KPH>[]) new 
KafkaTopicPartitionState<?>[assignedPartitions.size()];
-
-                               int pos = 0;
-                               for (KafkaTopicPartition partition : 
assignedPartitions) {
-                                       // create the kafka version specific 
partition handle
-                                       KPH kafkaHandle = 
createKafkaPartitionHandle(partition);
-                                       partitions[pos++] = new 
KafkaTopicPartitionState<>(partition, kafkaHandle);
-                               }
-
-                               return partitions;
-                       }
-
-                       case PERIODIC_WATERMARKS: {
-                               @SuppressWarnings("unchecked")
-                               
KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
-                                               
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
-                                                               new 
KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];
-
-                               int pos = 0;
-                               for (KafkaTopicPartition partition : 
assignedPartitions) {
-                                       KPH kafkaHandle = 
createKafkaPartitionHandle(partition);
-
-                                       AssignerWithPeriodicWatermarks<T> 
assignerInstance =
-                                                       
watermarksPeriodic.deserializeValue(userCodeClassLoader);
-                                       
-                                       partitions[pos++] = new 
KafkaTopicPartitionStateWithPeriodicWatermarks<>(
-                                                       partition, kafkaHandle, 
assignerInstance);
-                               }
-
-                               return partitions;
-                       }
-
-                       case PUNCTUATED_WATERMARKS: {
-                               @SuppressWarnings("unchecked")
-                               
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
-                                               
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
-                                                               new 
KafkaTopicPartitionStateWithPunctuatedWatermarks<?, 
?>[assignedPartitions.size()];
-
-                               int pos = 0;
-                               for (KafkaTopicPartition partition : 
assignedPartitions) {
-                                       KPH kafkaHandle = 
createKafkaPartitionHandle(partition);
-
-                                       AssignerWithPunctuatedWatermarks<T> 
assignerInstance =
-                                                       
watermarksPunctuated.deserializeValue(userCodeClassLoader);
-
-                                       partitions[pos++] = new 
KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
-                                                       partition, kafkaHandle, 
assignerInstance);
-                               }
-
-                               return partitions;
-                       }
-                       default:
-                               // cannot happen, add this as a guard for the 
future
-                               throw new RuntimeException();
-               }
-       }
-
-       // ------------------------- Metrics ----------------------------------
-
-       /**
-        * Add current and committed offsets to metric group
-        *
-        * @param metricGroup The metric group to use
-        */
-       protected void addOffsetStateGauge(MetricGroup metricGroup) {
-               // add current offsets to gage
-               MetricGroup currentOffsets = 
metricGroup.addGroup("current-offsets");
-               MetricGroup committedOffsets = 
metricGroup.addGroup("committed-offsets");
-               for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) {
-                       currentOffsets.gauge(ktp.getTopic() + "-" + 
ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
-                       committedOffsets.gauge(ktp.getTopic() + "-" + 
ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
-               }
-       }
-
-       /**
-        * Gauge types
-        */
-       private enum OffsetGaugeType {
-               CURRENT_OFFSET,
-               COMMITTED_OFFSET
-       }
-
-       /**
-        * Gauge for getting the offset of a KafkaTopicPartitionState.
-        */
-       private static class OffsetGauge implements Gauge<Long> {
-
-               private final KafkaTopicPartitionState<?> ktp;
-               private final OffsetGaugeType gaugeType;
-
-               public OffsetGauge(KafkaTopicPartitionState<?> ktp, 
OffsetGaugeType gaugeType) {
-                       this.ktp = ktp;
-                       this.gaugeType = gaugeType;
-               }
-
-               @Override
-               public Long getValue() {
-                       switch(gaugeType) {
-                               case COMMITTED_OFFSET:
-                                       return ktp.getCommittedOffset();
-                               case CURRENT_OFFSET:
-                                       return ktp.getOffset();
-                               default:
-                                       throw new RuntimeException("Unknown 
gauge type: " + gaugeType);
-                       }
-               }
-       }
-       // 
------------------------------------------------------------------------
-       
-       /**
-        * The periodic watermark emitter. In its given interval, it checks all 
partitions for
-        * the current event time watermark, and possibly emits the next 
watermark.
-        */
-       private static class PeriodicWatermarkEmitter implements 
ProcessingTimeCallback {
-
-               private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, 
?>[] allPartitions;
-               
-               private final SourceContext<?> emitter;
-               
-               private final ProcessingTimeService timerService;
-
-               private final long interval;
-               
-               private long lastWatermarkTimestamp;
-               
-               //-------------------------------------------------
-
-               PeriodicWatermarkEmitter(
-                               
KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
-                               SourceContext<?> emitter,
-                               ProcessingTimeService timerService,
-                               long autoWatermarkInterval)
-               {
-                       this.allPartitions = checkNotNull(allPartitions);
-                       this.emitter = checkNotNull(emitter);
-                       this.timerService = checkNotNull(timerService);
-                       this.interval = autoWatermarkInterval;
-                       this.lastWatermarkTimestamp = Long.MIN_VALUE;
-               }
-
-               //-------------------------------------------------
-               
-               public void start() {
-                       
timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, 
this);
-               }
-               
-               @Override
-               public void onProcessingTime(long timestamp) throws Exception {
-
-                       long minAcrossAll = Long.MAX_VALUE;
-                       for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, 
?> state : allPartitions) {
-                               
-                               // we access the current watermark for the 
periodic assigners under the state
-                               // lock, to prevent concurrent modification to 
any internal variables
-                               final long curr;
-                               //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
-                               synchronized (state) {
-                                       curr = 
state.getCurrentWatermarkTimestamp();
-                               }
-                               
-                               minAcrossAll = Math.min(minAcrossAll, curr);
-                       }
-                       
-                       // emit next watermark, if there is one
-                       if (minAcrossAll > lastWatermarkTimestamp) {
-                               lastWatermarkTimestamp = minAcrossAll;
-                               emitter.emitWatermark(new 
Watermark(minAcrossAll));
-                       }
-                       
-                       // schedule the next watermark
-                       
timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, 
this);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
deleted file mode 100644
index c736493..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import javax.annotation.Nullable;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * A proxy that communicates exceptions between threads. Typically used if an 
exception
- * from a spawned thread needs to be recognized by the "parent" (spawner) 
thread.
- * 
- * <p>The spawned thread would set the exception via {@link 
#reportError(Throwable)}.
- * The parent would check (at certain points) for exceptions via {@link 
#checkAndThrowException()}.
- * Optionally, the parent can pass itself in the constructor to be interrupted 
as soon as
- * an exception occurs.
- * 
- * <pre>
- * {@code
- * 
- * final ExceptionProxy errorProxy = new 
ExceptionProxy(Thread.currentThread());
- * 
- * Thread subThread = new Thread() {
- * 
- *     public void run() {
- *         try {
- *             doSomething();
- *         } catch (Throwable t) {
- *             errorProxy.reportError(
- *         } finally {
- *             doSomeCleanup();
- *         }
- *     }
- * };
- * subThread.start();
- * 
- * doSomethingElse();
- * errorProxy.checkAndThrowException();
- * 
- * doSomethingMore();
- * errorProxy.checkAndThrowException();
- * 
- * try {
- *     subThread.join();
- * } catch (InterruptedException e) {
- *     errorProxy.checkAndThrowException();
- *     // restore interrupted status, if not caused by an exception
- *     Thread.currentThread().interrupt();
- * }
- * }
- * </pre>
- */
-public class ExceptionProxy {
-       
-       /** The thread that should be interrupted when an exception occurs */
-       private final Thread toInterrupt;
-       
-       /** The exception to throw */ 
-       private final AtomicReference<Throwable> exception;
-
-       /**
-        * Creates an exception proxy that interrupts the given thread upon
-        * report of an exception. The thread to interrupt may be null.
-        * 
-        * @param toInterrupt The thread to interrupt upon an exception. May be 
null.
-        */
-       public ExceptionProxy(@Nullable Thread toInterrupt) {
-               this.toInterrupt = toInterrupt;
-               this.exception = new AtomicReference<>();
-       }
-       
-       // 
------------------------------------------------------------------------
-       
-       /**
-        * Sets the exception and interrupts the target thread,
-        * if no other exception has occurred so far.
-        * 
-        * <p>The exception is only set (and the interruption is only 
triggered),
-        * if no other exception was set before.
-        * 
-        * @param t The exception that occurred
-        */
-       public void reportError(Throwable t) {
-               // set the exception, if it is the first (and the exception is 
non null)
-               if (t != null && exception.compareAndSet(null, t) && 
toInterrupt != null) {
-                       toInterrupt.interrupt();
-               }
-       }
-
-       /**
-        * Checks whether an exception has been set via {@link 
#reportError(Throwable)}.
-        * If yes, that exception if re-thrown by this method.
-        * 
-        * @throws Exception This method re-throws the exception, if set.
-        */
-       public void checkAndThrowException() throws Exception {
-               Throwable t = exception.get();
-               if (t != null) {
-                       if (t instanceof Exception) {
-                               throw (Exception) t;
-                       }
-                       else if (t instanceof Error) {
-                               throw (Error) t;
-                       }
-                       else {
-                               throw new Exception(t);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
deleted file mode 100644
index c68fe28..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Flink's description of a partition in a Kafka topic.
- * Serializable, and common across all Kafka consumer subclasses (0.8, 0.9, 
...)
- * 
- * <p>Note: This class must not change in its structure, because it would 
change the
- * serialization format and make previous savepoints unreadable.
- */
-public final class KafkaTopicPartition implements Serializable {
-
-       /** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK
-        * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS */
-       private static final long serialVersionUID = 722083576322742325L;
-       
-       // 
------------------------------------------------------------------------
-
-       private final String topic;
-       private final int partition;
-       private final int cachedHash;
-
-       public KafkaTopicPartition(String topic, int partition) {
-               this.topic = requireNonNull(topic);
-               this.partition = partition;
-               this.cachedHash = 31 * topic.hashCode() + partition;
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       public String getTopic() {
-               return topic;
-       }
-
-       public int getPartition() {
-               return partition;
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public String toString() {
-               return "KafkaTopicPartition{" +
-                               "topic='" + topic + '\'' +
-                               ", partition=" + partition +
-                               '}';
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               else if (o instanceof KafkaTopicPartition) {
-                       KafkaTopicPartition that = (KafkaTopicPartition) o;
-                       return this.partition == that.partition && 
this.topic.equals(that.topic);
-               }
-               else {
-                       return false;
-               }
-       }
-
-       @Override
-       public int hashCode() {
-               return cachedHash;
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       public static String toString(Map<KafkaTopicPartition, Long> map) {
-               StringBuilder sb = new StringBuilder();
-               for (Map.Entry<KafkaTopicPartition, Long> p: map.entrySet()) {
-                       KafkaTopicPartition ktp = p.getKey();
-                       
sb.append(ktp.getTopic()).append(":").append(ktp.getPartition()).append("=").append(p.getValue()).append(",
 ");
-               }
-               return sb.toString();
-       }
-
-       public static String toString(List<KafkaTopicPartition> partitions) {
-               StringBuilder sb = new StringBuilder();
-               for (KafkaTopicPartition p: partitions) {
-                       
sb.append(p.getTopic()).append(":").append(p.getPartition()).append(", ");
-               }
-               return sb.toString();
-       }
-
-
-       public static List<KafkaTopicPartition> 
dropLeaderData(List<KafkaTopicPartitionLeader> partitionInfos) {
-               List<KafkaTopicPartition> ret = new 
ArrayList<>(partitionInfos.size());
-               for(KafkaTopicPartitionLeader ktpl: partitionInfos) {
-                       ret.add(ktpl.getTopicPartition());
-               }
-               return ret;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
deleted file mode 100644
index 1959a05..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.kafka.common.Node;
-
-import java.io.Serializable;
-
-/**
- * Serializable Topic Partition info with leader Node information.
- * This class is used at runtime.
- */
-public class KafkaTopicPartitionLeader implements Serializable {
-
-       private static final long serialVersionUID = 9145855900303748582L;
-
-       private final int leaderId;
-       private final int leaderPort;
-       private final String leaderHost;
-       private final KafkaTopicPartition topicPartition;
-       private final int cachedHash;
-
-       public KafkaTopicPartitionLeader(KafkaTopicPartition topicPartition, 
Node leader) {
-               this.topicPartition = topicPartition;
-               if (leader == null) {
-                       this.leaderId = -1;
-                       this.leaderHost = null;
-                       this.leaderPort = -1;
-               } else {
-                       this.leaderId = leader.id();
-                       this.leaderPort = leader.port();
-                       this.leaderHost = leader.host();
-               }
-               int cachedHash = (leader == null) ? 14 : leader.hashCode();
-               this.cachedHash = 31 * cachedHash + topicPartition.hashCode();
-       }
-
-       public KafkaTopicPartition getTopicPartition() {
-               return topicPartition;
-       }
-
-       public Node getLeader() {
-               if (this.leaderId == -1) {
-                       return null;
-               } else {
-                       return new Node(leaderId, leaderHost, leaderPort);
-               }
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (!(o instanceof KafkaTopicPartitionLeader)) {
-                       return false;
-               }
-
-               KafkaTopicPartitionLeader that = (KafkaTopicPartitionLeader) o;
-
-               if (!topicPartition.equals(that.topicPartition)) {
-                       return false;
-               }
-               return leaderId == that.leaderId && leaderPort == 
that.leaderPort && leaderHost.equals(that.leaderHost);
-       }
-
-       @Override
-       public int hashCode() {
-               return cachedHash;
-       }
-
-       @Override
-       public String toString() {
-               return "KafkaTopicPartitionLeader{" +
-                               "leaderId=" + leaderId +
-                               ", leaderPort=" + leaderPort +
-                               ", leaderHost='" + leaderHost + '\'' +
-                               ", topic=" + topicPartition.getTopic() +
-                               ", partition=" + topicPartition.getPartition() +
-                               '}';
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
deleted file mode 100644
index 7cb5f46..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-/**
- * The state that the Flink Kafka Consumer holds for each Kafka partition.
- * Includes the Kafka descriptor for partitions.
- * 
- * <p>This class describes the most basic state (only the offset), subclasses
- * define more elaborate state, containing current watermarks and timestamp
- * extractors.
- * 
- * @param <KPH> The type of the Kafka partition descriptor, which varies 
across Kafka versions.
- */
-public class KafkaTopicPartitionState<KPH> {
-
-       /** Magic number to define an unset offset. Negative offsets are not 
used by Kafka (invalid),
-        * and we pick a number that is probably (hopefully) not used by Kafka 
as a magic number for anything else. */
-       public static final long OFFSET_NOT_SET = -915623761776L;
-       
-       // 
------------------------------------------------------------------------
-
-       /** The Flink description of a Kafka partition */
-       private final KafkaTopicPartition partition;
-
-       /** The Kafka description of a Kafka partition (varies across different 
Kafka versions) */
-       private final KPH kafkaPartitionHandle;
-       
-       /** The offset within the Kafka partition that we already processed */
-       private volatile long offset;
-
-       /** The offset of the Kafka partition that has been committed */
-       private volatile long committedOffset;
-
-       // 
------------------------------------------------------------------------
-       
-       public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH 
kafkaPartitionHandle) {
-               this.partition = partition;
-               this.kafkaPartitionHandle = kafkaPartitionHandle;
-               this.offset = OFFSET_NOT_SET;
-               this.committedOffset = OFFSET_NOT_SET;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Gets Flink's descriptor for the Kafka Partition.
-        * @return The Flink partition descriptor.
-        */
-       public final KafkaTopicPartition getKafkaTopicPartition() {
-               return partition;
-       }
-
-       /**
-        * Gets Kafka's descriptor for the Kafka Partition.
-        * @return The Kafka partition descriptor.
-        */
-       public final KPH getKafkaPartitionHandle() {
-               return kafkaPartitionHandle;
-       }
-
-       public final String getTopic() {
-               return partition.getTopic();
-       }
-
-       public final int getPartition() {
-               return partition.getPartition();
-       }
-
-       /**
-        * The current offset in the partition. This refers to the offset last 
element that
-        * we retrieved and emitted successfully. It is the offset that should 
be stored in
-        * a checkpoint.
-        */
-       public final long getOffset() {
-               return offset;
-       }
-
-       public final void setOffset(long offset) {
-               this.offset = offset;
-       }
-
-       public final boolean isOffsetDefined() {
-               return offset != OFFSET_NOT_SET;
-       }
-
-       public final void setCommittedOffset(long offset) {
-               this.committedOffset = offset;
-       }
-
-       public final long getCommittedOffset() {
-               return committedOffset;
-       }
-
-       
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return "Partition: " + partition + ", KafkaPartitionHandle=" + 
kafkaPartitionHandle
-                               + ", offset=" + (isOffsetDefined() ? 
String.valueOf(offset) : "(not set)");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
deleted file mode 100644
index efdc73f..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-/**
- * A special version of the per-kafka-partition-state that additionally holds
- * a periodic watermark generator (and timestamp extractor) per partition.
- * 
- * @param <T> The type of records handled by the watermark generator
- * @param <KPH> The type of the Kafka partition descriptor, which varies 
across Kafka versions.
- */
-public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> 
extends KafkaTopicPartitionState<KPH> {
-       
-       /** The timestamp assigner and watermark generator for the partition */
-       private final AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks;
-       
-       /** The last watermark timestamp generated by this partition */
-       private long partitionWatermark;
-
-       // 
------------------------------------------------------------------------
-       
-       public KafkaTopicPartitionStateWithPeriodicWatermarks(
-                       KafkaTopicPartition partition, KPH kafkaPartitionHandle,
-                       AssignerWithPeriodicWatermarks<T> 
timestampsAndWatermarks)
-       {
-               super(partition, kafkaPartitionHandle);
-               
-               this.timestampsAndWatermarks = timestampsAndWatermarks;
-               this.partitionWatermark = Long.MIN_VALUE;
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
-               return timestampsAndWatermarks.extractTimestamp(record, 
kafkaEventTimestamp);
-       }
-       
-       public long getCurrentWatermarkTimestamp() {
-               Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
-               if (wm != null) {
-                       partitionWatermark = Math.max(partitionWatermark, 
wm.getTimestamp());
-               }
-               return partitionWatermark;
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public String toString() {
-               return "KafkaTopicPartitionStateWithPeriodicWatermarks: 
partition=" + getKafkaTopicPartition()
-                               + ", offset=" + getOffset() + ", watermark=" + 
partitionWatermark;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
deleted file mode 100644
index edf40ce..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-import javax.annotation.Nullable;
-
-/**
- * A special version of the per-kafka-partition-state that additionally holds
- * a periodic watermark generator (and timestamp extractor) per partition.
- * 
- * <p>This class is not thread safe, but it gives volatile access to the 
current
- * partition watermark ({@link #getCurrentPartitionWatermark()}).
- * 
- * @param <T> The type of records handled by the watermark generator
- * @param <KPH> The type of the Kafka partition descriptor, which varies 
across Kafka versions
- */
-public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> 
extends KafkaTopicPartitionState<KPH> {
-       
-       /** The timestamp assigner and watermark generator for the partition */
-       private final AssignerWithPunctuatedWatermarks<T> 
timestampsAndWatermarks;
-       
-       /** The last watermark timestamp generated by this partition */
-       private volatile long partitionWatermark;
-
-       // 
------------------------------------------------------------------------
-       
-       public KafkaTopicPartitionStateWithPunctuatedWatermarks(
-                       KafkaTopicPartition partition, KPH kafkaPartitionHandle,
-                       AssignerWithPunctuatedWatermarks<T> 
timestampsAndWatermarks)
-       {
-               super(partition, kafkaPartitionHandle);
-               
-               this.timestampsAndWatermarks = timestampsAndWatermarks;
-               this.partitionWatermark = Long.MIN_VALUE;
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
-               return timestampsAndWatermarks.extractTimestamp(record, 
kafkaEventTimestamp);
-       }
-
-       @Nullable
-       public Watermark checkAndGetNewWatermark(T record, long timestamp) {
-               Watermark mark = 
timestampsAndWatermarks.checkAndGetNextWatermark(record, timestamp);
-               if (mark != null && mark.getTimestamp() > partitionWatermark) {
-                       partitionWatermark = mark.getTimestamp();
-                       return mark;
-               }
-               else {
-                       return null;
-               }
-       }
-       
-       public long getCurrentPartitionWatermark() {
-               return partitionWatermark;
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public String toString() {
-               return "KafkaTopicPartitionStateWithPunctuatedWatermarks: 
partition=" + getKafkaTopicPartition()
-                               + ", offset=" + getOffset() + ", watermark=" + 
partitionWatermark;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
deleted file mode 100644
index 7a41ade..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class TypeUtil {
-       private TypeUtil() {}
-
-       /**
-        * Creates TypeInformation array for an array of Classes.
-        * @param fieldTypes classes to extract type information from
-        * @return type information
-        */
-       public static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) {
-               TypeInformation<?>[] typeInfos = new 
TypeInformation[fieldTypes.length];
-               for (int i = 0; i < fieldTypes.length; i++) {
-                       typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]);
-               }
-               return typeInfos;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
deleted file mode 100644
index cedb696..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals.metrics;
-
-import org.apache.flink.metrics.Gauge;
-
-/**
- * Gauge for getting the current value of a Kafka metric.
- */
-public class KafkaMetricWrapper implements Gauge<Double> {
-       private final org.apache.kafka.common.Metric kafkaMetric;
-
-       public KafkaMetricWrapper(org.apache.kafka.common.Metric metric) {
-               this.kafkaMetric = metric;
-       }
-
-       @Override
-       public Double getValue() {
-               return kafkaMetric.value();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
deleted file mode 100644
index 9b848e0..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import java.io.Serializable;
-
-/**
- * A partitioner ensuring that each internal Flink partition ends up in one 
Kafka partition.
- *
- * Note, one Kafka partition can contain multiple Flink partitions.
- *
- * Cases:
- *     # More Flink partitions than kafka partitions
- * <pre>
- *             Flink Sinks:            Kafka Partitions
- *                     1       ----------------&gt;    1
- *                     2   --------------/
- *                     3   -------------/
- *                     4       ------------/
- * </pre>
- * Some (or all) kafka partitions contain the output of more than one flink 
partition
- *
- *# Fewer Flink partitions than Kafka
- * <pre>
- *             Flink Sinks:            Kafka Partitions
- *                     1       ----------------&gt;    1
- *                     2       ----------------&gt;    2
- *                                                                             
3
- *                                                                             
4
- *                                                                             
5
- * </pre>
- *
- *  Not all Kafka partitions contain data
- *  To avoid such an unbalanced partitioning, use a round-robin kafka 
partitioner. (note that this will
- *  cause a lot of network connections between all the Flink instances and all 
the Kafka brokers
- *
- *
- */
-public class FixedPartitioner<T> extends KafkaPartitioner<T> implements 
Serializable {
-       private static final long serialVersionUID = 1627268846962918126L;
-
-       private int targetPartition = -1;
-
-       @Override
-       public void open(int parallelInstanceId, int parallelInstances, int[] 
partitions) {
-               if (parallelInstanceId < 0 || parallelInstances <= 0 || 
partitions.length == 0) {
-                       throw new IllegalArgumentException();
-               }
-               
-               this.targetPartition = partitions[parallelInstanceId % 
partitions.length];
-       }
-
-       @Override
-       public int partition(T next, byte[] serializedKey, byte[] 
serializedValue, int numPartitions) {
-               if (targetPartition >= 0) {
-                       return targetPartition;
-               } else {
-                       throw new RuntimeException("The partitioner has not 
been initialized properly");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
deleted file mode 100644
index 37e2ef6..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import java.io.Serializable;
-
-/**
- * It contains a open() method which is called on each parallel instance.
- * Partitioners must be serializable!
- */
-public abstract class KafkaPartitioner<T> implements Serializable {
-
-       private static final long serialVersionUID = -1974260817778593473L;
-
-       /**
-        * Initializer for the Partitioner.
-        * @param parallelInstanceId 0-indexed id of the parallel instance in 
Flink
-        * @param parallelInstances the total number of parallel instances
-        * @param partitions an array describing the partition IDs of the 
available Kafka partitions.
-        */
-       public void open(int parallelInstanceId, int parallelInstances, int[] 
partitions) {
-               // overwrite this method if needed.
-       }
-
-       public abstract int partition(T next, byte[] serializedKey, byte[] 
serializedValue, int numPartitions);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
deleted file mode 100644
index d170058..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import java.io.IOException;
-
-
-/**
- * DeserializationSchema that deserializes a JSON String into an ObjectNode.
- * <p>
- * Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
- */
-public class JSONDeserializationSchema extends 
AbstractDeserializationSchema<ObjectNode> {
-       private ObjectMapper mapper;
-
-       @Override
-       public ObjectNode deserialize(byte[] message) throws IOException {
-               if (mapper == null) {
-                       mapper = new ObjectMapper();
-               }
-               return mapper.readValue(message, ObjectNode.class);
-       }
-
-       @Override
-       public boolean isEndOfStream(ObjectNode nextElement) {
-               return false;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
deleted file mode 100644
index 261a111..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.io.IOException;
-
-import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
-
-/**
- * DeserializationSchema that deserializes a JSON String into an ObjectNode.
- * <p>
- * Key fields can be accessed by calling 
objectNode.get("key").get(&lt;name>).as(&lt;type>)
- * <p>
- * Value fields can be accessed by calling 
objectNode.get("value").get(&lt;name>).as(&lt;type>)
- * <p>
- * Metadata fields can be accessed by calling 
objectNode.get("metadata").get(&lt;name>).as(&lt;type>) and include
- * the "offset" (long), "topic" (String) and "partition" (int).
- */
-public class JSONKeyValueDeserializationSchema implements 
KeyedDeserializationSchema<ObjectNode> {
-       private final boolean includeMetadata;
-       private ObjectMapper mapper;
-
-       public JSONKeyValueDeserializationSchema(boolean includeMetadata) {
-               this.includeMetadata = includeMetadata;
-       }
-
-       @Override
-       public ObjectNode deserialize(byte[] messageKey, byte[] message, String 
topic, int partition, long offset) throws IOException {
-               if (mapper == null) {
-                       mapper = new ObjectMapper();
-               }
-               ObjectNode node = mapper.createObjectNode();
-               node.set("key", mapper.readValue(messageKey, JsonNode.class));
-               node.set("value", mapper.readValue(message, JsonNode.class));
-               if (includeMetadata) {
-                       node.putObject("metadata")
-                               .put("offset", offset)
-                               .put("topic", topic)
-                               .put("partition", partition);
-               }
-               return node;
-       }
-
-       @Override
-       public boolean isEndOfStream(ObjectNode nextElement) {
-               return false;
-       }
-
-       @Override
-       public TypeInformation<ObjectNode> getProducedType() {
-               return getForClass(ObjectNode.class);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
deleted file mode 100644
index 4344810..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * Deserialization schema from JSON to {@link Row}.
- *
- * <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
- * the specified fields.
- *
- * <p>Failure during deserialization are forwarded as wrapped IOExceptions.
- */
-public class JsonRowDeserializationSchema implements 
DeserializationSchema<Row> {
-
-       /** Field names to parse. Indices match fieldTypes indices. */
-       private final String[] fieldNames;
-
-       /** Types to parse fields as. Indices match fieldNames indices. */
-       private final TypeInformation<?>[] fieldTypes;
-
-       /** Object mapper for parsing the JSON. */
-       private final ObjectMapper objectMapper = new ObjectMapper();
-
-       /** Flag indicating whether to fail on a missing field. */
-       private boolean failOnMissingField;
-
-       /**
-        * Creates a JSON deserialization schema for the given fields and type 
classes.
-        *
-        * @param fieldNames Names of JSON fields to parse.
-        * @param fieldTypes Type classes to parse JSON fields as.
-        */
-       public JsonRowDeserializationSchema(String[] fieldNames, Class<?>[] 
fieldTypes) {
-               this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field 
names");
-
-               this.fieldTypes = new TypeInformation[fieldTypes.length];
-               for (int i = 0; i < fieldTypes.length; i++) {
-                       this.fieldTypes[i] = 
TypeExtractor.getForClass(fieldTypes[i]);
-               }
-
-               Preconditions.checkArgument(fieldNames.length == 
fieldTypes.length,
-                               "Number of provided field names and types does 
not match.");
-       }
-
-       /**
-        * Creates a JSON deserialization schema for the given fields and types.
-        *
-        * @param fieldNames Names of JSON fields to parse.
-        * @param fieldTypes Types to parse JSON fields as.
-        */
-       public JsonRowDeserializationSchema(String[] fieldNames, 
TypeInformation<?>[] fieldTypes) {
-               this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field 
names");
-               this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field 
types");
-
-               Preconditions.checkArgument(fieldNames.length == 
fieldTypes.length,
-                               "Number of provided field names and types does 
not match.");
-       }
-
-       @Override
-       public Row deserialize(byte[] message) throws IOException {
-               try {
-                       JsonNode root = objectMapper.readTree(message);
-
-                       Row row = new Row(fieldNames.length);
-                       for (int i = 0; i < fieldNames.length; i++) {
-                               JsonNode node = root.get(fieldNames[i]);
-
-                               if (node == null) {
-                                       if (failOnMissingField) {
-                                               throw new 
IllegalStateException("Failed to find field with name '"
-                                                               + fieldNames[i] 
+ "'.");
-                                       } else {
-                                               row.setField(i, null);
-                                       }
-                               } else {
-                                       // Read the value as specified type
-                                       Object value = 
objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
-                                       row.setField(i, value);
-                               }
-                       }
-
-                       return row;
-               } catch (Throwable t) {
-                       throw new IOException("Failed to deserialize JSON 
object.", t);
-               }
-       }
-
-       @Override
-       public boolean isEndOfStream(Row nextElement) {
-               return false;
-       }
-
-       @Override
-       public TypeInformation<Row> getProducedType() {
-               return new RowTypeInfo(fieldTypes);
-       }
-
-       /**
-        * Configures the failure behaviour if a JSON field is missing.
-        *
-        * <p>By default, a missing field is ignored and the field is set to 
null.
-        *
-        * @param failOnMissingField Flag indicating whether to fail or not on 
a missing field.
-        */
-       public void setFailOnMissingField(boolean failOnMissingField) {
-               this.failOnMissingField = failOnMissingField;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
deleted file mode 100644
index 077ff13..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.util.Preconditions;
-
-
-/**
- * Serialization schema that serializes an object into a JSON bytes.
- *
- * <p>Serializes the input {@link Row} object into a JSON string and
- * converts it into <code>byte[]</code>.
- *
- * <p>Result <code>byte[]</code> messages can be deserialized using
- * {@link JsonRowDeserializationSchema}.
- */
-public class JsonRowSerializationSchema implements SerializationSchema<Row> {
-       /** Fields names in the input Row object */
-       private final String[] fieldNames;
-       /** Object mapper that is used to create output JSON objects */
-       private static ObjectMapper mapper = new ObjectMapper();
-
-       /**
-        * Creates a JSON serialization schema for the given fields and types.
-        *
-        * @param fieldNames Names of JSON fields to parse.
-        */
-       public JsonRowSerializationSchema(String[] fieldNames) {
-               this.fieldNames = Preconditions.checkNotNull(fieldNames);
-       }
-
-       @Override
-       public byte[] serialize(Row row) {
-               if (row.productArity() != fieldNames.length) {
-                       throw new IllegalStateException(String.format(
-                               "Number of elements in the row %s is different 
from number of field names: %d", row, fieldNames.length));
-               }
-
-               ObjectNode objectNode = mapper.createObjectNode();
-
-               for (int i = 0; i < row.productArity(); i++) {
-                       JsonNode node = 
mapper.valueToTree(row.productElement(i));
-                       objectNode.set(fieldNames[i], node);
-               }
-
-               try {
-                       return mapper.writeValueAsBytes(objectNode);
-               } catch (Exception e) {
-                       throw new RuntimeException("Failed to serialize row", 
e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
deleted file mode 100644
index 01e72ca..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * The deserialization schema describes how to turn the byte key / value 
messages delivered by certain
- * data sources (for example Apache Kafka) into data types (Java/Scala 
objects) that are
- * processed by Flink.
- * 
- * @param <T> The type created by the keyed deserialization schema.
- */
-public interface KeyedDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
-
-       /**
-        * Deserializes the byte message.
-        *
-        * @param messageKey the key as a byte array (null if no key has been 
set)
-        * @param message The message, as a byte array. (null if the message 
was empty or deleted)
-        * @param partition The partition the message has originated from
-        * @param offset the offset of the message in the original source (for 
example the Kafka offset)  @return The deserialized message as an object.
-        */
-       T deserialize(byte[] messageKey, byte[] message, String topic, int 
partition, long offset) throws IOException;
-
-       /**
-        * Method to decide whether the element signals the end of the stream. 
If
-        * true is returned the element won't be emitted.
-        * 
-        * @param nextElement The element to test for the end-of-stream signal.
-        * @return True, if the element signals end of stream, false otherwise.
-        */
-       boolean isEndOfStream(T nextElement);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
deleted file mode 100644
index 4b9dba2..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.io.IOException;
-
-/**
- * A simple wrapper for using the DeserializationSchema with the 
KeyedDeserializationSchema
- * interface
- * @param <T> The type created by the deserialization schema.
- */
-public class KeyedDeserializationSchemaWrapper<T> implements 
KeyedDeserializationSchema<T> {
-
-       private static final long serialVersionUID = 2651665280744549932L;
-
-       private final DeserializationSchema<T> deserializationSchema;
-
-       public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> 
deserializationSchema) {
-               this.deserializationSchema = deserializationSchema;
-       }
-       @Override
-       public T deserialize(byte[] messageKey, byte[] message, String topic, 
int partition, long offset) throws IOException {
-               return deserializationSchema.deserialize(message);
-       }
-
-       @Override
-       public boolean isEndOfStream(T nextElement) {
-               return deserializationSchema.isEndOfStream(nextElement);
-       }
-
-       @Override
-       public TypeInformation<T> getProducedType() {
-               return deserializationSchema.getProducedType();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
deleted file mode 100644
index 701281e..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.Serializable;
-
-/**
- * The serialization schema describes how to turn a data object into a 
different serialized
- * representation. Most data sinks (for example Apache Kafka) require the data 
to be handed
- * to them in a specific format (for example as byte strings).
- * 
- * @param <T> The type to be serialized.
- */
-public interface KeyedSerializationSchema<T> extends Serializable {
-
-       /**
-        * Serializes the key of the incoming element to a byte array
-        * This method might return null if no key is available.
-        *
-        * @param element The incoming element to be serialized
-        * @return the key of the element as a byte array
-        */
-       byte[] serializeKey(T element);
-
-
-       /**
-        * Serializes the value of the incoming element to a byte array
-        * 
-        * @param element The incoming element to be serialized
-        * @return the value of the element as a byte array
-        */
-       byte[] serializeValue(T element);
-
-       /**
-        * Optional method to determine the target topic for the element
-        *
-        * @param element Incoming element to determine the target topic from
-        * @return null or the target topic
-        */
-       String getTargetTopic(T element);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
deleted file mode 100644
index 1b3e486..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util.serialization;
-
-/**
- * A simple wrapper for using the SerializationSchema with the 
KeyedDeserializationSchema
- * interface
- * @param <T> The type to serialize
- */
-public class KeyedSerializationSchemaWrapper<T> implements 
KeyedSerializationSchema<T> {
-
-       private static final long serialVersionUID = 1351665280744549933L;
-
-       private final SerializationSchema<T> serializationSchema;
-
-       public KeyedSerializationSchemaWrapper(SerializationSchema<T> 
serializationSchema) {
-               this.serializationSchema = serializationSchema;
-       }
-
-       @Override
-       public byte[] serializeKey(T element) {
-               return null;
-       }
-
-       @Override
-       public byte[] serializeValue(T element) {
-               return serializationSchema.serialize(element);
-       }
-
-       @Override
-       public String getTargetTopic(T element) {
-               return null; // we are never overriding the topic
-       }
-}

Reply via email to