[FLINK-8014] [table] Add Kafka010JsonTableSink.

- Refactor KafkaTableSink tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50fba9aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50fba9aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50fba9aa

Branch: refs/heads/master
Commit: 50fba9aa4e96632f7b32cf98d704683364196cbd
Parents: fc3eebd
Author: Fabian Hueske <fhue...@apache.org>
Authored: Tue Nov 7 17:59:43 2017 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Nov 16 11:32:12 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010JsonTableSink.java | 73 ++++++++++++++++++++
 .../kafka/Kafka010JsonTableSinkTest.java        | 53 ++++++++++++++
 .../connectors/kafka/Kafka08JsonTableSink.java  | 26 ++++++-
 .../kafka/Kafka08JsonTableSinkTest.java         | 27 +++-----
 .../connectors/kafka/Kafka09JsonTableSink.java  | 26 ++++++-
 .../kafka/Kafka09JsonTableSinkTest.java         | 27 +++-----
 .../connectors/kafka/KafkaJsonTableSink.java    |  5 +-
 .../connectors/kafka/KafkaTableSink.java        | 10 ++-
 .../JsonRowSerializationSchema.java             | 22 +++++-
 .../kafka/JsonRowSerializationSchemaTest.java   | 46 ++++++++----
 .../kafka/KafkaTableSinkTestBase.java           | 30 ++++----
 11 files changed, 269 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
new file mode 100644
index 0000000..431ace0
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format.
+ */
+public class Kafka010JsonTableSink extends KafkaJsonTableSink {
+
+       /**
+        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+        * topic with fixed partition assignment.
+        *
+        * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
+        * <ul>
+        * <li>If the number of Kafka partitions is less than the number of sink instances, different
+        * sink instances will write to the same partition.</li>
+        * <li>If the number of Kafka partitions is higher than the number of sink instances, some
+        * Kafka partitions won't receive data.</li>
+        * </ul>
+        *
+        * @param topic topic in Kafka to which table is written
+        * @param properties properties to connect to Kafka
+        */
+       public Kafka010JsonTableSink(String topic, Properties properties) {
+               super(topic, properties, new FlinkFixedPartitioner<>());
+       }
+
+       /**
+        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+        * topic with custom partition assignment.
+        *
+        * @param topic topic in Kafka to which table is written
+        * @param properties properties to connect to Kafka
+        * @param partitioner Kafka partitioner
+        */
+       public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
+               super(topic, properties, partitioner);
+       }
+
+       @Override
+       protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+               return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner);
+       }
+
+       @Override
+       protected Kafka010JsonTableSink createCopy() {
+               return new Kafka010JsonTableSink(topic, properties, partitioner);
+       }
+}
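
A minimal usage sketch of the new sink (illustration only, not part of the
commit; the "clicks" table, topic name, and broker address are assumptions):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // an append-only table produced by some query over a registered table
    Table result = tableEnv.scan("clicks").select("user, url");

    // the new two-argument constructor defaults to FlinkFixedPartitioner,
    // i.e. each parallel sink instance writes to a single Kafka partition
    Kafka010JsonTableSink sink = new Kafka010JsonTableSink("clickTopic", props);
    result.writeToSink(sink);

    env.execute();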

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
new file mode 100644
index 0000000..4d805d5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka010JsonTableSink}.
+ */
+public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+       @Override
+       protected KafkaTableSink createTableSink(
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner) {
+
+               return new Kafka010JsonTableSink(topic, properties, partitioner);
+       }
+
+       @Override
+       protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
+               return JsonRowSerializationSchema.class;
+       }
+
+       @Override
+       protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
+               return FlinkKafkaProducer010.class;
+       }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index a887048..39d5cb2 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -32,7 +33,27 @@ import java.util.Properties;
 public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 
        /**
-        * Creates {@link KafkaTableSink} for Kafka 0.8.
+        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+        * topic with fixed partition assignment.
+        *
+        * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
+        * <ul>
+        * <li>If the number of Kafka partitions is less than the number of sink instances, different
+        * sink instances will write to the same partition.</li>
+        * <li>If the number of Kafka partitions is higher than the number of sink instances, some
+        * Kafka partitions won't receive data.</li>
+        * </ul>
+        *
+        * @param topic topic in Kafka to which table is written
+        * @param properties properties to connect to Kafka
+        */
+       public Kafka08JsonTableSink(String topic, Properties properties) {
+               super(topic, properties, new FlinkFixedPartitioner<>());
+       }
+
+       /**
+        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+        * topic with custom partition assignment.
         *
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka
@@ -43,7 +64,8 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
        }
 
        /**
-        * Creates {@link KafkaTableSink} for Kafka 0.8.
+        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+        * topic with custom partition assignment.
         *
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka
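
For illustration, a sketch of the custom-partitioner variant (the anonymous
FlinkKafkaPartitioner, the "clicks" topic, and props are hypothetical, not
part of this commit):

    FlinkKafkaPartitioner<Row> byFirstField = new FlinkKafkaPartitioner<Row>() {
        @Override
        public int partition(Row row, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            Object f0 = row.getField(0);
            int hash = (f0 == null) ? 0 : f0.hashCode();
            // mask the sign bit so the partition index is never negative
            return partitions[(hash & 0x7fffffff) % partitions.length];
        }
    };
    Kafka08JsonTableSink sink = new Kafka08JsonTableSink("clicks", props, byFirstField);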

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 890fc3a..d7bb683 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -34,26 +34,19 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
        protected KafkaTableSink createTableSink(
                        String topic,
                        Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner,
-                       final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
-               return new Kafka08JsonTableSink(topic, properties, partitioner) {
-                       @Override
-                       protected FlinkKafkaProducerBase<Row> createKafkaProducer(
-                                       String topic,
-                                       Properties properties,
-                                       SerializationSchema<Row> serializationSchema,
-                                       FlinkKafkaPartitioner<Row> partitioner) {
-
-                               return kafkaProducer;
-                       }
-               };
+                       FlinkKafkaPartitioner<Row> partitioner) {
+
+               return new Kafka08JsonTableSink(topic, properties, partitioner);
+       }
+
+       @Override
+       protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
+               return JsonRowSerializationSchema.class;
        }
 
        @Override
-       @SuppressWarnings("unchecked")
-       protected SerializationSchema<Row> getSerializationSchema() {
-               return new JsonRowSerializationSchema(FIELD_NAMES);
+       protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
+               return FlinkKafkaProducer08.class;
        }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index f65a02d..a4d2661 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -32,7 +33,27 @@ import java.util.Properties;
 public class Kafka09JsonTableSink extends KafkaJsonTableSink {
 
        /**
-        * Creates {@link KafkaTableSink} for Kafka 0.9 .
+        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+        * topic with fixed partition assignment.
+        *
+        * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
+        * <ul>
+        * <li>If the number of Kafka partitions is less than the number of sink instances, different
+        * sink instances will write to the same partition.</li>
+        * <li>If the number of Kafka partitions is higher than the number of sink instances, some
+        * Kafka partitions won't receive data.</li>
+        * </ul>
+        *
+        * @param topic topic in Kafka to which table is written
+        * @param properties properties to connect to Kafka
+        */
+       public Kafka09JsonTableSink(String topic, Properties properties) {
+               super(topic, properties, new FlinkFixedPartitioner<>());
+       }
+
+       /**
+        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+        * topic with custom partition assignment.
         *
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka
@@ -43,7 +64,8 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
        }
 
        /**
-        * Creates {@link KafkaTableSink} for Kafka 0.9 .
+        * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+        * topic with custom partition assignment.
         *
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index c52b4ca..58f2b05 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -34,26 +34,19 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
        protected KafkaTableSink createTableSink(
                        String topic,
                        Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner,
-                       final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
-               return new Kafka09JsonTableSink(topic, properties, partitioner) {
-                       @Override
-                       protected FlinkKafkaProducerBase<Row> createKafkaProducer(
-                                       String topic,
-                                       Properties properties,
-                                       SerializationSchema<Row> serializationSchema,
-                                       FlinkKafkaPartitioner<Row> partitioner) {
-
-                               return kafkaProducer;
-                       }
-               };
+                       FlinkKafkaPartitioner<Row> partitioner) {
+
+               return new Kafka09JsonTableSink(topic, properties, partitioner);
+       }
+
+       @Override
+       protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
+               return JsonRowSerializationSchema.class;
        }
 
        @Override
-       @SuppressWarnings("unchecked")
-       protected SerializationSchema<Row> getSerializationSchema() {
-               return new JsonRowSerializationSchema(FIELD_NAMES);
+       protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
+               return FlinkKafkaProducer09.class;
        }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index f354dad..6665dbd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.types.Row;
@@ -42,7 +43,7 @@ public abstract class KafkaJsonTableSink extends KafkaTableSink {
        }
 
        @Override
-       protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) {
-               return new JsonRowSerializationSchema(fieldNames);
+       protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
+               return new JsonRowSerializationSchema(rowSchema);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index cac71dc..f42827e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -77,10 +77,10 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
        /**
         * Create serialization schema for converting table rows into bytes.
         *
-        * @param fieldNames Field names in table rows.
+        * @param rowSchema the schema of the row to serialize.
         * @return Instance of serialization schema
         */
-       protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames);
+       protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema);
 
        /**
         * Create a deep copy of this sink.
@@ -92,6 +92,8 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
        @Override
        public void emitDataStream(DataStream<Row> dataStream) {
                FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
+               // always enable flush on checkpoint to achieve at-least-once if the query runs with checkpointing enabled.
+               kafkaProducer.setFlushOnCheckpoint(true);
                dataStream.addSink(kafkaProducer);
        }
 
@@ -116,7 +118,9 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
                copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
                Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
                        "Number of provided field names and types does not match.");
-               copy.serializationSchema = createSerializationSchema(fieldNames);
+
+               RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames);
+               copy.serializationSchema = createSerializationSchema(rowSchema);
 
                return copy;
        }
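
Note that setFlushOnCheckpoint(true) yields at-least-once delivery only when
the job actually checkpoints. A minimal sketch of the required setup (the
interval is an arbitrary assumption):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // without checkpointing there are no checkpoints to flush on and the
    // sink falls back to best-effort delivery
    env.enableCheckpointing(5000L); // checkpoint every 5 seconds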

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
index 5ece193..36d3137 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
@@ -18,6 +18,9 @@
 package org.apache.flink.streaming.util.serialization;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
@@ -43,10 +46,23 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
        /**
         * Creates a JSON serialization schema for the given fields and types.
         *
-        * @param fieldNames Names of JSON fields to parse.
+        * @param rowSchema The schema of the rows to encode.
         */
-       public JsonRowSerializationSchema(String[] fieldNames) {
-               this.fieldNames = Preconditions.checkNotNull(fieldNames);
+       public JsonRowSerializationSchema(RowTypeInfo rowSchema) {
+
+               Preconditions.checkNotNull(rowSchema);
+               String[] fieldNames = rowSchema.getFieldNames();
+               TypeInformation[] fieldTypes = rowSchema.getFieldTypes();
+
+               // check that no field is composite
+               for (int i = 0; i < fieldTypes.length; i++) {
+                       if (fieldTypes[i] instanceof CompositeType) {
+                               throw new IllegalArgumentException("JsonRowSerializationSchema cannot encode rows with nested schema, " +
+                                       "but field '" + fieldNames[i] + "' is nested: " + fieldTypes[i].toString());
+                       }
+               }
+
+               this.fieldNames = fieldNames;
        }
 
        @Override
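
A short sketch of the changed constructor (the field names and types are
illustrative; this mirrors the usage in the updated tests below):

    RowTypeInfo rowSchema = new RowTypeInfo(
        new TypeInformation[]{Types.INT(), Types.STRING()},
        new String[]{"id", "name"});
    JsonRowSerializationSchema schema = new JsonRowSerializationSchema(rowSchema);

    Row row = new Row(2);
    row.setField(0, 42);
    row.setField(1, "flink");
    byte[] json = schema.serialize(row); // a JSON object keyed by "id" and "name"

    // nested row types are now rejected eagerly: passing a RowTypeInfo that
    // contains e.g. Types.ROW(Types.INT()) makes the constructor throw
    // IllegalArgumentException instead of failing later during serialization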

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
index 43bde35..70140a6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.table.api.Types;
@@ -36,31 +37,34 @@ public class JsonRowSerializationSchemaTest {
 
        @Test
        public void testRowSerialization() throws IOException {
-               String[] fieldNames = new String[] {"f1", "f2", "f3"};
-               TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] { Types.INT(), Types.BOOLEAN(), Types.STRING() };
+               RowTypeInfo rowSchema = new RowTypeInfo(
+                       new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()},
+                       new String[] {"f1", "f2", "f3"}
+               );
+
                Row row = new Row(3);
                row.setField(0, 1);
                row.setField(1, true);
                row.setField(2, "str");
 
-               Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, row);
+               Row resultRow = serializeAndDeserialize(rowSchema, row);
                assertEqualRows(row, resultRow);
        }
 
        @Test
        public void testSerializationOfTwoRows() throws IOException {
-               String[] fieldNames = new String[] {"f1", "f2", "f3"};
-               TypeInformation<Row> row = Types.ROW(
-                       fieldNames,
-                       new TypeInformation<?>[] { Types.INT(), Types.BOOLEAN(), Types.STRING() }
+               RowTypeInfo rowSchema = new RowTypeInfo(
+                       new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()},
+                       new String[] {"f1", "f2", "f3"}
                );
+
                Row row1 = new Row(3);
                row1.setField(0, 1);
                row1.setField(1, true);
                row1.setField(2, "str");
 
-               JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
-               JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(row);
+               JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
+               JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(rowSchema);
 
                byte[] bytes = serializationSchema.serialize(row1);
                assertEqualRows(row1, deserializationSchema.deserialize(bytes));
@@ -79,19 +83,33 @@ public class JsonRowSerializationSchemaTest {
                new JsonRowSerializationSchema(null);
        }
 
+       @Test(expected = IllegalArgumentException.class)
+       public void testRejectNestedSchema() {
+               RowTypeInfo rowSchema = new RowTypeInfo(
+                       new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.ROW(Types.INT(), Types.DOUBLE())},
+                       new String[] {"f1", "f2", "f3"}
+               );
+
+               new JsonRowSerializationSchema(rowSchema);
+       }
+
        @Test(expected = IllegalStateException.class)
        public void testSerializeRowWithInvalidNumberOfFields() {
-               String[] fieldNames = new String[] {"f1", "f2", "f3"};
+               RowTypeInfo rowSchema = new RowTypeInfo(
+                       new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()},
+                       new String[] {"f1", "f2", "f3"}
+               );
+
                Row row = new Row(1);
                row.setField(0, 1);
 
-               JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
+               JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
                serializationSchema.serialize(row);
        }
 
-       private Row serializeAndDeserialize(String[] fieldNames, TypeInformation<?>[] fieldTypes, Row row) throws IOException {
-               JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
-               JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(Types.ROW(fieldNames, fieldTypes));
+       private Row serializeAndDeserialize(RowTypeInfo rowSchema, Row row) throws IOException {
+               JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
+               JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(rowSchema);
 
                byte[] bytes = serializationSchema.serialize(row);
                return deserializationSchema.deserialize(bytes);

http://git-wip-us.apache.org/repos/asf/flink/blob/50fba9aa/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 3138152..ac5259e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.types.Row;
 
@@ -46,32 +45,27 @@ import static org.mockito.Mockito.verify;
 public abstract class KafkaTableSinkTestBase {
 
        private static final String TOPIC = "testTopic";
-       protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
+       private static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
        private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() };
        private static final FlinkKafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
        private static final Properties PROPERTIES = createSinkProperties();
-       @SuppressWarnings("unchecked")
-       private final FlinkKafkaProducerBase<Row> producer = new FlinkKafkaProducerBase<Row>(
-               TOPIC, new KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, PARTITIONER) {
-
-               @Override
-               protected void flush() {}
-       };
 
-       @Test
        @SuppressWarnings("unchecked")
+       @Test
        public void testKafkaTableSink() throws Exception {
                DataStream dataStream = mock(DataStream.class);
 
                KafkaTableSink kafkaTableSink = spy(createTableSink());
                kafkaTableSink.emitDataStream(dataStream);
 
-               verify(dataStream).addSink(eq(producer));
+               // verify correct producer class
+               verify(dataStream).addSink(any(getProducerClass()));
 
+               // verify correctly configured producer
                verify(kafkaTableSink).createKafkaProducer(
                        eq(TOPIC),
                        eq(PROPERTIES),
-                       any(getSerializationSchema().getClass()),
+                       any(getSerializationSchemaClass()),
                        eq(PARTITIONER));
        }
 
@@ -86,13 +80,17 @@ public abstract class KafkaTableSinkTestBase {
                assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
        }
 
-       protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+       protected abstract KafkaTableSink createTableSink(
+               String topic,
+               Properties properties,
+               FlinkKafkaPartitioner<Row> partitioner);
+
+       protected abstract Class<? extends SerializationSchema<Row>> getSerializationSchemaClass();
 
-       protected abstract SerializationSchema<Row> getSerializationSchema();
+       protected abstract Class<? extends FlinkKafkaProducerBase> getProducerClass();
 
        private KafkaTableSink createTableSink() {
-               return createTableSink(TOPIC, PROPERTIES, PARTITIONER, producer);
+               return createTableSink(TOPIC, PROPERTIES, PARTITIONER);
        }
 
        private static Properties createSinkProperties() {
