http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml 
b/flink-connectors/flink-connector-kafka-base/pom.xml
new file mode 100644
index 0000000..58eb043
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -0,0 +1,212 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-kafka-base_2.10</artifactId>
+       <name>flink-connector-kafka-base</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <kafka.version>0.8.2.2</kafka.version>
+       </properties>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+                       <!-- Projects depending on this project,
+                       won't depend on flink-table. -->
+                       <optional>true</optional>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka_${scala.binary.version}</artifactId>
+                       <version>${kafka.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>com.sun.jmx</groupId>
+                                       <artifactId>jmxri</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.sun.jdmk</groupId>
+                                       <artifactId>jmxtools</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>log4j</groupId>
+                                       <artifactId>log4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       <artifactId>slf4j-simple</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>net.sf.jopt-simple</groupId>
+                                       <artifactId>jopt-simple</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.scala-lang</groupId>
+                                       <artifactId>scala-reflect</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.scala-lang</groupId>
+                                       <artifactId>scala-compiler</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.yammer.metrics</groupId>
+                                       
<artifactId>metrics-annotation</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.xerial.snappy</groupId>
+                                       <artifactId>snappy-java</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <!-- test dependencies -->
+               
+               <!-- force using the latest zkclient -->
+               <dependency>
+                       <groupId>com.101tec</groupId>
+                       <artifactId>zkclient</artifactId>
+                       <version>0.7</version>
+                       <type>jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+
+               <dependency>
+                       <groupId>org.apache.curator</groupId>
+                       <artifactId>curator-test</artifactId>
+                       <version>${curator.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-jmx</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-tests_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-minikdc</artifactId>
+                       <version>${minikdc.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+
+       <dependencyManagement>
+               <dependencies>
+                       <dependency>
+                               <groupId>com.101tec</groupId>
+                               <artifactId>zkclient</artifactId>
+                               <version>0.7</version>
+                       </dependency>
+               </dependencies>
+       </dependencyManagement>
+       
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <!--
+            https://issues.apache.org/jira/browse/DIRSHARED-134
+            Required to pull the Mini-KDC transitive dependency
+            -->
+                       <plugin>
+                               <groupId>org.apache.felix</groupId>
+                               <artifactId>maven-bundle-plugin</artifactId>
+                               <version>3.0.1</version>
+                               <inherited>true</inherited>
+                               <extensions>true</extensions>
+                       </plugin>
+               </plugins>
+       </build>
+       
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
new file mode 100644
index 0000000..aef7116
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class of all Flink Kafka Consumer data sources.
+ * This implements the common behavior across all Kafka versions.
+ * 
+ * <p>The Kafka version specific behavior is defined mainly in the specific 
subclasses of the
+ * {@link AbstractFetcher}.
+ * 
+ * @param <T> The type of records produced by this data source
+ */
+public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFunction<T> implements 
+               CheckpointListener,
+               ResultTypeQueryable<T>,
+               CheckpointedFunction {
+       private static final long serialVersionUID = -6272159445203409112L;
+
+       protected static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+       
+       /** The maximum number of pending non-committed checkpoints to track, 
to avoid memory leaks */
+       public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+       /** Boolean configuration key to disable metrics tracking **/
+       public static final String KEY_DISABLE_METRICS = 
"flink.disable-metrics";
+
+       // 
------------------------------------------------------------------------
+       //  configuration state, set on the client relevant for all subtasks
+       // 
------------------------------------------------------------------------
+
+       private final List<String> topics;
+       
+       /** The schema to convert between Kafka's byte messages, and Flink's 
objects */
+       protected final KeyedDeserializationSchema<T> deserializer;
+
+       /** The set of topic partitions that the source will read */
+       protected List<KafkaTopicPartition> subscribedPartitions;
+       
+       /** Optional timestamp extractor / watermark generator that will be run 
per Kafka partition,
+        * to exploit per-partition timestamp characteristics.
+        * The assigner is kept in serialized form, to deserialize it into 
multiple copies */
+       private SerializedValue<AssignerWithPeriodicWatermarks<T>> 
periodicWatermarkAssigner;
+       
+       /** Optional timestamp extractor / watermark generator that will be run 
per Kafka partition,
+        * to exploit per-partition timestamp characteristics. 
+        * The assigner is kept in serialized form, to deserialize it into 
multiple copies */
+       private SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
punctuatedWatermarkAssigner;
+
+       private transient ListState<Tuple2<KafkaTopicPartition, Long>> 
offsetsStateForCheckpoint;
+
+       // 
------------------------------------------------------------------------
+       //  runtime state (used individually by each parallel subtask) 
+       // 
------------------------------------------------------------------------
+       
+       /** Data for pending but uncommitted offsets */
+       private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+       /** The fetcher implements the connections to the Kafka brokers */
+       private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
+       
+       /** The offsets to restore to, if the consumer restores state from a 
checkpoint */
+       private transient volatile HashMap<KafkaTopicPartition, Long> 
restoreToOffset;
+       
+       /** Flag indicating whether the consumer is still running **/
+       private volatile boolean running = true;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Base constructor.
+        *
+        * @param deserializer
+        *           The deserializer to turn raw byte messages into Java/Scala 
objects.
+        */
+       public FlinkKafkaConsumerBase(List<String> topics, 
KeyedDeserializationSchema<T> deserializer) {
+               this.topics = checkNotNull(topics);
+               checkArgument(topics.size() > 0, "You have to define at least 
one topic.");
+               this.deserializer = checkNotNull(deserializer, 
"valueDeserializer");
+       }
+
+       /**
+        * This method must be called from the subclasses, to set the list of 
all subscribed partitions
+        * that this consumer will fetch from (across all subtasks).
+        * 
+        * @param allSubscribedPartitions The list of all partitions that all 
subtasks together should fetch from.
+        */
+       protected void setSubscribedPartitions(List<KafkaTopicPartition> 
allSubscribedPartitions) {
+               checkNotNull(allSubscribedPartitions);
+               this.subscribedPartitions = 
Collections.unmodifiableList(allSubscribedPartitions);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Configuration
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit 
watermarks in a punctuated manner.
+        * The watermark extractor will run per Kafka partition, watermarks 
will be merged across partitions
+        * in the same way as in the Flink runtime, when streams are merged.
+        * 
+        * <p>When a subtask of a FlinkKafkaConsumer source reads multiple 
Kafka partitions,
+        * the streams from the partitions are unioned in a "first come first 
serve" fashion. Per-partition
+        * characteristics are usually lost that way. For example, if the 
timestamps are strictly ascending
+        * per Kafka partition, they will not be strictly ascending in the 
resulting Flink DataStream, if the
+        * parallel source subtask reads more that one partition.
+        * 
+        * <p>Running timestamp extractors / watermark generators directly 
inside the Kafka source, per Kafka
+        * partition, allows users to let them exploit the per-partition 
characteristics.
+        * 
+        * <p>Note: One can use either an {@link 
AssignerWithPunctuatedWatermarks} or an
+        * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
+        * 
+        * @param assigner The timestamp assigner / watermark generator to use.
+        * @return The consumer object, to allow function chaining.   
+        */
+       public FlinkKafkaConsumerBase<T> 
assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
+               checkNotNull(assigner);
+               
+               if (this.periodicWatermarkAssigner != null) {
+                       throw new IllegalStateException("A periodic watermark 
emitter has already been set.");
+               }
+               try {
+                       ClosureCleaner.clean(assigner, true);
+                       this.punctuatedWatermarkAssigner = new 
SerializedValue<>(assigner);
+                       return this;
+               } catch (Exception e) {
+                       throw new IllegalArgumentException("The given assigner 
is not serializable", e);
+               }
+       }
+
+       /**
+        * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit 
watermarks in a punctuated manner.
+        * The watermark extractor will run per Kafka partition, watermarks 
will be merged across partitions
+        * in the same way as in the Flink runtime, when streams are merged.
+        *
+        * <p>When a subtask of a FlinkKafkaConsumer source reads multiple 
Kafka partitions,
+        * the streams from the partitions are unioned in a "first come first 
serve" fashion. Per-partition
+        * characteristics are usually lost that way. For example, if the 
timestamps are strictly ascending
+        * per Kafka partition, they will not be strictly ascending in the 
resulting Flink DataStream, if the
+        * parallel source subtask reads more that one partition.
+        *
+        * <p>Running timestamp extractors / watermark generators directly 
inside the Kafka source, per Kafka
+        * partition, allows users to let them exploit the per-partition 
characteristics.
+        *
+        * <p>Note: One can use either an {@link 
AssignerWithPunctuatedWatermarks} or an
+        * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
+        *
+        * @param assigner The timestamp assigner / watermark generator to use.
+        * @return The consumer object, to allow function chaining.   
+        */
+       public FlinkKafkaConsumerBase<T> 
assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
+               checkNotNull(assigner);
+               
+               if (this.punctuatedWatermarkAssigner != null) {
+                       throw new IllegalStateException("A punctuated watermark 
emitter has already been set.");
+               }
+               try {
+                       ClosureCleaner.clean(assigner, true);
+                       this.periodicWatermarkAssigner = new 
SerializedValue<>(assigner);
+                       return this;
+               } catch (Exception e) {
+                       throw new IllegalArgumentException("The given assigner 
is not serializable", e);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Work methods
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void run(SourceContext<T> sourceContext) throws Exception {
+               if (subscribedPartitions == null) {
+                       throw new Exception("The partitions were not set for 
the consumer");
+               }
+
+               // we need only do work, if we actually have partitions assigned
+               if (!subscribedPartitions.isEmpty()) {
+
+                       // (1) create the fetcher that will communicate with 
the Kafka brokers
+                       final AbstractFetcher<T, ?> fetcher = createFetcher(
+                                       sourceContext, subscribedPartitions,
+                                       periodicWatermarkAssigner, 
punctuatedWatermarkAssigner,
+                                       (StreamingRuntimeContext) 
getRuntimeContext());
+
+                       // (2) set the fetcher to the restored checkpoint 
offsets
+                       if (restoreToOffset != null) {
+                               fetcher.restoreOffsets(restoreToOffset);
+                       }
+
+                       // publish the reference, for snapshot-, commit-, and 
cancel calls
+                       // IMPORTANT: We can only do that now, because only now 
will calls to
+                       //            the fetchers 'snapshotCurrentState()' 
method return at least
+                       //            the restored offsets
+                       this.kafkaFetcher = fetcher;
+                       if (!running) {
+                               return;
+                       }
+                       
+                       // (3) run the fetcher' main work method
+                       fetcher.runFetchLoop();
+               }
+               else {
+                       // this source never completes, so emit a 
Long.MAX_VALUE watermark
+                       // to not block watermark forwarding
+                       sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
+
+                       // wait until this is canceled
+                       final Object waitLock = new Object();
+                       while (running) {
+                               try {
+                                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
+                                       synchronized (waitLock) {
+                                               waitLock.wait();
+                                       }
+                               }
+                               catch (InterruptedException e) {
+                                       if (!running) {
+                                               // restore the interrupted 
state, and fall through the loop
+                                               
Thread.currentThread().interrupt();
+                                       }
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void cancel() {
+               // set ourselves as not running
+               running = false;
+               
+               // abort the fetcher, if there is one
+               if (kafkaFetcher != null) {
+                       kafkaFetcher.cancel();
+               }
+
+               // there will be an interrupt() call to the main thread anyways
+       }
+
+       @Override
+       public void open(Configuration configuration) {
+               List<KafkaTopicPartition> kafkaTopicPartitions = 
getKafkaPartitions(topics);
+
+               if (kafkaTopicPartitions != null) {
+                       assignTopicPartitions(kafkaTopicPartitions);
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               // pretty much the same logic as cancelling
+               try {
+                       cancel();
+               } finally {
+                       super.close();
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Checkpoint and restore
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void initializeState(FunctionInitializationContext context) 
throws Exception {
+
+               OperatorStateStore stateStore = context.getOperatorStateStore();
+               offsetsStateForCheckpoint = 
stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
+
+               if (context.isRestored()) {
+                       restoreToOffset = new HashMap<>();
+                       for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : 
offsetsStateForCheckpoint.get()) {
+                               restoreToOffset.put(kafkaOffset.f0, 
kafkaOffset.f1);
+                       }
+
+                       LOG.info("Setting restore state in the 
FlinkKafkaConsumer.");
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Using the following offsets: {}", 
restoreToOffset);
+                       }
+               } else {
+                       LOG.info("No restore state for FlinkKafkaConsumer.");
+               }
+       }
+
+       @Override
+       public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+               if (!running) {
+                       LOG.debug("snapshotState() called on closed source");
+               } else {
+
+                       offsetsStateForCheckpoint.clear();
+
+                       final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+                       if (fetcher == null) {
+                               // the fetcher has not yet been initialized, 
which means we need to return the
+                               // originally restored offsets or the assigned 
partitions
+
+                               if (restoreToOffset != null) {
+
+                                       for (Map.Entry<KafkaTopicPartition, 
Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
+                                               offsetsStateForCheckpoint.add(
+                                                               
Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), 
kafkaTopicPartitionLongEntry.getValue()));
+                                       }
+                               } else if (subscribedPartitions != null) {
+                                       for (KafkaTopicPartition 
subscribedPartition : subscribedPartitions) {
+                                               offsetsStateForCheckpoint.add(
+                                                               
Tuple2.of(subscribedPartition, KafkaTopicPartitionState.OFFSET_NOT_SET));
+                                       }
+                               }
+
+                               // the map cannot be asynchronously updated, 
because only one checkpoint call can happen
+                               // on this function at a time: either 
snapshotState() or notifyCheckpointComplete()
+                               
pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffset);
+                       } else {
+                               HashMap<KafkaTopicPartition, Long> 
currentOffsets = fetcher.snapshotCurrentState();
+
+                               // the map cannot be asynchronously updated, 
because only one checkpoint call can happen
+                               // on this function at a time: either 
snapshotState() or notifyCheckpointComplete()
+                               
pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+
+                               for (Map.Entry<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
+                                       offsetsStateForCheckpoint.add(
+                                                       
Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), 
kafkaTopicPartitionLongEntry.getValue()));
+                               }
+                       }
+
+                       // truncate the map of pending offsets to commit, to 
prevent infinite growth
+                       while (pendingOffsetsToCommit.size() > 
MAX_NUM_PENDING_CHECKPOINTS) {
+                               pendingOffsetsToCommit.remove(0);
+                       }
+               }
+       }
+
+       @Override
+       public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+               if (!running) {
+                       LOG.debug("notifyCheckpointComplete() called on closed 
source");
+                       return;
+               }
+
+               final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+               if (fetcher == null) {
+                       LOG.debug("notifyCheckpointComplete() called on 
uninitialized source");
+                       return;
+               }
+               
+               // only one commit operation must be in progress
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Committing offsets to Kafka/ZooKeeper for 
checkpoint " + checkpointId);
+               }
+
+               try {
+                       final int posInMap = 
pendingOffsetsToCommit.indexOf(checkpointId);
+                       if (posInMap == -1) {
+                               LOG.warn("Received confirmation for unknown 
checkpoint id {}", checkpointId);
+                               return;
+                       }
+
+                       @SuppressWarnings("unchecked")
+                       HashMap<KafkaTopicPartition, Long> offsets =
+                                       (HashMap<KafkaTopicPartition, Long>) 
pendingOffsetsToCommit.remove(posInMap);
+
+                       // remove older checkpoints in map
+                       for (int i = 0; i < posInMap; i++) {
+                               pendingOffsetsToCommit.remove(0);
+                       }
+
+                       if (offsets == null || offsets.size() == 0) {
+                               LOG.debug("Checkpoint state was empty.");
+                               return;
+                       }
+                       fetcher.commitInternalOffsetsToKafka(offsets);
+               }
+               catch (Exception e) {
+                       if (running) {
+                               throw e;
+                       }
+                       // else ignore exception if we are no longer running
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Kafka Consumer specific methods
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Creates the fetcher that connect to the Kafka brokers, pulls data, 
deserialized the
+        * data, and emits it into the data streams.
+        * 
+        * @param sourceContext The source context to emit data to.
+        * @param thisSubtaskPartitions The set of partitions that this subtask 
should handle.
+        * @param watermarksPeriodic Optional, a serialized timestamp extractor 
/ periodic watermark generator.
+        * @param watermarksPunctuated Optional, a serialized timestamp 
extractor / punctuated watermark generator.
+        * @param runtimeContext The task's runtime context.
+        * 
+        * @return The instantiated fetcher
+        * 
+        * @throws Exception The method should forward exceptions
+        */
+       protected abstract AbstractFetcher<T, ?> createFetcher(
+                       SourceContext<T> sourceContext,
+                       List<KafkaTopicPartition> thisSubtaskPartitions,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
+                       StreamingRuntimeContext runtimeContext) throws 
Exception;
+
+       protected abstract List<KafkaTopicPartition> 
getKafkaPartitions(List<String> topics);
+       
+       // 
------------------------------------------------------------------------
+       //  ResultTypeQueryable methods 
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public TypeInformation<T> getProducedType() {
+               return deserializer.getProducedType();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       private void assignTopicPartitions(List<KafkaTopicPartition> 
kafkaTopicPartitions) {
+               subscribedPartitions = new ArrayList<>();
+
+               if (restoreToOffset != null) {
+                       for (KafkaTopicPartition kafkaTopicPartition : 
kafkaTopicPartitions) {
+                               if 
(restoreToOffset.containsKey(kafkaTopicPartition)) {
+                                       
subscribedPartitions.add(kafkaTopicPartition);
+                               }
+                       }
+               } else {
+                       Collections.sort(kafkaTopicPartitions, new 
Comparator<KafkaTopicPartition>() {
+                               @Override
+                               public int compare(KafkaTopicPartition o1, 
KafkaTopicPartition o2) {
+                                       int topicComparison = 
o1.getTopic().compareTo(o2.getTopic());
+
+                                       if (topicComparison == 0) {
+                                               return o1.getPartition() - 
o2.getPartition();
+                                       } else {
+                                               return topicComparison;
+                                       }
+                               }
+                       });
+
+                       for (int i = 
getRuntimeContext().getIndexOfThisSubtask(); i < kafkaTopicPartitions.size(); i 
+= getRuntimeContext().getNumberOfParallelSubtasks()) {
+                               
subscribedPartitions.add(kafkaTopicPartitions.get(i));
+                       }
+               }
+       }
+
+       /**
+        * Selects which of the given partitions should be handled by a 
specific consumer,
+        * given a certain number of consumers.
+        * 
+        * @param allPartitions The partitions to select from
+        * @param numConsumers The number of consumers
+        * @param consumerIndex The index of the specific consumer
+        * 
+        * @return The sublist of partitions to be handled by that consumer.
+        */
+       protected static List<KafkaTopicPartition> assignPartitions(
+                       List<KafkaTopicPartition> allPartitions,
+                       int numConsumers, int consumerIndex) {
+               final List<KafkaTopicPartition> thisSubtaskPartitions = new 
ArrayList<>(
+                               allPartitions.size() / numConsumers + 1);
+
+               for (int i = 0; i < allPartitions.size(); i++) {
+                       if (i % numConsumers == consumerIndex) {
+                               thisSubtaskPartitions.add(allPartitions.get(i));
+                       }
+               }
+               
+               return thisSubtaskPartitions;
+       }
+       
+       /**
+        * Logs the partition information in INFO level.
+        * 
+        * @param logger The logger to log to.
+        * @param partitionInfos List of subscribed partitions
+        */
+       protected static void logPartitionInfo(Logger logger, 
List<KafkaTopicPartition> partitionInfos) {
+               Map<String, Integer> countPerTopic = new HashMap<>();
+               for (KafkaTopicPartition partition : partitionInfos) {
+                       Integer count = countPerTopic.get(partition.getTopic());
+                       if (count == null) {
+                               count = 1;
+                       } else {
+                               count++;
+                       }
+                       countPerTopic.put(partition.getTopic(), count);
+               }
+               StringBuilder sb = new StringBuilder(
+                               "Consumer is going to read the following topics 
(with number of partitions): ");
+               
+               for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
+                       sb.append(e.getKey()).append(" 
(").append(e.getValue()).append("), ");
+               }
+               
+               logger.info(sb.toString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
new file mode 100644
index 0000000..d413f1c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Collections;
+import java.util.Comparator;
+
+import static java.util.Objects.requireNonNull;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic.
+ *
+ * Please note that this producer provides at-least-once reliability 
guarantees when
+ * checkpoints are enabled and setFlushOnCheckpoint(true) is set.
+ * Otherwise, the producer doesn't provide any reliability guarantees.
+ *
+ * @param <IN> Type of the messages to write into Kafka.
+ */
+public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> 
implements CheckpointedFunction {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Configuration key for disabling the metrics reporting
+        */
+       public static final String KEY_DISABLE_METRICS = 
"flink.disable-metrics";
+
+       /**
+        * Array with the partition ids of the given defaultTopicId
+        * The size of this array is the number of partitions
+        */
+       protected int[] partitions;
+
+       /**
+        * User defined properties for the Producer
+        */
+       protected final Properties producerConfig;
+
+       /**
+        * The name of the default topic this producer is writing data to
+        */
+       protected final String defaultTopicId;
+
+       /**
+        * (Serializable) SerializationSchema for turning objects used with 
Flink into
+        * byte[] for Kafka.
+        */
+       protected final KeyedSerializationSchema<IN> schema;
+
+       /**
+        * User-provided partitioner for assigning an object to a Kafka 
partition.
+        */
+       protected final KafkaPartitioner<IN> partitioner;
+
+       /**
+        * Flag indicating whether to accept failures (and log them), or to 
fail on failures
+        */
+       protected boolean logFailuresOnly;
+
+       /**
+        * If true, the producer will wait until all outstanding records have 
been send to the broker.
+        */
+       protected boolean flushOnCheckpoint;
+       
+       // -------------------------------- Runtime fields 
------------------------------------------
+
+       /** KafkaProducer instance */
+       protected transient KafkaProducer<byte[], byte[]> producer;
+
+       /** The callback than handles error propagation or logging callbacks */
+       protected transient Callback callback;
+
+       /** Errors encountered in the async producer are stored here */
+       protected transient volatile Exception asyncException;
+
+       /** Lock for accessing the pending records */
+       protected final SerializableObject pendingRecordsLock = new 
SerializableObject();
+
+       /** Number of unacknowledged records. */
+       protected long pendingRecords;
+
+       protected OperatorStateStore stateStore;
+
+
+       /**
+        * The main constructor for creating a FlinkKafkaProducer.
+        *
+        * @param defaultTopicId The default topic to write data to
+        * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
+        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions. Passing null will use Kafka's partitioner
+        */
+       public FlinkKafkaProducerBase(String defaultTopicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
KafkaPartitioner<IN> customPartitioner) {
+               requireNonNull(defaultTopicId, "TopicID not set");
+               requireNonNull(serializationSchema, "serializationSchema not 
set");
+               requireNonNull(producerConfig, "producerConfig not set");
+               ClosureCleaner.clean(customPartitioner, true);
+               ClosureCleaner.ensureSerializable(serializationSchema);
+
+               this.defaultTopicId = defaultTopicId;
+               this.schema = serializationSchema;
+               this.producerConfig = producerConfig;
+
+               // set the producer configuration properties for kafka record 
key value serializers.
+               if 
(!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+                       
this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getCanonicalName());
+               } else {
+                       LOG.warn("Overwriting the '{}' is not recommended", 
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+               }
+
+               if 
(!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+                       
this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getCanonicalName());
+               } else {
+                       LOG.warn("Overwriting the '{}' is not recommended", 
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+               }
+
+               // eagerly ensure that bootstrap servers are set.
+               if 
(!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+                       throw new 
IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be 
supplied in the producer config properties.");
+               }
+
+               this.partitioner = customPartitioner;
+       }
+
+       // ---------------------------------- Properties 
--------------------------
+
+       /**
+        * Defines whether the producer should fail on errors, or only log them.
+        * If this is set to true, then exceptions will be only logged, if set 
to false,
+        * exceptions will be eventually thrown and cause the streaming program 
to 
+        * fail (and enter recovery).
+        * 
+        * @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+        */
+       public void setLogFailuresOnly(boolean logFailuresOnly) {
+               this.logFailuresOnly = logFailuresOnly;
+       }
+
+       /**
+        * If set to true, the Flink producer will wait for all outstanding 
messages in the Kafka buffers
+        * to be acknowledged by the Kafka producer on a checkpoint.
+        * This way, the producer can guarantee that messages in the Kafka 
buffers are part of the checkpoint.
+        *
+        * @param flush Flag indicating the flushing mode (true = flush on 
checkpoint)
+        */
+       public void setFlushOnCheckpoint(boolean flush) {
+               this.flushOnCheckpoint = flush;
+       }
+
+       /**
+        * Used for testing only
+        */
+       protected <K,V> KafkaProducer<K,V> getKafkaProducer(Properties props) {
+               return new KafkaProducer<>(props);
+       }
+
+       // ----------------------------------- Utilities 
--------------------------
+       
+       /**
+        * Initializes the connection to Kafka.
+        */
+       @Override
+       public void open(Configuration configuration) {
+               producer = getKafkaProducer(this.producerConfig);
+
+               // the fetched list is immutable, so we're creating a mutable 
copy in order to sort it
+               List<PartitionInfo> partitionsList = new 
ArrayList<>(producer.partitionsFor(defaultTopicId));
+
+               // sort the partitions by partition id to make sure the fetched 
partition list is the same across subtasks
+               Collections.sort(partitionsList, new 
Comparator<PartitionInfo>() {
+                       @Override
+                       public int compare(PartitionInfo o1, PartitionInfo o2) {
+                               return Integer.compare(o1.partition(), 
o2.partition());
+                       }
+               });
+
+               partitions = new int[partitionsList.size()];
+               for (int i = 0; i < partitions.length; i++) {
+                       partitions[i] = partitionsList.get(i).partition();
+               }
+
+               RuntimeContext ctx = getRuntimeContext();
+               if (partitioner != null) {
+                       partitioner.open(ctx.getIndexOfThisSubtask(), 
ctx.getNumberOfParallelSubtasks(), partitions);
+               }
+
+               LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into 
topic {}", 
+                               ctx.getIndexOfThisSubtask() + 1, 
ctx.getNumberOfParallelSubtasks(), defaultTopicId);
+
+               // register Kafka metrics to Flink accumulators
+               if 
(!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, 
"false"))) {
+                       Map<MetricName, ? extends Metric> metrics = 
this.producer.metrics();
+
+                       if (metrics == null) {
+                               // MapR's Kafka implementation returns null 
here.
+                               LOG.info("Producer implementation does not 
support metrics");
+                       } else {
+                               final MetricGroup kafkaMetricGroup = 
getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
+                               for (Map.Entry<MetricName, ? extends Metric> 
metric: metrics.entrySet()) {
+                                       
kafkaMetricGroup.gauge(metric.getKey().name(), new 
KafkaMetricWrapper(metric.getValue()));
+                               }
+                       }
+               }
+
+               if (flushOnCheckpoint && 
!((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled()) {
+                       LOG.warn("Flushing on checkpoint is enabled, but 
checkpointing is not enabled. Disabling flushing.");
+                       flushOnCheckpoint = false;
+               }
+
+               if (logFailuresOnly) {
+                       callback = new Callback() {
+                               @Override
+                               public void onCompletion(RecordMetadata 
metadata, Exception e) {
+                                       if (e != null) {
+                                               LOG.error("Error while sending 
record to Kafka: " + e.getMessage(), e);
+                                       }
+                                       acknowledgeMessage();
+                               }
+                       };
+               }
+               else {
+                       callback = new Callback() {
+                               @Override
+                               public void onCompletion(RecordMetadata 
metadata, Exception exception) {
+                                       if (exception != null && asyncException 
== null) {
+                                               asyncException = exception;
+                                       }
+                                       acknowledgeMessage();
+                               }
+                       };
+               }
+       }
+
+       /**
+        * Called when new data arrives to the sink, and forwards it to Kafka.
+        *
+        * @param next
+        *              The incoming data
+        */
+       @Override
+       public void invoke(IN next) throws Exception {
+               // propagate asynchronous errors
+               checkErroneous();
+
+               byte[] serializedKey = schema.serializeKey(next);
+               byte[] serializedValue = schema.serializeValue(next);
+               String targetTopic = schema.getTargetTopic(next);
+               if (targetTopic == null) {
+                       targetTopic = defaultTopicId;
+               }
+
+               ProducerRecord<byte[], byte[]> record;
+               if (partitioner == null) {
+                       record = new ProducerRecord<>(targetTopic, 
serializedKey, serializedValue);
+               } else {
+                       record = new ProducerRecord<>(targetTopic, 
partitioner.partition(next, serializedKey, serializedValue, partitions.length), 
serializedKey, serializedValue);
+               }
+               if (flushOnCheckpoint) {
+                       synchronized (pendingRecordsLock) {
+                               pendingRecords++;
+                       }
+               }
+               producer.send(record, callback);
+       }
+
+
+       @Override
+       public void close() throws Exception {
+               if (producer != null) {
+                       producer.close();
+               }
+               
+               // make sure we propagate pending errors
+               checkErroneous();
+       }
+
+       // ------------------- Logic for handling checkpoint flushing 
-------------------------- //
+
+       private void acknowledgeMessage() {
+               if (flushOnCheckpoint) {
+                       synchronized (pendingRecordsLock) {
+                               pendingRecords--;
+                               if (pendingRecords == 0) {
+                                       pendingRecordsLock.notifyAll();
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Flush pending records.
+        */
+       protected abstract void flush();
+
+       @Override
+       public void initializeState(FunctionInitializationContext context) 
throws Exception {
+               this.stateStore = context.getOperatorStateStore();
+       }
+
+       @Override
+       public void snapshotState(FunctionSnapshotContext ctx) throws Exception 
{
+               if (flushOnCheckpoint) {
+                       // flushing is activated: We need to wait until 
pendingRecords is 0
+                       flush();
+                       synchronized (pendingRecordsLock) {
+                               if (pendingRecords != 0) {
+                                       throw new 
IllegalStateException("Pending record count must be zero at this point: " + 
pendingRecords);
+                               }
+                               // pending records count is 0. We can now 
confirm the checkpoint
+                       }
+               }
+       }
+
+       // ----------------------------------- Utilities 
--------------------------
+
+       protected void checkErroneous() throws Exception {
+               Exception e = asyncException;
+               if (e != null) {
+                       // prevent double throwing
+                       asyncException = null;
+                       throw new Exception("Failed to send data to Kafka: " + 
e.getMessage(), e);
+               }
+       }
+       
+       public static Properties getPropertiesFromBrokerList(String brokerList) 
{
+               String[] elements = brokerList.split(",");
+               
+               // validate the broker addresses
+               for (String broker: elements) {
+                       NetUtils.getCorrectHostnamePort(broker);
+               }
+               
+               Properties props = new Properties();
+               props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokerList);
+               return props;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
new file mode 100644
index 0000000..ee98783
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Base class for {@link KafkaTableSink} that serializes data in JSON format
+ */
+public abstract class KafkaJsonTableSink extends KafkaTableSink {
+
+       /**
+        * Creates KafkaJsonTableSink
+        *
+        * @param topic topic in Kafka to which table is written
+        * @param properties properties to connect to Kafka
+        * @param partitioner Kafka partitioner
+        */
+       public KafkaJsonTableSink(String topic, Properties properties, 
KafkaPartitioner<Row> partitioner) {
+               super(topic, properties, partitioner);
+       }
+
+       @Override
+       protected SerializationSchema<Row> createSerializationSchema(String[] 
fieldNames) {
+               return new JsonRowSerializationSchema(fieldNames);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
new file mode 100644
index 0000000..f145509
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka JSON {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ *
+ * <p>The field names are used to parse the JSON file and so are the types.
+ */
+public abstract class KafkaJsonTableSource extends KafkaTableSource {
+
+       /**
+        * Creates a generic Kafka JSON {@link StreamTableSource}.
+        *
+        * @param topic      Kafka topic to consume.
+        * @param properties Properties for the Kafka consumer.
+        * @param fieldNames Row field names.
+        * @param fieldTypes Row field types.
+        */
+       KafkaJsonTableSource(
+                       String topic,
+                       Properties properties,
+                       String[] fieldNames,
+                       Class<?>[] fieldTypes) {
+
+               super(topic, properties, 
createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
+       }
+
+       /**
+        * Creates a generic Kafka JSON {@link StreamTableSource}.
+        *
+        * @param topic      Kafka topic to consume.
+        * @param properties Properties for the Kafka consumer.
+        * @param fieldNames Row field names.
+        * @param fieldTypes Row field types.
+        */
+       KafkaJsonTableSource(
+                       String topic,
+                       Properties properties,
+                       String[] fieldNames,
+                       TypeInformation<?>[] fieldTypes) {
+
+               super(topic, properties, 
createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
+       }
+
+       /**
+        * Configures the failure behaviour if a JSON field is missing.
+        *
+        * <p>By default, a missing field is ignored and the field is set to 
null.
+        *
+        * @param failOnMissingField Flag indicating whether to fail or not on 
a missing field.
+        */
+       public void setFailOnMissingField(boolean failOnMissingField) {
+               JsonRowDeserializationSchema deserializationSchema = 
(JsonRowDeserializationSchema) getDeserializationSchema();
+               deserializationSchema.setFailOnMissingField(failOnMissingField);
+       }
+
+       private static JsonRowDeserializationSchema createDeserializationSchema(
+                       String[] fieldNames,
+                       TypeInformation<?>[] fieldTypes) {
+
+               return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
+       }
+
+       private static JsonRowDeserializationSchema createDeserializationSchema(
+                       String[] fieldNames,
+                       Class<?>[] fieldTypes) {
+
+               return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
new file mode 100644
index 0000000..714d9cd
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sinks.StreamTableSink;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka {@link StreamTableSink}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #createKafkaProducer(String, Properties, 
SerializationSchema, KafkaPartitioner)}}.
+ */
+public abstract class KafkaTableSink implements StreamTableSink<Row> {
+
+       protected final String topic;
+       protected final Properties properties;
+       protected SerializationSchema<Row> serializationSchema;
+       protected final KafkaPartitioner<Row> partitioner;
+       protected String[] fieldNames;
+       protected TypeInformation[] fieldTypes;
+
+       /**
+        * Creates KafkaTableSink
+        *
+        * @param topic                 Kafka topic to write to.
+        * @param properties            Properties for the Kafka consumer.
+        * @param partitioner           Partitioner to select Kafka partition 
for each item
+        */
+       public KafkaTableSink(
+                       String topic,
+                       Properties properties,
+                       KafkaPartitioner<Row> partitioner) {
+
+               this.topic = Preconditions.checkNotNull(topic, "topic");
+               this.properties = Preconditions.checkNotNull(properties, 
"properties");
+               this.partitioner = Preconditions.checkNotNull(partitioner, 
"partitioner");
+       }
+
+       /**
+        * Returns the version-specifid Kafka producer.
+        *
+        * @param topic               Kafka topic to produce to.
+        * @param properties          Properties for the Kafka producer.
+        * @param serializationSchema Serialization schema to use to create 
Kafka records.
+        * @param partitioner         Partitioner to select Kafka partition.
+        * @return The version-specific Kafka producer
+        */
+       protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
+               String topic, Properties properties,
+               SerializationSchema<Row> serializationSchema,
+               KafkaPartitioner<Row> partitioner);
+
+       /**
+        * Create serialization schema for converting table rows into bytes.
+        *
+        * @param fieldNames Field names in table rows.
+        * @return Instance of serialization schema
+        */
+       protected abstract SerializationSchema<Row> 
createSerializationSchema(String[] fieldNames);
+
+       /**
+        * Create a deep copy of this sink.
+        *
+        * @return Deep copy of this sink
+        */
+       protected abstract KafkaTableSink createCopy();
+
+       @Override
+       public void emitDataStream(DataStream<Row> dataStream) {
+               FlinkKafkaProducerBase<Row> kafkaProducer = 
createKafkaProducer(topic, properties, serializationSchema, partitioner);
+               dataStream.addSink(kafkaProducer);
+       }
+
+       @Override
+       public TypeInformation<Row> getOutputType() {
+               return new RowTypeInfo(getFieldTypes());
+       }
+
+       public String[] getFieldNames() {
+               return fieldNames;
+       }
+
+       @Override
+       public TypeInformation<?>[] getFieldTypes() {
+               return fieldTypes;
+       }
+
+       @Override
+       public KafkaTableSink configure(String[] fieldNames, 
TypeInformation<?>[] fieldTypes) {
+               KafkaTableSink copy = createCopy();
+               copy.fieldNames = Preconditions.checkNotNull(fieldNames, 
"fieldNames");
+               copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, 
"fieldTypes");
+               Preconditions.checkArgument(fieldNames.length == 
fieldTypes.length,
+                       "Number of provided field names and types does not 
match.");
+               copy.serializationSchema = 
createSerializationSchema(fieldNames);
+
+               return copy;
+       }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
new file mode 100644
index 0000000..fd423d7
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo;
+
+/**
+ * A version-agnostic Kafka {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ */
+public abstract class KafkaTableSource implements StreamTableSource<Row> {
+
+       /** The Kafka topic to consume. */
+       private final String topic;
+
+       /** Properties for the Kafka consumer. */
+       private final Properties properties;
+
+       /** Deserialization schema to use for Kafka records. */
+       private final DeserializationSchema<Row> deserializationSchema;
+
+       /** Row field names. */
+       private final String[] fieldNames;
+
+       /** Row field types. */
+       private final TypeInformation<?>[] fieldTypes;
+
+       /**
+        * Creates a generic Kafka {@link StreamTableSource}.
+        *
+        * @param topic                 Kafka topic to consume.
+        * @param properties            Properties for the Kafka consumer.
+        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
+        * @param fieldNames            Row field names.
+        * @param fieldTypes            Row field types.
+        */
+       KafkaTableSource(
+                       String topic,
+                       Properties properties,
+                       DeserializationSchema<Row> deserializationSchema,
+                       String[] fieldNames,
+                       Class<?>[] fieldTypes) {
+
+               this(topic, properties, deserializationSchema, fieldNames, 
toTypeInfo(fieldTypes));
+       }
+
+       /**
+        * Creates a generic Kafka {@link StreamTableSource}.
+        *
+        * @param topic                 Kafka topic to consume.
+        * @param properties            Properties for the Kafka consumer.
+        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
+        * @param fieldNames            Row field names.
+        * @param fieldTypes            Row field types.
+        */
+       KafkaTableSource(
+                       String topic,
+                       Properties properties,
+                       DeserializationSchema<Row> deserializationSchema,
+                       String[] fieldNames,
+                       TypeInformation<?>[] fieldTypes) {
+
+               this.topic = Preconditions.checkNotNull(topic, "Topic");
+               this.properties = Preconditions.checkNotNull(properties, 
"Properties");
+               this.deserializationSchema = 
Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
+               this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field 
names");
+               this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field 
types");
+
+               Preconditions.checkArgument(fieldNames.length == 
fieldTypes.length,
+                               "Number of provided field names and types does 
not match.");
+       }
+
+       /**
+        * NOTE: This method is for internal use only for defining a 
TableSource.
+        *       Do not use it in Table API programs.
+        */
+       @Override
+       public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
+               // Version-specific Kafka consumer
+               FlinkKafkaConsumerBase<Row> kafkaConsumer = 
getKafkaConsumer(topic, properties, deserializationSchema);
+               DataStream<Row> kafkaSource = env.addSource(kafkaConsumer);
+               return kafkaSource;
+       }
+
+       @Override
+       public int getNumberOfFields() {
+               return fieldNames.length;
+       }
+
+       @Override
+       public String[] getFieldsNames() {
+               return fieldNames;
+       }
+
+       @Override
+       public TypeInformation<?>[] getFieldTypes() {
+               return fieldTypes;
+       }
+
+       @Override
+       public TypeInformation<Row> getReturnType() {
+               return new RowTypeInfo(fieldTypes);
+       }
+
+       /**
+        * Returns the version-specific Kafka consumer.
+        *
+        * @param topic                 Kafka topic to consume.
+        * @param properties            Properties for the Kafka consumer.
+        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
+        * @return The version-specific Kafka consumer
+        */
+       abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer(
+                       String topic,
+                       Properties properties,
+                       DeserializationSchema<Row> deserializationSchema);
+
+       /**
+        * Returns the deserialization schema.
+        *
+        * @return The deserialization schema
+        */
+       protected DeserializationSchema<Row> getDeserializationSchema() {
+               return deserializationSchema;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
new file mode 100644
index 0000000..cf39606
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all fetchers, which implement the connections to Kafka 
brokers and
+ * pull records from Kafka partitions.
+ * 
+ * <p>This fetcher base class implements the logic around emitting records and 
tracking offsets,
+ * as well as around the optional timestamp assignment and watermark 
generation. 
+ * 
+ * @param <T> The type of elements deserialized from Kafka's byte records, and 
emitted into
+ *            the Flink data streams.
+ * @param <KPH> The type of topic/partition identifier used by Kafka in the 
specific version.
+ */
+public abstract class AbstractFetcher<T, KPH> {
+       
+       protected static final int NO_TIMESTAMPS_WATERMARKS = 0;
+       protected static final int PERIODIC_WATERMARKS = 1;
+       protected static final int PUNCTUATED_WATERMARKS = 2;
+       
+       // 
------------------------------------------------------------------------
+       
+       /** The source context to emit records and watermarks to */
+       protected final SourceContext<T> sourceContext;
+
+       /** The lock that guarantees that record emission and state updates are 
atomic,
+        * from the view of taking a checkpoint */
+       protected final Object checkpointLock;
+
+       /** All partitions (and their state) that this fetcher is subscribed to 
*/
+       private final KafkaTopicPartitionState<KPH>[] allPartitions;
+
+       /** The mode describing whether the fetcher also generates timestamps 
and watermarks */
+       protected final int timestampWatermarkMode;
+
+       /** Flag whether to register metrics for the fetcher */
+       protected final boolean useMetrics;
+
+       /** Only relevant for punctuated watermarks: The current cross 
partition watermark */
+       private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
+
+       // 
------------------------------------------------------------------------
+       
+       protected AbstractFetcher(
+                       SourceContext<T> sourceContext,
+                       List<KafkaTopicPartition> assignedPartitions,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
+                       ProcessingTimeService processingTimeProvider,
+                       long autoWatermarkInterval,
+                       ClassLoader userCodeClassLoader,
+                       boolean useMetrics) throws Exception
+       {
+               this.sourceContext = checkNotNull(sourceContext);
+               this.checkpointLock = sourceContext.getCheckpointLock();
+               this.useMetrics = useMetrics;
+               
+               // figure out what we watermark mode we will be using
+               
+               if (watermarksPeriodic == null) {
+                       if (watermarksPunctuated == null) {
+                               // simple case, no watermarks involved
+                               timestampWatermarkMode = 
NO_TIMESTAMPS_WATERMARKS;
+                       } else {
+                               timestampWatermarkMode = PUNCTUATED_WATERMARKS;
+                       }
+               } else {
+                       if (watermarksPunctuated == null) {
+                               timestampWatermarkMode = PERIODIC_WATERMARKS;
+                       } else {
+                               throw new IllegalArgumentException("Cannot have 
both periodic and punctuated watermarks");
+                       }
+               }
+               
+               // create our partition state according to the 
timestamp/watermark mode 
+               this.allPartitions = initializePartitions(
+                               assignedPartitions,
+                               timestampWatermarkMode,
+                               watermarksPeriodic, watermarksPunctuated,
+                               userCodeClassLoader);
+               
+               // if we have periodic watermarks, kick off the interval 
scheduler
+               if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+                       KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] 
parts = 
+                                       
(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
+                       
+                       PeriodicWatermarkEmitter periodicEmitter = 
+                                       new PeriodicWatermarkEmitter(parts, 
sourceContext, processingTimeProvider, autoWatermarkInterval);
+                       periodicEmitter.start();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Properties
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets all partitions (with partition state) that this fetcher is 
subscribed to.
+        *
+        * @return All subscribed partitions.
+        */
+       protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
+               return allPartitions;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Core fetcher work methods
+       // 
------------------------------------------------------------------------
+
+       public abstract void runFetchLoop() throws Exception;
+       
+       public abstract void cancel();
+
+       // 
------------------------------------------------------------------------
+       //  Kafka version specifics
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Creates the Kafka version specific representation of the given
+        * topic partition.
+        * 
+        * @param partition The Flink representation of the Kafka topic 
partition.
+        * @return The specific Kafka representation of the Kafka topic 
partition.
+        */
+       public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition 
partition);
+
+       /**
+        * Commits the given partition offsets to the Kafka brokers (or to 
ZooKeeper for
+        * older Kafka versions). The given offsets are the internal 
checkpointed offsets, representing
+        * the last processed record of each partition. Version-specific 
implementations of this method
+        * need to hold the contract that the given offsets must be incremented 
by 1 before
+        * committing them, so that committed offsets to Kafka represent "the 
next record to process".
+        * 
+        * @param offsets The offsets to commit to Kafka (implementations must 
increment offsets by 1 before committing).
+        * @throws Exception This method forwards exceptions.
+        */
+       public abstract void 
commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws 
Exception;
+       
+       // 
------------------------------------------------------------------------
+       //  snapshot and restore the state
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Takes a snapshot of the partition offsets.
+        * 
+        * <p>Important: This method mus be called under the checkpoint lock.
+        * 
+        * @return A map from partition to current offset.
+        */
+       public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+               // this method assumes that the checkpoint lock is held
+               assert Thread.holdsLock(checkpointLock);
+
+               HashMap<KafkaTopicPartition, Long> state = new 
HashMap<>(allPartitions.length);
+               for (KafkaTopicPartitionState<?> partition : 
subscribedPartitions()) {
+                       state.put(partition.getKafkaTopicPartition(), 
partition.getOffset());
+               }
+               return state;
+       }
+
+       /**
+        * Restores the partition offsets.
+        * 
+        * @param snapshotState The offsets for the partitions 
+        */
+       public void restoreOffsets(HashMap<KafkaTopicPartition, Long> 
snapshotState) {
+               for (KafkaTopicPartitionState<?> partition : allPartitions) {
+                       Long offset = 
snapshotState.get(partition.getKafkaTopicPartition());
+                       if (offset != null) {
+                               partition.setOffset(offset);
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  emitting records
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Emits a record without attaching an existing timestamp to it.
+        * 
+        * <p>Implementation Note: This method is kept brief to be JIT inlining 
friendly.
+        * That makes the fast path efficient, the extended paths are called as 
separate methods.
+        * 
+        * @param record The record to emit
+        * @param partitionState The state of the Kafka partition from which 
the record was fetched
+        * @param offset The offset of the record
+        */
+       protected void emitRecord(T record, KafkaTopicPartitionState<KPH> 
partitionState, long offset) throws Exception {
+               if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+                       // fast path logic, in case there are no watermarks
+
+                       // emit the record, using the checkpoint lock to 
guarantee
+                       // atomicity of record emission and offset state update
+                       synchronized (checkpointLock) {
+                               sourceContext.collect(record);
+                               partitionState.setOffset(offset);
+                       }
+               }
+               else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+                       emitRecordWithTimestampAndPeriodicWatermark(record, 
partitionState, offset, Long.MIN_VALUE);
+               }
+               else {
+                       emitRecordWithTimestampAndPunctuatedWatermark(record, 
partitionState, offset, Long.MIN_VALUE);
+               }
+       }
+
+       /**
+        * Emits a record attaching a timestamp to it.
+        *
+        * <p>Implementation Note: This method is kept brief to be JIT inlining 
friendly.
+        * That makes the fast path efficient, the extended paths are called as 
separate methods.
+        *
+        * @param record The record to emit
+        * @param partitionState The state of the Kafka partition from which 
the record was fetched
+        * @param offset The offset of the record
+        */
+       protected void emitRecordWithTimestamp(
+                       T record, KafkaTopicPartitionState<KPH> partitionState, 
long offset, long timestamp) throws Exception {
+
+               if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+                       // fast path logic, in case there are no watermarks 
generated in the fetcher
+
+                       // emit the record, using the checkpoint lock to 
guarantee
+                       // atomicity of record emission and offset state update
+                       synchronized (checkpointLock) {
+                               sourceContext.collectWithTimestamp(record, 
timestamp);
+                               partitionState.setOffset(offset);
+                       }
+               }
+               else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+                       emitRecordWithTimestampAndPeriodicWatermark(record, 
partitionState, offset, timestamp);
+               }
+               else {
+                       emitRecordWithTimestampAndPunctuatedWatermark(record, 
partitionState, offset, timestamp);
+               }
+       }
+
+       /**
+        * Record emission, if a timestamp will be attached from an assigner 
that is
+        * also a periodic watermark generator.
+        */
+       protected void emitRecordWithTimestampAndPeriodicWatermark(
+                       T record, KafkaTopicPartitionState<KPH> partitionState, 
long offset, long kafkaEventTimestamp)
+       {
+               @SuppressWarnings("unchecked")
+               final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> 
withWatermarksState =
+                               
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
+
+               // extract timestamp - this accesses/modifies the per-partition 
state inside the
+               // watermark generator instance, so we need to lock the access 
on the
+               // partition state. concurrent access can happen from the 
periodic emitter
+               final long timestamp;
+               //noinspection SynchronizationOnLocalVariableOrMethodParameter
+               synchronized (withWatermarksState) {
+                       timestamp = 
withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
+               }
+
+               // emit the record with timestamp, using the usual checkpoint 
lock to guarantee
+               // atomicity of record emission and offset state update 
+               synchronized (checkpointLock) {
+                       sourceContext.collectWithTimestamp(record, timestamp);
+                       partitionState.setOffset(offset);
+               }
+       }
+
+       /**
+        * Record emission, if a timestamp will be attached from an assigner 
that is
+        * also a punctuated watermark generator.
+        */
+       protected void emitRecordWithTimestampAndPunctuatedWatermark(
+                       T record, KafkaTopicPartitionState<KPH> partitionState, 
long offset, long kafkaEventTimestamp)
+       {
+               @SuppressWarnings("unchecked")
+               final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> 
withWatermarksState =
+                               
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
+
+               // only one thread ever works on accessing timestamps and 
watermarks
+               // from the punctuated extractor
+               final long timestamp = 
withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp);
+               final Watermark newWatermark = 
withWatermarksState.checkAndGetNewWatermark(record, timestamp);
+
+               // emit the record with timestamp, using the usual checkpoint 
lock to guarantee
+               // atomicity of record emission and offset state update 
+               synchronized (checkpointLock) {
+                       sourceContext.collectWithTimestamp(record, timestamp);
+                       partitionState.setOffset(offset);
+               }
+
+               // if we also have a new per-partition watermark, check if that 
is also a
+               // new cross-partition watermark
+               if (newWatermark != null) {
+                       updateMinPunctuatedWatermark(newWatermark);
+               }
+       }
+
+       /**
+        *Checks whether a new per-partition watermark is also a new 
cross-partition watermark.
+        */
+       private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
+               if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
+                       long newMin = Long.MAX_VALUE;
+
+                       for (KafkaTopicPartitionState<?> state : allPartitions) 
{
+                               @SuppressWarnings("unchecked")
+                               final 
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
+                                               
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
+                               
+                               newMin = Math.min(newMin, 
withWatermarksState.getCurrentPartitionWatermark());
+                       }
+
+                       // double-check locking pattern
+                       if (newMin > maxWatermarkSoFar) {
+                               synchronized (checkpointLock) {
+                                       if (newMin > maxWatermarkSoFar) {
+                                               maxWatermarkSoFar = newMin;
+                                               sourceContext.emitWatermark(new 
Watermark(newMin));
+                                       }
+                               }
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Utility method that takes the topic partitions and creates the topic 
partition state
+        * holders. If a watermark generator per partition exists, this will 
also initialize those.
+        */
+       private KafkaTopicPartitionState<KPH>[] initializePartitions(
+                       List<KafkaTopicPartition> assignedPartitions,
+                       int timestampWatermarkMode,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
+                       ClassLoader userCodeClassLoader)
+               throws IOException, ClassNotFoundException
+       {
+               switch (timestampWatermarkMode) {
+                       
+                       case NO_TIMESTAMPS_WATERMARKS: {
+                               @SuppressWarnings("unchecked")
+                               KafkaTopicPartitionState<KPH>[] partitions =
+                                               
(KafkaTopicPartitionState<KPH>[]) new 
KafkaTopicPartitionState<?>[assignedPartitions.size()];
+
+                               int pos = 0;
+                               for (KafkaTopicPartition partition : 
assignedPartitions) {
+                                       // create the kafka version specific 
partition handle
+                                       KPH kafkaHandle = 
createKafkaPartitionHandle(partition);
+                                       partitions[pos++] = new 
KafkaTopicPartitionState<>(partition, kafkaHandle);
+                               }
+
+                               return partitions;
+                       }
+
+                       case PERIODIC_WATERMARKS: {
+                               @SuppressWarnings("unchecked")
+                               
KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+                                               
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
+                                                               new 
KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];
+
+                               int pos = 0;
+                               for (KafkaTopicPartition partition : 
assignedPartitions) {
+                                       KPH kafkaHandle = 
createKafkaPartitionHandle(partition);
+
+                                       AssignerWithPeriodicWatermarks<T> 
assignerInstance =
+                                                       
watermarksPeriodic.deserializeValue(userCodeClassLoader);
+                                       
+                                       partitions[pos++] = new 
KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+                                                       partition, kafkaHandle, 
assignerInstance);
+                               }
+
+                               return partitions;
+                       }
+
+                       case PUNCTUATED_WATERMARKS: {
+                               @SuppressWarnings("unchecked")
+                               
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+                                               
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
+                                                               new 
KafkaTopicPartitionStateWithPunctuatedWatermarks<?, 
?>[assignedPartitions.size()];
+
+                               int pos = 0;
+                               for (KafkaTopicPartition partition : 
assignedPartitions) {
+                                       KPH kafkaHandle = 
createKafkaPartitionHandle(partition);
+
+                                       AssignerWithPunctuatedWatermarks<T> 
assignerInstance =
+                                                       
watermarksPunctuated.deserializeValue(userCodeClassLoader);
+
+                                       partitions[pos++] = new 
KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
+                                                       partition, kafkaHandle, 
assignerInstance);
+                               }
+
+                               return partitions;
+                       }
+                       default:
+                               // cannot happen, add this as a guard for the 
future
+                               throw new RuntimeException();
+               }
+       }
+
+       // ------------------------- Metrics ----------------------------------
+
+       /**
+        * Add current and committed offsets to metric group
+        *
+        * @param metricGroup The metric group to use
+        */
+       protected void addOffsetStateGauge(MetricGroup metricGroup) {
+               // add current offsets to gage
+               MetricGroup currentOffsets = 
metricGroup.addGroup("current-offsets");
+               MetricGroup committedOffsets = 
metricGroup.addGroup("committed-offsets");
+               for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) {
+                       currentOffsets.gauge(ktp.getTopic() + "-" + 
ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET));
+                       committedOffsets.gauge(ktp.getTopic() + "-" + 
ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET));
+               }
+       }
+
+       /**
+        * Gauge types
+        */
+       private enum OffsetGaugeType {
+               CURRENT_OFFSET,
+               COMMITTED_OFFSET
+       }
+
+       /**
+        * Gauge for getting the offset of a KafkaTopicPartitionState.
+        */
+       private static class OffsetGauge implements Gauge<Long> {
+
+               private final KafkaTopicPartitionState<?> ktp;
+               private final OffsetGaugeType gaugeType;
+
+               public OffsetGauge(KafkaTopicPartitionState<?> ktp, 
OffsetGaugeType gaugeType) {
+                       this.ktp = ktp;
+                       this.gaugeType = gaugeType;
+               }
+
+               @Override
+               public Long getValue() {
+                       switch(gaugeType) {
+                               case COMMITTED_OFFSET:
+                                       return ktp.getCommittedOffset();
+                               case CURRENT_OFFSET:
+                                       return ktp.getOffset();
+                               default:
+                                       throw new RuntimeException("Unknown 
gauge type: " + gaugeType);
+                       }
+               }
+       }
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * The periodic watermark emitter. In its given interval, it checks all 
partitions for
+        * the current event time watermark, and possibly emits the next 
watermark.
+        */
+       private static class PeriodicWatermarkEmitter implements 
ProcessingTimeCallback {
+
+               private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, 
?>[] allPartitions;
+               
+               private final SourceContext<?> emitter;
+               
+               private final ProcessingTimeService timerService;
+
+               private final long interval;
+               
+               private long lastWatermarkTimestamp;
+               
+               //-------------------------------------------------
+
+               PeriodicWatermarkEmitter(
+                               
KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
+                               SourceContext<?> emitter,
+                               ProcessingTimeService timerService,
+                               long autoWatermarkInterval)
+               {
+                       this.allPartitions = checkNotNull(allPartitions);
+                       this.emitter = checkNotNull(emitter);
+                       this.timerService = checkNotNull(timerService);
+                       this.interval = autoWatermarkInterval;
+                       this.lastWatermarkTimestamp = Long.MIN_VALUE;
+               }
+
+               //-------------------------------------------------
+               
+               public void start() {
+                       
timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, 
this);
+               }
+               
+               @Override
+               public void onProcessingTime(long timestamp) throws Exception {
+
+                       long minAcrossAll = Long.MAX_VALUE;
+                       for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, 
?> state : allPartitions) {
+                               
+                               // we access the current watermark for the 
periodic assigners under the state
+                               // lock, to prevent concurrent modification to 
any internal variables
+                               final long curr;
+                               //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
+                               synchronized (state) {
+                                       curr = 
state.getCurrentWatermarkTimestamp();
+                               }
+                               
+                               minAcrossAll = Math.min(minAcrossAll, curr);
+                       }
+                       
+                       // emit next watermark, if there is one
+                       if (minAcrossAll > lastWatermarkTimestamp) {
+                               lastWatermarkTimestamp = minAcrossAll;
+                               emitter.emitWatermark(new 
Watermark(minAcrossAll));
+                       }
+                       
+                       // schedule the next watermark
+                       
timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, 
this);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
new file mode 100644
index 0000000..c736493
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A proxy that communicates exceptions between threads. Typically used if an 
exception
+ * from a spawned thread needs to be recognized by the "parent" (spawner) 
thread.
+ * 
+ * <p>The spawned thread would set the exception via {@link 
#reportError(Throwable)}.
+ * The parent would check (at certain points) for exceptions via {@link 
#checkAndThrowException()}.
+ * Optionally, the parent can pass itself in the constructor to be interrupted 
as soon as
+ * an exception occurs.
+ * 
+ * <pre>
+ * {@code
+ * 
+ * final ExceptionProxy errorProxy = new 
ExceptionProxy(Thread.currentThread());
+ * 
+ * Thread subThread = new Thread() {
+ * 
+ *     public void run() {
+ *         try {
+ *             doSomething();
+ *         } catch (Throwable t) {
+ *             errorProxy.reportError(
+ *         } finally {
+ *             doSomeCleanup();
+ *         }
+ *     }
+ * };
+ * subThread.start();
+ * 
+ * doSomethingElse();
+ * errorProxy.checkAndThrowException();
+ * 
+ * doSomethingMore();
+ * errorProxy.checkAndThrowException();
+ * 
+ * try {
+ *     subThread.join();
+ * } catch (InterruptedException e) {
+ *     errorProxy.checkAndThrowException();
+ *     // restore interrupted status, if not caused by an exception
+ *     Thread.currentThread().interrupt();
+ * }
+ * }
+ * </pre>
+ */
+public class ExceptionProxy {
+       
+       /** The thread that should be interrupted when an exception occurs */
+       private final Thread toInterrupt;
+       
+       /** The exception to throw */ 
+       private final AtomicReference<Throwable> exception;
+
+       /**
+        * Creates an exception proxy that interrupts the given thread upon
+        * report of an exception. The thread to interrupt may be null.
+        * 
+        * @param toInterrupt The thread to interrupt upon an exception. May be 
null.
+        */
+       public ExceptionProxy(@Nullable Thread toInterrupt) {
+               this.toInterrupt = toInterrupt;
+               this.exception = new AtomicReference<>();
+       }
+       
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Sets the exception and interrupts the target thread,
+        * if no other exception has occurred so far.
+        * 
+        * <p>The exception is only set (and the interruption is only 
triggered),
+        * if no other exception was set before.
+        * 
+        * @param t The exception that occurred
+        */
+       public void reportError(Throwable t) {
+               // set the exception, if it is the first (and the exception is 
non null)
+               if (t != null && exception.compareAndSet(null, t) && 
toInterrupt != null) {
+                       toInterrupt.interrupt();
+               }
+       }
+
+       /**
+        * Checks whether an exception has been set via {@link 
#reportError(Throwable)}.
+        * If yes, that exception if re-thrown by this method.
+        * 
+        * @throws Exception This method re-throws the exception, if set.
+        */
+       public void checkAndThrowException() throws Exception {
+               Throwable t = exception.get();
+               if (t != null) {
+                       if (t instanceof Exception) {
+                               throw (Exception) t;
+                       }
+                       else if (t instanceof Error) {
+                               throw (Error) t;
+                       }
+                       else {
+                               throw new Exception(t);
+                       }
+               }
+       }
+}

Reply via email to