Repository: flink
Updated Branches:
  refs/heads/master 481091043 -> abb449678


[FLINK-3874] [tableApi] Add KafkaJsonTableSink

This closes #2244


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abb44967
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abb44967
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abb44967

Branch: refs/heads/master
Commit: abb4496781883937a935113c1e33ae1174aafa73
Parents: 4810910
Author: Ivan Mushketyk <[email protected]>
Authored: Tue Jul 5 22:00:18 2016 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Thu Aug 25 22:14:32 2016 +0200

----------------------------------------------------------------------
 .../flink/api/table/sinks/CsvTableSink.scala    |   4 +-
 .../flink/api/table/sinks/TableSink.scala       |  40 +-----
 .../flink/api/table/sinks/TableSinkBase.scala   |  67 +++++++++
 .../connectors/kafka/Kafka08JsonTableSink.java  |  51 +++++++
 .../kafka/Kafka08JsonTableSinkITCase.java       |  40 ++++++
 .../connectors/kafka/Kafka09JsonTableSink.java  |  50 +++++++
 .../kafka/Kafka09JsonTableSinkITCase.java       |  39 +++++
 .../connectors/kafka/KafkaJsonTableSink.java    |  47 ++++++
 .../connectors/kafka/KafkaTableSink.java        | 126 ++++++++++++++++
 .../connectors/kafka/KafkaTableSource.java      |  15 +-
 .../connectors/kafka/internals/TypeUtil.java    |  38 +++++
 .../JsonRowDeserializationSchema.java           |   4 +-
 .../JsonRowSerializationSchema.java             |  70 +++++++++
 .../kafka/JsonRowSerializationSchemaTest.java   |  98 +++++++++++++
 .../kafka/KafkaTableSinkTestBase.java           | 142 +++++++++++++++++++
 15 files changed, 780 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
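
For orientation, a minimal usage sketch of the new sink (not part of this
commit; the topic name, broker address, and the surrounding Table program
named `result` are assumptions, and Table.writeToSink is assumed as the
Table API attachment point):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");

    // FixedPartitioner pins each parallel sink instance to one partition
    KafkaTableSink sink = new Kafka09JsonTableSink(
        "results", props, new FixedPartitioner<Row>());

    // the Table API calls configure() with the table's field names/types
    result.writeToSink(sink);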


http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
index ed05caf..7567ba8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
@@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream
 class CsvTableSink(
     path: String,
     fieldDelim: String = ",")
-  extends BatchTableSink[Row] with StreamTableSink[Row] {
+  extends TableSinkBase[Row] with BatchTableSink[Row] with StreamTableSink[Row] {
 
   override def emitDataSet(dataSet: DataSet[Row]): Unit = {
     dataSet
@@ -48,7 +48,7 @@ class CsvTableSink(
       .writeAsText(path)
   }
 
-  override protected def copy: TableSink[Row] = {
+  override protected def copy: TableSinkBase[Row] = {
     new CsvTableSink(path, fieldDelim)
   }
 

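Note that the refactoring is source compatible for users of CsvTableSink;
construction is unchanged, e.g. (path and delimiter illustrative):

    CsvTableSink sink = new CsvTableSink("/tmp/result.csv", "|");
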
http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala
index 12e57de..3dfc6f1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala
@@ -29,9 +29,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
   */
 trait TableSink[T] {
 
-  private var fieldNames: Option[Array[String]] = None
-  private var fieldTypes: Option[Array[TypeInformation[_]]] = None
-
   /**
     * Return the type expected by this [[TableSink]].
     *
@@ -41,27 +38,11 @@ trait TableSink[T] {
     */
   def getOutputType: TypeInformation[T]
 
-  /** Return a deep copy of the [[TableSink]]. */
-  protected def copy: TableSink[T]
-
-  /**
-    * Return the field names of the [[org.apache.flink.api.table.Table]] to emit. */
-  protected final def getFieldNames: Array[String] = {
-    fieldNames match {
-      case Some(n) => n
-      case None => throw new IllegalStateException(
-        "TableSink must be configured to retrieve field names.")
-    }
-  }
+  /** Returns the names of the table fields. */
+  def getFieldNames: Array[String]
 
-  /** Return the field types of the [[org.apache.flink.api.table.Table]] to emit. */
-  protected final def getFieldTypes: Array[TypeInformation[_]] = {
-    fieldTypes match {
-      case Some(t) => t
-      case None => throw new IllegalStateException(
-        "TableSink must be configured to retrieve field types.")
-    }
-  }
+  /** Returns the types of the table fields. */
+  def getFieldTypes: Array[TypeInformation[_]]
 
   /**
     * Return a copy of this [[TableSink]] configured with the field names and types of the
@@ -72,15 +53,6 @@ trait TableSink[T] {
     * @return A copy of this [[TableSink]] configured with the field names and types of the
     *         [[org.apache.flink.api.table.Table]] to emit.
     */
-  private[flink] final def configure(
-    fieldNames: Array[String],
-    fieldTypes: Array[TypeInformation[_]]): TableSink[T] = {
-
-    val configuredSink = this.copy
-    configuredSink.fieldNames = Some(fieldNames)
-    configuredSink.fieldTypes = Some(fieldTypes)
-
-    configuredSink
-  }
-
+  def configure(fieldNames: Array[String],
+                fieldTypes: Array[TypeInformation[_]]): TableSink[T]
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala
new file mode 100644
index 0000000..612ee0a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.sinks
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+trait TableSinkBase[T] extends TableSink[T] {
+
+  private var fieldNames: Option[Array[String]] = None
+  private var fieldTypes: Option[Array[TypeInformation[_]]] = None
+
+  /** Return a deep copy of the [[TableSink]]. */
+  protected def copy: TableSinkBase[T]
+
+  /**
+    * Return the field names of the [[org.apache.flink.api.table.Table]] to emit. */
+  def getFieldNames: Array[String] = {
+    fieldNames match {
+      case Some(n) => n
+      case None => throw new IllegalStateException(
+        "TableSink must be configured to retrieve field names.")
+    }
+  }
+
+  /** Return the field types of the [[org.apache.flink.api.table.Table]] to emit. */
+  def getFieldTypes: Array[TypeInformation[_]] = {
+    fieldTypes match {
+      case Some(t) => t
+      case None => throw new IllegalStateException(
+        "TableSink must be configured to retrieve field types.")
+    }
+  }
+
+  /**
+    * Return a copy of this [[TableSink]] configured with the field names and types of the
+    * [[org.apache.flink.api.table.Table]] to emit.
+    *
+    * @param fieldNames The field names of the table to emit.
+    * @param fieldTypes The field types of the table to emit.
+    * @return A copy of this [[TableSink]] configured with the field names and types of the
+    *         [[org.apache.flink.api.table.Table]] to emit.
+    */
+  final def configure(fieldNames: Array[String],
+                      fieldTypes: Array[TypeInformation[_]]): TableSink[T] = {
+
+    val configuredSink = this.copy
+    configuredSink.fieldNames = Some(fieldNames)
+    configuredSink.fieldTypes = Some(fieldTypes)
+
+    configuredSink
+  }
+}

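In Java terms, the contract above behaves as in this hedged sketch
(configure() is normally invoked by the Table API; the field names and
types are illustrative, and `sink` stands for any TableSink implementation
such as CsvTableSink):

    // configure() returns a configured copy and leaves the receiver
    // untouched, so one template instance can be handed to several tables
    TableSink<Row> configured = sink.configure(
        new String[] {"id", "name"},
        new TypeInformation<?>[] {
            BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO});
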
http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
new file mode 100644
index 0000000..5f869ec
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
+ */
+public class Kafka08JsonTableSink extends KafkaJsonTableSink {
+       /**
+        * Creates {@link KafkaTableSink} for Kafka 0.8
+        *
+        * @param topic topic in Kafka to which table is written
+        * @param properties properties to connect to Kafka
+        * @param partitioner Kafka partitioner
+        */
+       public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+               super(topic, properties, partitioner);
+       }
+
+       @Override
+       protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
+               return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner);
+       }
+
+       @Override
+       protected Kafka08JsonTableSink createCopy() {
+               return new Kafka08JsonTableSink(topic, properties, partitioner);
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java
new file mode 100644
index 0000000..f870adf
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+public class Kafka08JsonTableSinkITCase extends KafkaTableSinkTestBase {
+
+       @Override
+       protected KafkaTableSink createTableSink() {
+               Kafka08JsonTableSink sink = new Kafka08JsonTableSink(
+                       TOPIC,
+                       createSinkProperties(),
+                       createPartitioner());
+               return sink.configure(FIELD_NAMES, FIELD_TYPES);
+       }
+
+       protected DeserializationSchema<Row> createRowDeserializationSchema() {
+               return new JsonRowDeserializationSchema(
+                       FIELD_NAMES, FIELD_TYPES);
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
new file mode 100644
index 0000000..38ea47c
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
+ */
+public class Kafka09JsonTableSink extends KafkaJsonTableSink {
+       /**
+        * Creates {@link KafkaTableSink} for Kafka 0.9
+        *
+        * @param topic topic in Kafka to which table is written
+        * @param properties properties to connect to Kafka
+        * @param partitioner Kafka partitioner
+        */
+       public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+               super(topic, properties, partitioner);
+       }
+
+       @Override
+       protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
+               return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
+       }
+
+       @Override
+       protected Kafka09JsonTableSink createCopy() {
+               return new Kafka09JsonTableSink(topic, properties, partitioner);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java
new file mode 100644
index 0000000..74415f8
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+public class Kafka09JsonTableSinkITCase extends KafkaTableSinkTestBase {
+
+       @Override
+       protected KafkaTableSink createTableSink() {
+               Kafka09JsonTableSink sink = new Kafka09JsonTableSink(
+                       TOPIC,
+                       createSinkProperties(),
+                       createPartitioner());
+               return sink.configure(FIELD_NAMES, FIELD_TYPES);
+       }
+
+       protected DeserializationSchema<Row> createRowDeserializationSchema() {
+               return new JsonRowDeserializationSchema(
+                       FIELD_NAMES, FIELD_TYPES);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
new file mode 100644
index 0000000..ee98783
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Base class for {@link KafkaTableSink} that serializes data in JSON format
+ */
+public abstract class KafkaJsonTableSink extends KafkaTableSink {
+
+       /**
+        * Creates KafkaJsonTableSink
+        *
+        * @param topic topic in Kafka to which table is written
+        * @param properties properties to connect to Kafka
+        * @param partitioner Kafka partitioner
+        */
+       public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+               super(topic, properties, partitioner);
+       }
+
+       @Override
+       protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) {
+               return new JsonRowSerializationSchema(fieldNames);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
new file mode 100644
index 0000000..8f5e811
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sinks.StreamTableSink;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka {@link StreamTableSink}.
+ *
+ * <p>Version-specific Kafka table sinks need to extend this class and
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}.
+ */
+public abstract class KafkaTableSink implements StreamTableSink<Row> {
+
+       protected final String topic;
+       protected final Properties properties;
+       protected SerializationSchema<Row> serializationSchema;
+       protected final KafkaPartitioner<Row> partitioner;
+       protected String[] fieldNames;
+       protected TypeInformation[] fieldTypes;
+       /**
+        * Creates KafkaTableSink
+        *
+        * @param topic                 Kafka topic to write to.
+        * @param properties            Properties for the Kafka producer.
+        * @param partitioner           Partitioner to select Kafka partition for each item
+        */
+       public KafkaTableSink(
+                       String topic,
+                       Properties properties,
+                       KafkaPartitioner<Row> partitioner) {
+
+               this.topic = Preconditions.checkNotNull(topic, "topic");
+               this.properties = Preconditions.checkNotNull(properties, "properties");
+               this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
+       }
+
+       /**
+        * Returns the version-specific Kafka producer.
+        *
+        * @param topic               Kafka topic to produce to.
+        * @param properties          Properties for the Kafka producer.
+        * @param serializationSchema Serialization schema to use to create Kafka records.
+        * @param partitioner         Partitioner to select Kafka partition.
+        * @return The version-specific Kafka producer
+        */
+       protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
+               String topic, Properties properties,
+               SerializationSchema<Row> serializationSchema,
+               KafkaPartitioner<Row> partitioner);
+
+       /**
+        * Create serialization schema for converting table rows into bytes.
+        *
+        * @param fieldNames Field names in table rows.
+        * @return Instance of serialization schema
+        */
+       protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames);
+
+       /**
+        * Create a deep copy of this sink.
+        *
+        * @return Deep copy of this sink
+        */
+       protected abstract KafkaTableSink createCopy();
+
+       @Override
+       public void emitDataStream(DataStream<Row> dataStream) {
+               FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
+               dataStream.addSink(kafkaProducer);
+       }
+
+       @Override
+       public TypeInformation<Row> getOutputType() {
+               return new RowTypeInfo(getFieldTypes());
+       }
+
+       public String[] getFieldNames() {
+               return fieldNames;
+       }
+
+       @Override
+       public TypeInformation<?>[] getFieldTypes() {
+               return fieldTypes;
+       }
+
+       @Override
+       public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+               KafkaTableSink copy = createCopy();
+               copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
+               copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
+               Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+                       "Number of provided field names and types does not match.");
+               copy.serializationSchema = createSerializationSchema(fieldNames);
+
+               return copy;
+       }
+}

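Extending the hierarchy only requires the producer factory and the copy
(KafkaJsonTableSink already supplies the serialization schema). A hedged
sketch for a hypothetical future Kafka version, where MyFlinkKafkaProducer
is a stand-in, not a real class:

    public class MyJsonTableSink extends KafkaJsonTableSink {

        public MyJsonTableSink(String topic, Properties properties,
                KafkaPartitioner<Row> partitioner) {
            super(topic, properties, partitioner);
        }

        @Override
        protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic,
                Properties properties, SerializationSchema<Row> schema,
                KafkaPartitioner<Row> partitioner) {
            // hypothetical version-specific producer
            return new MyFlinkKafkaProducer<>(topic, schema, properties, partitioner);
        }

        @Override
        protected MyJsonTableSink createCopy() {
            return new MyJsonTableSink(topic, properties, partitioner);
        }
    }
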
http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index c6904fe..fc6bf44 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.table.sources.StreamTableSource;
 import org.apache.flink.api.table.typeutils.RowTypeInfo;
@@ -30,6 +29,8 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Properties;
 
+import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo;
+
 /**
  * A version-agnostic Kafka {@link StreamTableSource}.
  *
@@ -147,16 +148,4 @@ abstract class KafkaTableSource implements StreamTableSource<Row> {
        protected DeserializationSchema<Row> getDeserializationSchema() {
                return deserializationSchema;
        }
-
-       /**
-        * Creates TypeInformation array for an array of Classes.
-        */
-       private static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) {
-               TypeInformation<?>[] typeInfos = new TypeInformation[fieldTypes.length];
-               for (int i = 0; i < fieldTypes.length; i++) {
-                       typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]);
-               }
-               return typeInfos;
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
new file mode 100644
index 0000000..7a41ade
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+public class TypeUtil {
+       private TypeUtil() {}
+
+       /**
+        * Creates TypeInformation array for an array of Classes.
+        * @param fieldTypes classes to extract type information from
+        * @return type information
+        */
+       public static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) {
+               TypeInformation<?>[] typeInfos = new TypeInformation[fieldTypes.length];
+               for (int i = 0; i < fieldTypes.length; i++) {
+                       typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]);
+               }
+               return typeInfos;
+       }
+}

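For reference, a small sketch of the extracted helper (the classes are
illustrative; the test base below uses it the same way):

    TypeInformation<?>[] types =
        TypeUtil.toTypeInfo(new Class<?>[] {Integer.class, String.class});
    // types[0] is BasicTypeInfo.INT_TYPE_INFO,
    // types[1] is BasicTypeInfo.STRING_TYPE_INFO
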
http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
index 970c73e..4344810 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
@@ -51,7 +51,7 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
        private boolean failOnMissingField;
 
        /**
-        * Creates a JSON deserializtion schema for the given fields and type classes.
+        * Creates a JSON deserialization schema for the given fields and type classes.
         *
         * @param fieldNames Names of JSON fields to parse.
         * @param fieldTypes Type classes to parse JSON fields as.
@@ -69,7 +69,7 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
        }
 
        /**
-        * Creates a JSON deserializtion schema for the given fields and types.
+        * Creates a JSON deserialization schema for the given fields and types.
         *
         * @param fieldNames Names of JSON fields to parse.
         * @param fieldTypes Types to parse JSON fields as.

http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
new file mode 100644
index 0000000..077ff13
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes an object into JSON bytes.
+ *
+ * <p>Serializes the input {@link Row} object into a JSON string and
+ * converts it into <code>byte[]</code>.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using
+ * {@link JsonRowDeserializationSchema}.
+ */
+public class JsonRowSerializationSchema implements SerializationSchema<Row> {
+       /** Field names in the input Row object */
+       private final String[] fieldNames;
+       /** Object mapper that is used to create output JSON objects */
+       private static ObjectMapper mapper = new ObjectMapper();
+
+       /**
+        * Creates a JSON serialization schema for the given field names.
+        *
+        * @param fieldNames Names of the JSON fields to write.
+        */
+       public JsonRowSerializationSchema(String[] fieldNames) {
+               this.fieldNames = Preconditions.checkNotNull(fieldNames);
+       }
+
+       @Override
+       public byte[] serialize(Row row) {
+               if (row.productArity() != fieldNames.length) {
+                       throw new IllegalStateException(String.format(
+                               "Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
+               }
+
+               ObjectNode objectNode = mapper.createObjectNode();
+
+               for (int i = 0; i < row.productArity(); i++) {
+                       JsonNode node = mapper.valueToTree(row.productElement(i));
+                       objectNode.set(fieldNames[i], node);
+               }
+
+               try {
+                       return mapper.writeValueAsBytes(objectNode);
+               } catch (Exception e) {
+                       throw new RuntimeException("Failed to serialize row", e);
+               }
+       }
+}

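A short round-trip sketch of the new schema (field names and values are
illustrative; this mirrors the unit test that follows):

    JsonRowSerializationSchema ser =
        new JsonRowSerializationSchema(new String[] {"id", "name"});
    Row row = new Row(2);
    row.setField(0, 42);
    row.setField(1, "flink");
    byte[] json = ser.serialize(row);   // {"id":42,"name":"flink"}
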
http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
new file mode 100644
index 0000000..92af15d
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class JsonRowSerializationSchemaTest {
+       @Test
+       public void testRowSerialization() throws IOException {
+               String[] fieldNames = new String[] {"f1", "f2", "f3"};
+               Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class};
+               Row row = new Row(3);
+               row.setField(0, 1);
+               row.setField(1, true);
+               row.setField(2, "str");
+
+               Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, row);
+               assertEqualRows(row, resultRow);
+       }
+
+       @Test
+       public void testSerializationOfTwoRows() throws IOException {
+               String[] fieldNames = new String[] {"f1", "f2", "f3"};
+               Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class};
+               Row row1 = new Row(3);
+               row1.setField(0, 1);
+               row1.setField(1, true);
+               row1.setField(2, "str");
+
+               JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
+               JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes);
+
+               byte[] bytes = serializationSchema.serialize(row1);
+               assertEqualRows(row1, deserializationSchema.deserialize(bytes));
+
+               Row row2 = new Row(3);
+               row2.setField(0, 10);
+               row2.setField(1, false);
+               row2.setField(2, "newStr");
+
+               bytes = serializationSchema.serialize(row2);
+               assertEqualRows(row2, deserializationSchema.deserialize(bytes));
+       }
+
+       @Test(expected = NullPointerException.class)
+       public void testInputValidation() {
+               new JsonRowSerializationSchema(null);
+       }
+
+       @Test(expected = IllegalStateException.class)
+       public void testSerializeRowWithInvalidNumberOfFields() {
+               String[] fieldNames = new String[] {"f1", "f2", "f3"};
+               Row row = new Row(1);
+               row.setField(0, 1);
+
+               JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
+               serializationSchema.serialize(row);
+       }
+
+       private Row serializeAndDeserialize(String[] fieldNames, Class[] fieldTypes, Row row) throws IOException {
+               JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
+               JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes);
+
+               byte[] bytes = serializationSchema.serialize(row);
+               return deserializationSchema.deserialize(bytes);
+       }
+
+       private void assertEqualRows(Row expectedRow, Row resultRow) {
+               assertEquals("Deserialized row should have expected number of fields",
+                       expectedRow.productArity(), resultRow.productArity());
+               for (int i = 0; i < expectedRow.productArity(); i++) {
+                       assertEquals(String.format("Field number %d should be as in the original row", i),
+                               expectedRow.productElement(i), resultRow.productElement(i));
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abb44967/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
new file mode 100644
index 0000000..5e55b0a
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.test.util.SuccessException;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Properties;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+
+public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable {
+
+       protected final static String TOPIC = "customPartitioningTestTopic";
+       protected final static int PARALLELISM = 1;
+       protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
+       protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+
+       @Test
+       public void testKafkaTableSink() throws Exception {
+               LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()");
+
+               createTestTopic(TOPIC, PARALLELISM, 1);
+               StreamExecutionEnvironment env = createEnvironment();
+
+               createProducingTopology(env);
+               createConsumingTopology(env);
+
+               tryExecute(env, "custom partitioning test");
+               deleteTestTopic(TOPIC);
+               LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()");
+       }
+
+       private StreamExecutionEnvironment createEnvironment() {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env.setRestartStrategy(RestartStrategies.noRestart());
+               env.getConfig().disableSysoutLogging();
+               return env;
+       }
+
+       private void createProducingTopology(StreamExecutionEnvironment env) {
+               DataStream<Row> stream = env.addSource(new SourceFunction<Row>() {
+                       private boolean running = true;
+
+                       @Override
+                       public void run(SourceContext<Row> ctx) throws Exception {
+                               long cnt = 0;
+                               while (running) {
+                                       Row row = new Row(2);
+                                       row.setField(0, cnt);
+                                       row.setField(1, "kafka-" + cnt);
+                                       ctx.collect(row);
+                                       cnt++;
+                               }
+                       }
+
+                       @Override
+                       public void cancel() {
+                               running = false;
+                       }
+               })
+               .setParallelism(1);
+
+               KafkaTableSink kafkaTableSinkBase = createTableSink();
+
+               kafkaTableSinkBase.emitDataStream(stream);
+       }
+
+       private void createConsumingTopology(StreamExecutionEnvironment env) {
+               DeserializationSchema<Row> deserializationSchema = createRowDeserializationSchema();
+
+               FlinkKafkaConsumerBase<Row> source = kafkaServer.getConsumer(TOPIC, deserializationSchema, standardProps);
+
+               env.addSource(source).setParallelism(PARALLELISM)
+                       .map(new RichMapFunction<Row, Integer>() {
+                               @Override
+                               public Integer map(Row value) {
+                                       return (Integer) value.productElement(0);
+                               }
+                       }).setParallelism(PARALLELISM)
+
+                       .addSink(new SinkFunction<Integer>() {
+                               HashSet<Integer> ids = new HashSet<>();
+                               @Override
+                               public void invoke(Integer value) throws Exception {
+                                       ids.add(value);
+
+                                       if (ids.size() == 100) {
+                                               throw new SuccessException();
+                                       }
+                               }
+                       }).setParallelism(1);
+       }
+
+       protected KafkaPartitioner<Row> createPartitioner() {
+               return new CustomPartitioner();
+       }
+
+       protected Properties createSinkProperties() {
+               return FlinkKafkaProducerBase.getPropertiesFromBrokerList(KafkaTestBase.brokerConnectionStrings);
+       }
+
+       protected abstract KafkaTableSink createTableSink();
+
+       protected abstract DeserializationSchema<Row> createRowDeserializationSchema();
+
+       public static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable {
+               @Override
+               public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+                       return 0;
+               }
+       }
+}
