[FLINK-3871] [table] Add Kafka TableSource with Avro serialization

This closes #3663.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbc5e29c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbc5e29c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbc5e29c

Branch: refs/heads/master
Commit: bbc5e29c8df71950c6216cf490817ef002c140c5
Parents: 1829819
Author: twalthr <[email protected]>
Authored: Mon Apr 3 14:44:46 2017 +0200
Committer: twalthr <[email protected]>
Committed: Wed Apr 26 16:42:01 2017 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  85 ++++++++--
 .../flink-connector-kafka-0.10/pom.xml          |   1 -
 .../kafka/Kafka010AvroTableSource.java          |  56 +++++++
 .../kafka/Kafka010JsonTableSource.java          |  28 +---
 .../connectors/kafka/Kafka010TableSource.java   |  28 +---
 .../kafka/Kafka010AvroTableSourceTest.java      |  50 ++++++
 .../kafka/Kafka010JsonTableSourceTest.java      |  45 ++++++
 .../kafka/Kafka08AvroTableSource.java           |  56 +++++++
 .../kafka/Kafka08JsonTableSource.java           |  26 +---
 .../connectors/kafka/Kafka08TableSource.java    |  28 +---
 .../kafka/Kafka08AvroTableSourceTest.java       |  49 ++++++
 .../kafka/Kafka08JsonTableSourceTest.java       |   4 +-
 .../kafka/Kafka09AvroTableSource.java           |  56 +++++++
 .../kafka/Kafka09JsonTableSource.java           |  26 +---
 .../connectors/kafka/Kafka09TableSource.java    |  28 +---
 .../kafka/Kafka09AvroTableSourceTest.java       |  49 ++++++
 .../kafka/Kafka09JsonTableSourceTest.java       |   4 +-
 .../flink-connector-kafka-base/pom.xml          |  27 +++-
 .../connectors/kafka/KafkaAvroTableSource.java  | 106 +++++++++++++
 .../connectors/kafka/KafkaJsonTableSource.java  |  40 +----
 .../connectors/kafka/KafkaTableSource.java      |  49 +-----
 .../connectors/kafka/internals/TypeUtil.java    |  38 -----
 .../AvroRowDeserializationSchema.java           | 155 +++++++++++++++++++
 .../AvroRowSerializationSchema.java             | 124 +++++++++++++++
 .../JsonRowDeserializationSchema.java           |  43 ++---
 .../kafka/AvroRowDeSerializationSchemaTest.java | 120 ++++++++++++++
 .../kafka/JsonRowDeserializationSchemaTest.java |  26 ++--
 .../kafka/JsonRowSerializationSchemaTest.java   |  16 +-
 .../kafka/KafkaTableSinkTestBase.java           |   4 +-
 .../kafka/KafkaTableSourceTestBase.java         |  38 ++++-
 .../kafka/testutils/AvroTestUtils.java          | 150 ++++++++++++++++++
 .../apache/flink/api/common/typeinfo/Types.java |   2 +-
 .../flink/api/java/typeutils/RowTypeInfo.java   |   7 +
 .../org/apache/flink/table/api/Types.scala      |  61 ++++++--
 34 files changed, 1291 insertions(+), 334 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 3d77b85..4b9e565 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -202,7 +202,7 @@ tableEnv.registerTableSource("Customers", custTS)
 
 A `TableSource` can provide access to data stored in various storage systems such as databases (MySQL, HBase, ...), file formats (CSV, Apache Parquet, Avro, ORC, ...), or messaging systems (Apache Kafka, RabbitMQ, ...).
 
-Currently, Flink provides the `CsvTableSource` to read CSV files and the `Kafka08JsonTableSource`/`Kafka09JsonTableSource` to read JSON objects from Kafka.
+Currently, Flink provides the `CsvTableSource` to read CSV files and various `TableSources` to read JSON or Avro objects from Kafka.
 A custom `TableSource` can be defined by implementing the `BatchTableSource` or `StreamTableSource` interface.
 
 ### Available Table Sources
@@ -210,7 +210,11 @@ A custom `TableSource` can be defined by implementing the `BatchTableSource` or
 | **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description**
 | `CsvTableSouce` | `flink-table` | Y | Y | A simple source for CSV files.
 | `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | A Kafka 0.8 source for JSON data.
+| `Kafka08AvroTableSource` | `flink-connector-kafka-0.8` | N | Y | A Kafka 0.8 source for Avro data.
 | `Kafka09JsonTableSource` | `flink-connector-kafka-0.9` | N | Y | A Kafka 0.9 source for JSON data.
+| `Kafka09AvroTableSource` | `flink-connector-kafka-0.9` | N | Y | A Kafka 0.9 source for Avro data.
+| `Kafka010JsonTableSource` | `flink-connector-kafka-0.10` | N | Y | A Kafka 0.10 source for JSON data.
+| `Kafka010AvroTableSource` | `flink-connector-kafka-0.10` | N | Y | A Kafka 0.10 source for Avro data.
 
 All sources that come with the `flink-table` dependency can be directly used by your Table programs. For all other table sources, you have to add the respective dependency in addition to the `flink-table` dependency.
 
@@ -218,22 +222,42 @@ All sources that come with the `flink-table` dependency can be directly used by
 
 To use the Kafka JSON source, you have to add the Kafka connector dependency to your project:
 
-  - `flink-connector-kafka-0.8` for Kafka 0.8, and
-  - `flink-connector-kafka-0.9` for Kafka 0.9, respectively.
+  - `flink-connector-kafka-0.8` for Kafka 0.8,
+  - `flink-connector-kafka-0.9` for Kafka 0.9, or
+  - `flink-connector-kafka-0.10` for Kafka 0.10, respectively.
 
 You can then create the source as follows (example for Kafka 0.8):
-
-```java
-// The JSON field names and types
-String[] fieldNames =  new String[] { "id", "name", "score"};
-Class<?>[] fieldTypes = new Class<?>[] { Integer.class, String.class, Double.class };
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// specify JSON field names and types
+TypeInformation<Row> typeInfo = Types.ROW(
+  new String[] { "id", "name", "score" },
+  new TypeInformation<?>[] { Types.INT(), Types.STRING(), Types.DOUBLE() }
+);
 
 KafkaJsonTableSource kafkaTableSource = new Kafka08JsonTableSource(
     kafkaTopic,
     kafkaProperties,
-    fieldNames,
-    fieldTypes);
-```
+    typeInfo);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// specify JSON field names and types
+val typeInfo = Types.ROW(
+  Array("id", "name", "score"),
+  Array(Types.INT, Types.STRING, Types.DOUBLE)
+)
+
+val kafkaTableSource = new Kafka08JsonTableSource(
+    kafkaTopic,
+    kafkaProperties,
+    typeInfo)
+{% endhighlight %}
+</div>
+</div>
 
 By default, a missing JSON field does not fail the source. You can configure this via:
 
@@ -249,6 +273,43 @@ tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);
 Table result = tableEnvironment.ingest("kafka-source");
 ```
 
+#### KafkaAvroTableSource
+
+The `KafkaAvroTableSource` allows you to read Avro's `SpecificRecord` objects from Kafka.
+
+To use the Kafka Avro source, you have to add the Kafka connector dependency to your project:
+
+  - `flink-connector-kafka-0.8` for Kafka 0.8,
+  - `flink-connector-kafka-0.9` for Kafka 0.9, or
+  - `flink-connector-kafka-0.10` for Kafka 0.10, respectively.
+
+You can then create the source as follows (example for Kafka 0.8):
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// pass the generated Avro class to the TableSource
+Class<? extends SpecificRecord> clazz = MyAvroType.class; 
+
+KafkaAvroTableSource kafkaTableSource = new Kafka08AvroTableSource(
+    kafkaTopic,
+    kafkaProperties,
+    clazz);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// pass the generated Avro class to the TableSource
+val clazz = classOf[MyAvroType]
+
+val kafkaTableSource = new Kafka08AvroTableSource(
+    kafkaTopic,
+    kafkaProperties,
+    clazz)
+{% endhighlight %}
+</div>
+</div>
+
 #### CsvTableSource
 
 The `CsvTableSource` is already included in `flink-table` without additional dependencies.
@@ -1684,6 +1745,8 @@ The Table API is built on top of Flink's DataSet and DataStream API. Internally,
 | `Types.TIMESTAMP`      | `TIMESTAMP(3)`              | `java.sql.Timestamp`   |
 | `Types.INTERVAL_MONTHS`| `INTERVAL YEAR TO MONTH`    | `java.lang.Integer`    |
 | `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long`       |
+| `Types.PRIMITIVE_ARRAY`| `ARRAY`                     | e.g. `int[]`           |
+| `Types.OBJECT_ARRAY`   | `ARRAY`                     | e.g. `java.lang.Byte[]`|
 
 
 Advanced types such as generic types, composite types (e.g. POJOs or Tuples), and array types (object or primitive arrays) can be fields of a row.

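The docs hunk above replaces the parallel `fieldNames`/`fieldTypes` constructor arrays with a single `TypeInformation<Row>`. As a rough illustration of why bundling names and types into one descriptor is safer for callers, here is a self-contained plain-Java sketch (the `RowSchema` class is hypothetical, not Flink's actual `RowTypeInfo`):

```java
import java.util.NoSuchElementException;

// Hypothetical stand-in for a row type descriptor: it pairs field names with
// field types and rejects mismatched arrays at construction time, which two
// independent constructor parameters cannot guarantee for the caller.
final class RowSchema {

    private final String[] fieldNames;
    private final Class<?>[] fieldTypes;

    RowSchema(String[] fieldNames, Class<?>[] fieldTypes) {
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException("field names and types must align");
        }
        this.fieldNames = fieldNames.clone();
        this.fieldTypes = fieldTypes.clone();
    }

    // Looks up the declared type of a field by name.
    Class<?> typeOf(String field) {
        for (int i = 0; i < fieldNames.length; i++) {
            if (fieldNames[i].equals(field)) {
                return fieldTypes[i];
            }
        }
        throw new NoSuchElementException(field);
    }
}

public class RowSchemaDemo {
    public static void main(String[] args) {
        RowSchema schema = new RowSchema(
            new String[] { "id", "name", "score" },
            new Class<?>[] { Integer.class, String.class, Double.class });
        System.out.println(schema.typeOf("score").getSimpleName()); // prints "Double"
    }
}
```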
http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index e061178..ab60946 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -152,7 +152,6 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
-
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
new file mode 100644
index 0000000..1b2abcc
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.10.
+ */
+public class Kafka010AvroTableSource extends KafkaAvroTableSource {
+
+       /**
+        * Creates a Kafka 0.10 Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
+        *
+        * @param topic      Kafka topic to consume.
+        * @param properties Properties for the Kafka consumer.
+        * @param record     Avro specific record.
+        */
+       public Kafka010AvroTableSource(
+               String topic,
+               Properties properties,
+               Class<? extends SpecificRecordBase> record) {
+
+               super(
+                       topic,
+                       properties,
+                       record);
+       }
+
+       @Override
+       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+               return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
+       }
+}
+

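The new `Kafka010AvroTableSource` above only supplies a version-specific consumer via `getKafkaConsumer(...)`; the shared base class drives the rest. A stripped-down, hypothetical sketch of that template-method shape (plain Java, not the actual Flink classes):

```java
// Hypothetical miniature of the KafkaTableSource hierarchy: the base class
// owns the common wiring and defers only consumer creation to subclasses,
// mirroring the getKafkaConsumer(...) override in the diff above.
abstract class MiniTableSource {

    // Common logic lives in the base class.
    final String describe(String topic) {
        return "source over " + createConsumer(topic);
    }

    // Subclasses provide only the version-specific consumer.
    abstract String createConsumer(String topic);
}

class MiniKafka010Source extends MiniTableSource {
    @Override
    String createConsumer(String topic) {
        return "FlinkKafkaConsumer010(" + topic + ")";
    }
}

public class TemplateMethodDemo {
    public static void main(String[] args) {
        // prints "source over FlinkKafkaConsumer010(clicks)"
        System.out.println(new MiniKafka010Source().describe("clicks"));
    }
}
```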
http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index bfae1a9..78ef28e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -28,40 +28,22 @@ import java.util.Properties;
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.10.
  */
-public class Kafka010JsonTableSource extends Kafka09JsonTableSource {
+public class Kafka010JsonTableSource extends KafkaJsonTableSource {
 
        /**
         * Creates a Kafka 0.10 JSON {@link StreamTableSource}.
         *
         * @param topic      Kafka topic to consume.
         * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
+        * @param typeInfo   Type information describing the result type. The field
+        *                   names and types are used to parse the JSON records.
         */
        public Kafka010JsonTableSource(
                        String topic,
                        Properties properties,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
+                       TypeInformation<Row> typeInfo) {
 
-               super(topic, properties, fieldNames, fieldTypes);
-       }
-
-       /**
-        * Creates a Kafka 0.10 JSON {@link StreamTableSource}.
-        *
-        * @param topic      Kafka topic to consume.
-        * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
-        */
-       public Kafka010JsonTableSource(
-                       String topic,
-                       Properties properties,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               super(topic, properties, fieldNames, fieldTypes);
+               super(topic, properties, typeInfo);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index e5254c0..03e9125 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -36,36 +36,16 @@ public class Kafka010TableSource extends Kafka09TableSource {
         * @param topic                 Kafka topic to consume.
         * @param properties            Properties for the Kafka consumer.
         * @param deserializationSchema Deserialization schema to use for Kafka records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
+        * @param typeInfo              Type information describing the result type. The
+        *                              field names and types are used to parse the records.
         */
        public Kafka010TableSource(
                        String topic,
                        Properties properties,
                        DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
+                       TypeInformation<Row> typeInfo) {
 
-               super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
-       }
-
-       /**
-        * Creates a Kafka 0.10 {@link StreamTableSource}.
-        *
-        * @param topic                 Kafka topic to consume.
-        * @param properties            Properties for the Kafka consumer.
-        * @param deserializationSchema Deserialization schema to use for Kafka records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
-        */
-       public Kafka010TableSource(
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+               super(topic, properties, deserializationSchema, typeInfo);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
new file mode 100644
index 0000000..ed93725
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.types.Row;
+
+public class Kafka010AvroTableSourceTest extends KafkaTableSourceTestBase {
+
+       @Override
+       protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
+
+               return new Kafka010AvroTableSource(
+                       topic,
+                       properties,
+                       AvroSpecificRecord.class);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+               return (Class) AvroRowDeserializationSchema.class;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+               return (Class) FlinkKafkaConsumer010.class;
+       }
+}
+

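The test classes above return `Class<DeserializationSchema<Row>>` tokens via a raw `(Class)` cast, because Java class literals carry no type parameters. A minimal self-contained sketch of that pattern (plain JDK types, nothing Flink-specific):

```java
import java.util.ArrayList;
import java.util.List;

public class ClassLiteralCast {

    // A class literal such as ArrayList.class has the erased type
    // Class<ArrayList>; there is no Class<ArrayList<String>> literal.
    // Returning a parameterized Class token therefore goes through a raw
    // (Class) cast plus @SuppressWarnings, the same pattern the table
    // source tests use for AvroRowDeserializationSchema.class.
    @SuppressWarnings("unchecked")
    static Class<List<String>> listOfStringToken() {
        return (Class) ArrayList.class;
    }

    public static void main(String[] args) {
        // The token still identifies the raw runtime class.
        Class<?> token = listOfStringToken();
        System.out.println(token == ArrayList.class); // prints "true"
    }
}
```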
http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
new file mode 100644
index 0000000..55e8b9c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.types.Row;
+
+public class Kafka010JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+       @Override
+       protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
+               return new Kafka010JsonTableSource(topic, properties, typeInfo);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+               return (Class) JsonRowDeserializationSchema.class;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+               return (Class) FlinkKafkaConsumer010.class;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
new file mode 100644
index 0000000..1a68c05
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.8.
+ */
+public class Kafka08AvroTableSource extends KafkaAvroTableSource {
+
+       /**
+        * Creates a Kafka 0.8 Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
+        *
+        * @param topic      Kafka topic to consume.
+        * @param properties Properties for the Kafka consumer.
+        * @param record     Avro specific record.
+        */
+       public Kafka08AvroTableSource(
+               String topic,
+               Properties properties,
+               Class<? extends SpecificRecordBase> record) {
+
+               super(
+                       topic,
+                       properties,
+                       record);
+       }
+
+       @Override
+       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+               return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
index 27e7e6e..1555a3b 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -35,33 +35,15 @@ public class Kafka08JsonTableSource extends KafkaJsonTableSource {
         *
         * @param topic      Kafka topic to consume.
         * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
+        * @param typeInfo   Type information describing the result type. The field
+        *                   names and types are used to parse the JSON records.
         */
        public Kafka08JsonTableSource(
                        String topic,
                        Properties properties,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
+                       TypeInformation<Row> typeInfo) {
 
-               super(topic, properties, fieldNames, fieldTypes);
-       }
-
-       /**
-        * Creates a Kafka 0.8 JSON {@link StreamTableSource}.
-        *
-        * @param topic      Kafka topic to consume.
-        * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
-        */
-       public Kafka08JsonTableSource(
-                       String topic,
-                       Properties properties,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               super(topic, properties, fieldNames, fieldTypes);
+               super(topic, properties, typeInfo);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index 19fd50d..e1e481c 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -36,36 +36,16 @@ public class Kafka08TableSource extends KafkaTableSource {
         * @param topic                 Kafka topic to consume.
         * @param properties            Properties for the Kafka consumer.
         * @param deserializationSchema Deserialization schema to use for Kafka records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
+        * @param typeInfo              Type information describing the result type. The
+        *                              field names and types are used to parse the records.
         */
        public Kafka08TableSource(
                        String topic,
                        Properties properties,
                        DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
+                       TypeInformation<Row> typeInfo) {
 
-               super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
-       }
-
-       /**
-        * Creates a Kafka 0.8 {@link StreamTableSource}.
-        *
-        * @param topic                 Kafka topic to consume.
-        * @param properties            Properties for the Kafka consumer.
-        * @param deserializationSchema Deserialization schema to use for Kafka records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
-        */
-       public Kafka08TableSource(
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
+               super(topic, properties, deserializationSchema, typeInfo);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
new file mode 100644
index 0000000..2dedecb
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.types.Row;
+
+public class Kafka08AvroTableSourceTest extends KafkaTableSourceTestBase {
+
+       @Override
+       protected KafkaTableSource createTableSource(String topic, Properties 
properties, TypeInformation<Row> typeInfo) {
+               return new Kafka08AvroTableSource(
+                       topic,
+                       properties,
+                       AvroSpecificRecord.class);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+               return (Class) AvroRowDeserializationSchema.class;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+               return (Class) FlinkKafkaConsumer08.class;
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
index f9ef2ce..27faff4 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
@@ -27,8 +27,8 @@ import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchem
 public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase {
 
        @Override
-       protected KafkaTableSource createTableSource(String topic, Properties 
properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
-               return new Kafka08JsonTableSource(topic, properties, 
fieldNames, typeInfo);
+       protected KafkaTableSource createTableSource(String topic, Properties 
properties, TypeInformation<Row> typeInfo) {
+               return new Kafka08JsonTableSource(topic, properties, typeInfo);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
new file mode 100644
index 0000000..9e1172b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+/**
+ * Kafka Avro {@link StreamTableSource} for Kafka 0.9.
+ */
+public class Kafka09AvroTableSource extends KafkaAvroTableSource {
+
+       /**
+        * Creates a Kafka 0.9 Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+        *
+        * @param topic      Kafka topic to consume.
+        * @param properties Properties for the Kafka consumer.
+        * @param record     Avro specific record class.
+        */
+       public Kafka09AvroTableSource(
+               String topic,
+               Properties properties,
+               Class<? extends SpecificRecordBase> record) {
+
+               super(
+                       topic,
+                       properties,
+                       record);
+       }
+
+       @Override
+       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+               return new FlinkKafkaConsumer09<>(topic, deserializationSchema, 
properties);
+       }
+}
+
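The version-specific sources above are thin shells around the abstract base class. A minimal, plain-Java sketch of that pattern (no Flink dependencies; all names below are hypothetical stand-ins, not the real Flink classes): the base class owns the common configuration, and each version-specific subclass only overrides the consumer factory hook, as Kafka09AvroTableSource does with getKafkaConsumer(...).

```java
// Hypothetical stand-in for the abstract KafkaTableSource base class.
abstract class TableSourceSketch {

    final String topic;

    TableSourceSketch(String topic) {
        this.topic = topic;
    }

    // Version-specific hook, mirroring KafkaTableSource#getKafkaConsumer.
    abstract String consumerName();

    // Common behavior implemented once in the base class.
    String describe() {
        return consumerName() + "(" + topic + ")";
    }
}

// Hypothetical stand-in for a version-specific subclass such as Kafka09AvroTableSource.
class Source09Sketch extends TableSourceSketch {

    Source09Sketch(String topic) {
        super(topic);
    }

    @Override
    String consumerName() {
        return "FlinkKafkaConsumer09";
    }
}

class TemplateMethodDemo {
    public static void main(String[] args) {
        System.out.println(new Source09Sketch("orders").describe());
    }
}
```

With this shape, adding a connector for a new Kafka version means adding one subclass and one overridden factory method, which is exactly what the Kafka08/Kafka09 Avro sources in this commit do.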

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
index 5be09fb..26fffa5 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
@@ -35,33 +35,15 @@ public class Kafka09JsonTableSource extends 
KafkaJsonTableSource {
         *
         * @param topic      Kafka topic to consume.
         * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
+        * @param typeInfo   Type information describing the result type. The field names
+        *                   and types are used to parse the JSON records.
         */
        public Kafka09JsonTableSource(
                        String topic,
                        Properties properties,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
+                       TypeInformation<Row> typeInfo) {
 
-               super(topic, properties, fieldNames, fieldTypes);
-       }
-
-       /**
-        * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
-        *
-        * @param topic      Kafka topic to consume.
-        * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
-        */
-       public Kafka09JsonTableSource(
-                       String topic,
-                       Properties properties,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               super(topic, properties, fieldNames, fieldTypes);
+               super(topic, properties, typeInfo);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
index e226074..c581332 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -36,36 +36,16 @@ public class Kafka09TableSource extends KafkaTableSource {
         * @param topic                 Kafka topic to consume.
         * @param properties            Properties for the Kafka consumer.
         * @param deserializationSchema Deserialization schema to use for Kafka 
records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
+        * @param typeInfo              Type information describing the result type. The field
+        *                              names and types must match the rows produced by the
+        *                              deserialization schema.
         */
        public Kafka09TableSource(
                        String topic,
                        Properties properties,
                        DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
+                       TypeInformation<Row> typeInfo) {
 
-               super(topic, properties, deserializationSchema, fieldNames, 
fieldTypes);
-       }
-
-       /**
-        * Creates a Kafka 0.9 {@link StreamTableSource}.
-        *
-        * @param topic                 Kafka topic to consume.
-        * @param properties            Properties for the Kafka consumer.
-        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
-        */
-       public Kafka09TableSource(
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               super(topic, properties, deserializationSchema, fieldNames, 
fieldTypes);
+               super(topic, properties, deserializationSchema, typeInfo);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
new file mode 100644
index 0000000..eff8264
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.types.Row;
+
+public class Kafka09AvroTableSourceTest extends KafkaTableSourceTestBase {
+
+       @Override
+       protected KafkaTableSource createTableSource(String topic, Properties 
properties, TypeInformation<Row> typeInfo) {
+               return new Kafka09AvroTableSource(
+                       topic,
+                       properties,
+                       AvroSpecificRecord.class);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+               return (Class) AvroRowDeserializationSchema.class;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+               return (Class) FlinkKafkaConsumer09.class;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
index 10b9acc..35cd9ce 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
@@ -27,8 +27,8 @@ import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchem
 public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase {
 
        @Override
-       protected KafkaTableSource createTableSource(String topic, Properties 
properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
-               return new Kafka09JsonTableSource(topic, properties, 
fieldNames, typeInfo);
+       protected KafkaTableSource createTableSource(String topic, Properties 
properties, TypeInformation<Row> typeInfo) {
+               return new Kafka09JsonTableSource(topic, properties, typeInfo);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml 
b/flink-connectors/flink-connector-kafka-base/pom.xml
index fa401bd..263eb9a 100644
--- a/flink-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -116,7 +116,6 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
-
                <dependency>
                        <groupId>org.apache.curator</groupId>
                        <artifactId>curator-test</artifactId>
@@ -163,6 +162,14 @@ under the License.
                </dependency>
 
                <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-avro_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-minikdc</artifactId>
                        <version>${minikdc.version}</version>
@@ -206,6 +213,24 @@ under the License.
                                <inherited>true</inherited>
                                <extensions>true</extensions>
                        </plugin>
+                       <!-- Add Avro generated classes for testing. -->
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               
<artifactId>build-helper-maven-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               
<phase>generate-test-sources</phase>
+                                               <goals>
+                                                       
<goal>add-test-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               
<source>${project.basedir}/../flink-avro/src/test/java/org/apache/flink/api/io/avro/generated</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
                </plugins>
        </build>
        

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
new file mode 100644
index 0000000..b88fb83
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka table sources need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+       /**
+        * Creates a generic Kafka Avro {@link StreamTableSource} using a given 
{@link SpecificRecord}.
+        *
+        * @param topic      Kafka topic to consume.
+        * @param properties Properties for the Kafka consumer.
+        * @param avroClass  Avro specific record class.
+        */
+       KafkaAvroTableSource(
+               String topic,
+               Properties properties,
+               Class<? extends SpecificRecordBase> avroClass) {
+
+               super(
+                       topic,
+                       properties,
+                       createDeserializationSchema(avroClass),
+                       convertToRowTypeInformation(avroClass));
+       }
+
+       private static AvroRowDeserializationSchema 
createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
+               return new AvroRowDeserializationSchema(record);
+       }
+
+       /**
+        * Converts the extracted AvroTypeInfo into a RowTypeInfo nested 
structure with deterministic field order.
+        * Replaces generic Utf8 with basic String type information.
+        */
+       @SuppressWarnings("unchecked")
+       private static <T extends SpecificRecordBase> TypeInformation<Row> 
convertToRowTypeInformation(Class<T> avroClass) {
+               final AvroTypeInfo<T> avroTypeInfo = new 
AvroTypeInfo<>(avroClass);
+               // determine schema to retrieve deterministic field order
+               final Schema schema = SpecificData.get().getSchema(avroClass);
+               return (TypeInformation<Row>) 
convertToTypeInformation(avroTypeInfo, schema);
+       }
+
+       /**
+        * Recursively converts extracted AvroTypeInfo into a RowTypeInfo 
nested structure with deterministic field order.
+        * Replaces generic Utf8 with basic String type information.
+        */
+       private static TypeInformation<?> 
convertToTypeInformation(TypeInformation<?> extracted, Schema schema) {
+               if (schema.getType() == Schema.Type.RECORD) {
+                       final List<Schema.Field> fields = schema.getFields();
+                       final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) 
extracted;
+
+                       final TypeInformation<?>[] types = new 
TypeInformation<?>[fields.size()];
+                       final String[] names = new String[fields.size()];
+                       for (int i = 0; i < fields.size(); i++) {
+                               final Schema.Field field = fields.get(i);
+                               types[i] = 
convertToTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
+                               names[i] = field.name();
+                       }
+                       return new RowTypeInfo(types, names);
+               } else if (extracted instanceof GenericTypeInfo<?>) {
+                       final GenericTypeInfo<?> genericTypeInfo = 
(GenericTypeInfo<?>) extracted;
+                       if (genericTypeInfo.getTypeClass() == Utf8.class) {
+                               return BasicTypeInfo.STRING_TYPE_INFO;
+                       }
+               }
+               return extracted;
+       }
+}
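The heart of this new class is the schema walk in convertToTypeInformation. A self-contained sketch of the idea (plain Java with hypothetical stand-in types, NOT the real Avro/Flink classes): traverse the Avro schema in its declared field order so the resulting row type is deterministic, and replace the Avro-internal Utf8 string type with a plain String type. Nested records are flattened with dotted names here only for brevity; the real code builds nested RowTypeInfo instances instead.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AvroRowConversionSketch {

    // A schema node is either a record (ordered fields) or a leaf type name.
    static final class SchemaNode {
        final String leafType;                          // e.g. "Utf8", "int"; null for records
        final LinkedHashMap<String, SchemaNode> fields; // insertion order = schema field order

        SchemaNode(String leafType) {
            this.leafType = leafType;
            this.fields = null;
        }

        SchemaNode(LinkedHashMap<String, SchemaNode> fields) {
            this.leafType = null;
            this.fields = fields;
        }

        boolean isRecord() {
            return fields != null;
        }
    }

    // Recursively emits "name:type" entries in deterministic schema order.
    static List<String> toRowFields(String prefix, SchemaNode node, List<String> out) {
        if (node.isRecord()) {
            for (Map.Entry<String, SchemaNode> f : node.fields.entrySet()) {
                String name = prefix.isEmpty() ? f.getKey() : prefix + "." + f.getKey();
                toRowFields(name, f.getValue(), out);
            }
        } else {
            // Mirrors the Utf8 -> BasicTypeInfo.STRING_TYPE_INFO replacement above.
            String type = "Utf8".equals(node.leafType) ? "String" : node.leafType;
            out.add(prefix + ":" + type);
        }
        return out;
    }

    // Example schema: record user { Utf8 name; record address { Utf8 street; int zip; } }
    static List<String> sample() {
        LinkedHashMap<String, SchemaNode> address = new LinkedHashMap<>();
        address.put("street", new SchemaNode("Utf8"));
        address.put("zip", new SchemaNode("int"));
        LinkedHashMap<String, SchemaNode> user = new LinkedHashMap<>();
        user.put("name", new SchemaNode("Utf8"));
        user.put("address", new SchemaNode(address));
        return toRowFields("", new SchemaNode(user), new ArrayList<>());
    }

    public static void main(String[] args) {
        System.out.println(sample()); // [name:String, address.street:String, address.zip:int]
    }
}
```

Walking the Schema rather than the extracted AvroTypeInfo is what gives the deterministic field order: Avro's schema lists fields in declaration order, whereas reflective type extraction does not guarantee one.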

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index 3cdad0f..460f948 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 
 import java.util.Properties;
+import org.apache.flink.types.Row;
 
 /**
  * A version-agnostic Kafka JSON {@link StreamTableSource}.
@@ -40,33 +41,15 @@ public abstract class KafkaJsonTableSource extends 
KafkaTableSource {
         *
         * @param topic      Kafka topic to consume.
         * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
+        * @param typeInfo   Type information describing the result type. The field names
+        *                   and types are used to parse the JSON records.
         */
        KafkaJsonTableSource(
                        String topic,
                        Properties properties,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
+                       TypeInformation<Row> typeInfo) {
 
-               super(topic, properties, 
createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
-       }
-
-       /**
-        * Creates a generic Kafka JSON {@link StreamTableSource}.
-        *
-        * @param topic      Kafka topic to consume.
-        * @param properties Properties for the Kafka consumer.
-        * @param fieldNames Row field names.
-        * @param fieldTypes Row field types.
-        */
-       KafkaJsonTableSource(
-                       String topic,
-                       Properties properties,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
-
-               super(topic, properties, 
createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
+               super(topic, properties, createDeserializationSchema(typeInfo), 
typeInfo);
        }
 
        /**
@@ -81,17 +64,8 @@ public abstract class KafkaJsonTableSource extends 
KafkaTableSource {
                deserializationSchema.setFailOnMissingField(failOnMissingField);
        }
 
-       private static JsonRowDeserializationSchema createDeserializationSchema(
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
-
-               return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
-       }
-
-       private static JsonRowDeserializationSchema createDeserializationSchema(
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
+       private static JsonRowDeserializationSchema 
createDeserializationSchema(TypeInformation<Row> typeInfo) {
 
-               return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
+               return new JsonRowDeserializationSchema(typeInfo);
        }
 }
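The constructor change above collapses two parallel arrays (fieldNames, fieldTypes) into a single type-information object. A hedged, plain-Java sketch of the motivation (RowTypeSketch is a hypothetical stand-in, not Flink's RowTypeInfo): the bundle pairs every name with its type and validates the pairing once, at construction, instead of repeating the check in every table source constructor.

```java
import java.util.Arrays;

// Hypothetical stand-in for a name/type bundle like Flink's RowTypeInfo.
class RowTypeSketch {

    final String[] names;
    final Class<?>[] types;

    RowTypeSketch(String[] names, Class<?>[] types) {
        // The length check that KafkaTableSource previously performed itself.
        if (names.length != types.length) {
            throw new IllegalArgumentException("Number of field names and types does not match.");
        }
        this.names = names.clone();
        this.types = types.clone();
    }

    int indexOf(String field) {
        return Arrays.asList(names).indexOf(field);
    }

    public static void main(String[] args) {
        RowTypeSketch t = new RowTypeSketch(
            new String[] {"id", "name"},
            new Class<?>[] {Long.class, String.class});
        System.out.println(t.indexOf("name")); // 1
    }
}
```

A single TypeInformation parameter also lets the Avro source derive the row type from the record class while the JSON source accepts a user-supplied one, without the base class caring which path produced it.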

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index 506358d..029aa45 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import java.util.Properties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
@@ -27,10 +27,6 @@ import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
-import java.util.Properties;
-
-import static 
org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo;
-
 /**
  * A version-agnostic Kafka {@link StreamTableSource}.
  *
@@ -48,30 +44,8 @@ public abstract class KafkaTableSource implements 
StreamTableSource<Row> {
        /** Deserialization schema to use for Kafka records. */
        private final DeserializationSchema<Row> deserializationSchema;
 
-       /** Row field names. */
-       private final String[] fieldNames;
-
-       /** Row field types. */
-       private final TypeInformation<?>[] fieldTypes;
-
-       /**
-        * Creates a generic Kafka {@link StreamTableSource}.
-        *
-        * @param topic                 Kafka topic to consume.
-        * @param properties            Properties for the Kafka consumer.
-        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
-        */
-       KafkaTableSource(
-                       String topic,
-                       Properties properties,
-                       DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       Class<?>[] fieldTypes) {
-
-               this(topic, properties, deserializationSchema, fieldNames, 
toTypeInfo(fieldTypes));
-       }
+       /** Type information describing the result type. */
+       private final TypeInformation<Row> typeInfo;
 
        /**
         * Creates a generic Kafka {@link StreamTableSource}.
@@ -79,24 +53,18 @@ public abstract class KafkaTableSource implements 
StreamTableSource<Row> {
         * @param topic                 Kafka topic to consume.
         * @param properties            Properties for the Kafka consumer.
         * @param deserializationSchema Deserialization schema to use for Kafka records.
-        * @param fieldNames            Row field names.
-        * @param fieldTypes            Row field types.
+        * @param typeInfo              Type information describing the result type.
         */
        KafkaTableSource(
                        String topic,
                        Properties properties,
                        DeserializationSchema<Row> deserializationSchema,
-                       String[] fieldNames,
-                       TypeInformation<?>[] fieldTypes) {
+                       TypeInformation<Row> typeInfo) {

                this.topic = Preconditions.checkNotNull(topic, "Topic");
                this.properties = Preconditions.checkNotNull(properties, "Properties");
                this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
-               this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
-               this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
-
-               Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
-                               "Number of provided field names and types does not match.");
+               this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information");
        }
 
        /**
@@ -107,13 +75,12 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> {
        public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
                // Version-specific Kafka consumer
               FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);
-               DataStream<Row> kafkaSource = env.addSource(kafkaConsumer);
-               return kafkaSource;
+               return env.addSource(kafkaConsumer);
        }
 
        @Override
        public TypeInformation<Row> getReturnType() {
-               return new RowTypeInfo(fieldTypes, fieldNames);
+               return typeInfo;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
deleted file mode 100644
index 7a41ade..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class TypeUtil {
-       private TypeUtil() {}
-
-       /**
-        * Creates TypeInformation array for an array of Classes.
-        * @param fieldTypes classes to extract type information from
-        * @return type information
-        */
-       public static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) {
-               TypeInformation<?>[] typeInfos = new TypeInformation[fieldTypes.length];
-               for (int i = 0; i < fieldTypes.length; i++) {
-                       typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]);
-               }
-               return typeInfos;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
new file mode 100644
index 0000000..37241f5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
+ *
+ * Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
+ *
+ * {@link Utf8} is converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
+
+       /**
+        * Schema for deterministic field order.
+        */
+       private final Schema schema;
+
+       /**
+        * Reader that deserializes byte array into a record.
+        */
+       private final DatumReader<SpecificRecord> datumReader;
+
+       /**
+        * Input stream to read message from.
+        */
+       private final MutableByteArrayInputStream inputStream;
+
+       /**
+        * Avro decoder that decodes binary data.
+        */
+       private final Decoder decoder;
+
+       /**
+        * Record to deserialize byte array to.
+        */
+       private SpecificRecord record;
+
+       /**
+        * Creates an Avro deserialization schema for the given record.
+        *
+        * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
+        */
+       public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+               Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+               this.schema = SpecificData.get().getSchema(recordClazz);
+               this.datumReader = new SpecificDatumReader<>(schema);
+               this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
+               this.inputStream = new MutableByteArrayInputStream();
+               this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+       }
+
+       @Override
+       public Row deserialize(byte[] message) throws IOException {
+               // read record
+               try {
+                       inputStream.setBuffer(message);
+                       this.record = datumReader.read(record, decoder);
+               } catch (IOException e) {
+                       throw new RuntimeException("Failed to deserialize Row.", e);
+               }
+
+               // convert to row
+               final Object row = convertToRow(schema, record);
+               return (Row) row;
+       }
+
+       /**
+        * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
+        * Avro's {@link Utf8} fields are converted into regular Java strings.
+        */
+       private static Object convertToRow(Schema schema, Object recordObj) {
+               if (recordObj instanceof GenericRecord) {
+                       // records can be wrapped in a union
+                       if (schema.getType() == Schema.Type.UNION) {
+                               final List<Schema> types = schema.getTypes();
+                               if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+                                       schema = types.get(1);
+                               }
+                               else {
+                                       throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema);
+                               }
+                       } else if (schema.getType() != Schema.Type.RECORD) {
+                               throw new RuntimeException("Record type for row type expected. But is: " + schema);
+                       }
+                       final List<Schema.Field> fields = schema.getFields();
+                       final Row row = new Row(fields.size());
+                       final GenericRecord record = (GenericRecord) recordObj;
+                       for (int i = 0; i < fields.size(); i++) {
+                               final Schema.Field field = fields.get(i);
+                               row.setField(i, convertToRow(field.schema(), record.get(field.pos())));
+                       }
+                       return row;
+               } else if (recordObj instanceof Utf8) {
+                       return recordObj.toString();
+               } else {
+                       return recordObj;
+               }
+       }
+
+       /**
+        * An extension of the ByteArrayInputStream that allows to change a buffer that should be
+        * read without creating a new ByteArrayInputStream instance. This avoids re-creating the
+        * InputStream, copying the message to process, and creating a new Decoder on every message.
+        */
+       private static final class MutableByteArrayInputStream extends ByteArrayInputStream {
+               /**
+                * Creates a MutableByteArrayInputStream.
+                */
+               public MutableByteArrayInputStream() {
+                       super(new byte[0]);
+               }
+
+               /**
+                * Set buffer that can be read via the InputStream interface and reset the input stream.
+                * This has the same effect as creating a new ByteArrayInputStream with a new buffer.
+                *
+                * @param buf the new buffer to read.
+                */
+               public void setBuffer(byte[] buf) {
+                       this.buf = buf;
+                       this.pos = 0;
+                       this.count = buf.length;
+               }
+       }
+}
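The MutableByteArrayInputStream above relies on ByteArrayInputStream exposing its buffer fields as protected. The same reuse pattern can be sketched standalone with only the JDK; the class and method names below are illustrative, not part of the commit:

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

// Sketch of the buffer-swapping trick: ByteArrayInputStream's protected
// fields (buf, pos, count) let a subclass point the same stream instance
// at a new message, so no new stream (and, in the Avro case, no new
// Decoder) is needed per record.
public class ReusableInputStream extends ByteArrayInputStream {

    public ReusableInputStream() {
        super(new byte[0]); // start empty; a buffer is set per message
    }

    /** Points this stream at a new buffer and rewinds it. */
    public void setBuffer(byte[] buf) {
        this.buf = buf;
        this.pos = 0;
        this.count = buf.length;
    }

    /** Reads one whole message through the reused stream instance. */
    public static String readAll(ReusableInputStream in, byte[] message) {
        in.setBuffer(message);
        byte[] out = new byte[message.length];
        in.read(out, 0, out.length); // fills out[] since the buffer was just set
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ReusableInputStream in = new ReusableInputStream();
        // The same stream instance is reused across different messages.
        System.out.println(readAll(in, "first".getBytes(StandardCharsets.UTF_8)));
        System.out.println(readAll(in, "second".getBytes(StandardCharsets.UTF_8)));
    }
}
```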

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
new file mode 100644
index 0000000..8388ab5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into Avro bytes.
+ */
+public class AvroRowSerializationSchema implements SerializationSchema<Row> {
+
+       /**
+        * Avro serialization schema.
+        */
+       private final Schema schema;
+
+       /**
+        * Writer to serialize Avro record into a byte array.
+        */
+       private final DatumWriter<GenericRecord> datumWriter;
+
+       /**
+        * Output stream to serialize records into byte array.
+        */
+       private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+
+       /**
+        * Low-level class for serialization of Avro values.
+        */
+       private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+       /**
+        * Creates an Avro serialization schema for the given record class.
+        *
+        * @param recordClazz Avro record class used to serialize Flink's row to Avro's record
+        */
+       public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+               Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+               this.schema = SpecificData.get().getSchema(recordClazz);
+               this.datumWriter = new SpecificDatumWriter<>(schema);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public byte[] serialize(Row row) {
+               // convert to record
+               final Object record = convertToRecord(schema, row);
+
+               // write
+               try {
+                       arrayOutputStream.reset();
+                       datumWriter.write((GenericRecord) record, encoder);
+                       encoder.flush();
+                       return arrayOutputStream.toByteArray();
+               } catch (IOException e) {
+                       throw new RuntimeException("Failed to serialize Row.", e);
+               }
+       }
+
+       /**
+        * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+        * Strings are converted into Avro's {@link Utf8} fields.
+        */
+       private static Object convertToRecord(Schema schema, Object rowObj) {
+               if (rowObj instanceof Row) {
+                       // records can be wrapped in a union
+                       if (schema.getType() == Schema.Type.UNION) {
+                               final List<Schema> types = schema.getTypes();
+                               if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+                                       schema = types.get(1);
+                               }
+                               else if (types.size() == 2 && types.get(0).getType() == Schema.Type.RECORD && types.get(1).getType() == Schema.Type.NULL) {
+                                       schema = types.get(0);
+                               }
+                               else {
+                                       throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD] or UNION[RECORD, null]. Given: " + schema);
+                               }
+                       } else if (schema.getType() != Schema.Type.RECORD) {
+                               throw new RuntimeException("Record type for row type expected. But is: " + schema);
+                       }
+                       final List<Schema.Field> fields = schema.getFields();
+                       final GenericRecord record = new GenericData.Record(schema);
+                       final Row row = (Row) rowObj;
+                       for (int i = 0; i < fields.size(); i++) {
+                               final Schema.Field field = fields.get(i);
+                               record.put(field.pos(), convertToRecord(field.schema(), row.getField(i)));
+                       }
+                       return record;
+               } else if (rowObj instanceof String) {
+                       return new Utf8((String) rowObj);
+               } else {
+                       return rowObj;
+               }
+       }
+}
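Both converters special-case nullable records: Avro declares a nullable record as a two-branch union, and the converter must select the RECORD branch before walking its fields (the deserializer accepts only UNION[null, RECORD]; the serializer also accepts the reversed order). A rough standalone model of that branch selection, with union branches modeled as plain strings instead of Avro Schema objects:

```java
import java.util.List;

// Simplified model of the union handling in convertToRecord: branches are
// strings ("NULL" or "RECORD:<name>") purely for illustration; the real
// code compares Schema.Type values on Avro Schema objects.
public class NullableUnion {

    /** Picks the record branch out of a two-branch nullable union. */
    public static String unwrapRecord(List<String> unionTypes) {
        if (unionTypes.size() == 2 && unionTypes.get(0).equals("NULL") && unionTypes.get(1).startsWith("RECORD")) {
            return unionTypes.get(1); // UNION[null, RECORD]
        } else if (unionTypes.size() == 2 && unionTypes.get(0).startsWith("RECORD") && unionTypes.get(1).equals("NULL")) {
            return unionTypes.get(0); // UNION[RECORD, null]
        }
        throw new IllegalArgumentException(
            "Only UNION[null, RECORD] or UNION[RECORD, null] is supported: " + unionTypes);
    }

    public static void main(String[] args) {
        System.out.println(unwrapRecord(List.of("NULL", "RECORD:User")));
        System.out.println(unwrapRecord(List.of("RECORD:User", "NULL")));
    }
}
```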

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
index b4b3341..be201fa 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
@@ -20,14 +20,12 @@ package org.apache.flink.streaming.util.serialization;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
-
 /**
  * Deserialization schema from JSON to {@link Row}.
  *
@@ -38,6 +36,9 @@ import java.io.IOException;
  */
 public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 
+       /** Type information describing the result type. */
+       private final TypeInformation<Row> typeInfo;
+
        /** Field names to parse. Indices match fieldTypes indices. */
        private final String[] fieldNames;
 
@@ -51,35 +52,17 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
        private boolean failOnMissingField;
 
        /**
-        * Creates a JSON deserialization schema for the given fields and type classes.
-        *
-        * @param fieldNames Names of JSON fields to parse.
-        * @param fieldTypes Type classes to parse JSON fields as.
-        */
-       public JsonRowDeserializationSchema(String[] fieldNames, Class<?>[] fieldTypes) {
-               this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
-
-               this.fieldTypes = new TypeInformation[fieldTypes.length];
-               for (int i = 0; i < fieldTypes.length; i++) {
-                       this.fieldTypes[i] = TypeExtractor.getForClass(fieldTypes[i]);
-               }
-
-               Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
-                               "Number of provided field names and types does not match.");
-       }
-
-       /**
         * Creates a JSON deserialization schema for the given fields and types.
         *
-        * @param fieldNames Names of JSON fields to parse.
-        * @param fieldTypes Types to parse JSON fields as.
+        * @param typeInfo   Type information describing the result type. The field names are used
+        *                   to parse the JSON file and so are the types.
         */
-       public JsonRowDeserializationSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-               this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
-               this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
+       public JsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
+               Preconditions.checkNotNull(typeInfo, "Type information");
+               this.typeInfo = typeInfo;

-               Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
-                               "Number of provided field names and types does not match.");
+               this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
+               this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
        }
 
        @Override
@@ -118,7 +101,7 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 
        @Override
        public TypeInformation<Row> getProducedType() {
-               return new RowTypeInfo(fieldTypes);
+               return typeInfo;
        }
 
        /**

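With the new constructor, field names and types both come from the single RowTypeInfo, and deserialization maps JSON fields onto row positions by name. That positional mapping can be sketched with plain JDK types (the Row is modeled as Object[] and the parsed JSON as a Map; both are stand-ins, not the Flink or Jackson types):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: maps a parsed JSON object onto a positional row,
// using the field-name order carried by the (modeled) type information.
public class JsonToRowSketch {

    /** Fields missing from the input become null (the non-strict mode). */
    public static Object[] toRow(List<String> fieldNames, Map<String, Object> parsedJson) {
        Object[] row = new Object[fieldNames.size()];
        for (int i = 0; i < fieldNames.size(); i++) {
            row[i] = parsedJson.get(fieldNames.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        Object[] row = toRow(List.of("id", "name", "bytes"), Map.of("id", 42L, "name", "flink"));
        System.out.println(Arrays.toString(row));
    }
}
```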
http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
new file mode 100644
index 0000000..e13968e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.io.IOException;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
+import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema;
+import org.apache.flink.types.Row;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+/**
+ * Test for the Avro serialization and deserialization schema.
+ */
+public class AvroRowDeSerializationSchemaTest {
+
+       @Test
+       public void testSerializeDeserializeSimpleRow() throws IOException {
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+
+               final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
+               final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+
+               final byte[] bytes = serializationSchema.serialize(testData.f2);
+               final Row actual = deserializationSchema.deserialize(bytes);
+
+               assertEquals(testData.f2, actual);
+       }
+
+       @Test
+       public void testSerializeSimpleRowSeveralTimes() throws IOException {
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+
+               final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
+               final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+
+               serializationSchema.serialize(testData.f2);
+               serializationSchema.serialize(testData.f2);
+               final byte[] bytes = serializationSchema.serialize(testData.f2);
+               final Row actual = deserializationSchema.deserialize(bytes);
+
+               assertEquals(testData.f2, actual);
+       }
+
+       @Test
+       public void testDeserializeRowSeveralTimes() throws IOException {
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+
+               final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
+               final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+
+               final byte[] bytes = serializationSchema.serialize(testData.f2);
+               deserializationSchema.deserialize(bytes);
+               deserializationSchema.deserialize(bytes);
+               final Row actual = deserializationSchema.deserialize(bytes);
+
+               assertEquals(testData.f2, actual);
+       }
+
+       @Test
+       public void testSerializeDeserializeComplexRow() throws IOException {
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+
+               final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
+               final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+
+               final byte[] bytes = serializationSchema.serialize(testData.f2);
+               final Row actual = deserializationSchema.deserialize(bytes);
+
+               assertEquals(testData.f2, actual);
+       }
+
+       @Test
+       public void testSerializeComplexRowSeveralTimes() throws IOException {
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+
+               final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
+               final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+
+               serializationSchema.serialize(testData.f2);
+               serializationSchema.serialize(testData.f2);
+               final byte[] bytes = serializationSchema.serialize(testData.f2);
+               final Row actual = deserializationSchema.deserialize(bytes);
+
+               assertEquals(testData.f2, actual);
+       }
+
+       @Test
+       public void testDeserializeComplexRowSeveralTimes() throws IOException {
+               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+
+               final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
+               final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+
+               final byte[] bytes = serializationSchema.serialize(testData.f2);
+               deserializationSchema.deserialize(bytes);
+               deserializationSchema.deserialize(bytes);
+               final Row actual = deserializationSchema.deserialize(bytes);
+
+               assertEquals(testData.f2, actual);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
index 88f62f0..f03feeb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.junit.Test;
@@ -56,8 +58,11 @@ public class JsonRowDeserializationSchemaTest {
                byte[] serializedJson = objectMapper.writeValueAsBytes(root);
 
                JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(
+                       Types.ROW(
                                new String[] { "id", "name", "bytes" },
-                               new Class<?>[] { Long.class, String.class, byte[].class });
+                               new TypeInformation<?>[] { Types.LONG(), Types.STRING(), Types.PRIMITIVE_ARRAY(Types.BYTE()) }
+                       )
+               );
 
                Row deserialized = deserializationSchema.deserialize(serializedJson);
 
@@ -80,8 +85,11 @@ public class JsonRowDeserializationSchemaTest {
                byte[] serializedJson = objectMapper.writeValueAsBytes(root);
 
                JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(
+                       Types.ROW(
                                new String[] { "name" },
-                               new Class<?>[] { String.class });
+                               new TypeInformation<?>[] { Types.STRING() }
+                       )
+               );
 
                Row row = deserializationSchema.deserialize(serializedJson);
 
@@ -105,17 +113,11 @@ public class JsonRowDeserializationSchemaTest {
        public void testNumberOfFieldNamesAndTypesMismatch() throws Exception {
                try {
                        new JsonRowDeserializationSchema(
+                               Types.ROW(
                                        new String[] { "one", "two", "three" },
-                                       new Class<?>[] { Long.class });
-                       fail("Did not throw expected Exception");
-               } catch (IllegalArgumentException ignored) {
-                       // Expected
-               }
-
-               try {
-                       new JsonRowDeserializationSchema(
-                                       new String[] { "one" },
-                                       new Class<?>[] { Long.class, String.class });
+                                       new TypeInformation<?>[] { Types.LONG() }
+                               )
+                       );
                        fail("Did not throw expected Exception");
                } catch (IllegalArgumentException ignored) {
                        // Expected
