[FLINK-3871] [table] Add Kafka TableSource with Avro serialization This closes #3663.
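The JSON table sources now take a single `TypeInformation<Row>` instead of separate field name and type arrays. A minimal usage sketch matching the updated constructors (`kafkaTopic`, `kafkaProperties`, and `tableEnvironment` are placeholders for user-supplied values; requires `flink-table` and `flink-connector-kafka-0.8` on the classpath):

```java
// specify JSON field names and types in one TypeInformation<Row>
TypeInformation<Row> typeInfo = Types.ROW(
    new String[] { "id", "name", "score" },
    new TypeInformation<?>[] { Types.INT(), Types.STRING(), Types.DOUBLE() });

KafkaJsonTableSource kafkaTableSource = new Kafka08JsonTableSource(
    kafkaTopic,       // placeholder: Kafka topic name
    kafkaProperties,  // placeholder: consumer Properties
    typeInfo);

// register and query it like any other table source
tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);
Table result = tableEnvironment.ingest("kafka-source");
```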
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbc5e29c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbc5e29c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbc5e29c Branch: refs/heads/master Commit: bbc5e29c8df71950c6216cf490817ef002c140c5 Parents: 1829819 Author: twalthr <[email protected]> Authored: Mon Apr 3 14:44:46 2017 +0200 Committer: twalthr <[email protected]> Committed: Wed Apr 26 16:42:01 2017 +0200 ---------------------------------------------------------------------- docs/dev/table_api.md | 85 ++++++++-- .../flink-connector-kafka-0.10/pom.xml | 1 - .../kafka/Kafka010AvroTableSource.java | 56 +++++++ .../kafka/Kafka010JsonTableSource.java | 28 +--- .../connectors/kafka/Kafka010TableSource.java | 28 +--- .../kafka/Kafka010AvroTableSourceTest.java | 50 ++++++ .../kafka/Kafka010JsonTableSourceTest.java | 45 ++++++ .../kafka/Kafka08AvroTableSource.java | 56 +++++++ .../kafka/Kafka08JsonTableSource.java | 26 +--- .../connectors/kafka/Kafka08TableSource.java | 28 +--- .../kafka/Kafka08AvroTableSourceTest.java | 49 ++++++ .../kafka/Kafka08JsonTableSourceTest.java | 4 +- .../kafka/Kafka09AvroTableSource.java | 56 +++++++ .../kafka/Kafka09JsonTableSource.java | 26 +--- .../connectors/kafka/Kafka09TableSource.java | 28 +--- .../kafka/Kafka09AvroTableSourceTest.java | 49 ++++++ .../kafka/Kafka09JsonTableSourceTest.java | 4 +- .../flink-connector-kafka-base/pom.xml | 27 +++- .../connectors/kafka/KafkaAvroTableSource.java | 106 +++++++++++++ .../connectors/kafka/KafkaJsonTableSource.java | 40 +---- .../connectors/kafka/KafkaTableSource.java | 49 +----- .../connectors/kafka/internals/TypeUtil.java | 38 ----- .../AvroRowDeserializationSchema.java | 155 +++++++++++++++++++ .../AvroRowSerializationSchema.java | 124 +++++++++++++++ .../JsonRowDeserializationSchema.java | 43 ++--- .../kafka/AvroRowDeSerializationSchemaTest.java | 120 ++++++++++++++ 
.../kafka/JsonRowDeserializationSchemaTest.java | 26 ++-- .../kafka/JsonRowSerializationSchemaTest.java | 16 +- .../kafka/KafkaTableSinkTestBase.java | 4 +- .../kafka/KafkaTableSourceTestBase.java | 38 ++++- .../kafka/testutils/AvroTestUtils.java | 150 ++++++++++++++++++ .../apache/flink/api/common/typeinfo/Types.java | 2 +- .../flink/api/java/typeutils/RowTypeInfo.java | 7 + .../org/apache/flink/table/api/Types.scala | 61 ++++++-- 34 files changed, 1291 insertions(+), 334 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/docs/dev/table_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index 3d77b85..4b9e565 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -202,7 +202,7 @@ tableEnv.registerTableSource("Customers", custTS) A `TableSource` can provide access to data stored in various storage systems such as databases (MySQL, HBase, ...), file formats (CSV, Apache Parquet, Avro, ORC, ...), or messaging systems (Apache Kafka, RabbitMQ, ...). -Currently, Flink provides the `CsvTableSource` to read CSV files and the `Kafka08JsonTableSource`/`Kafka09JsonTableSource` to read JSON objects from Kafka. +Currently, Flink provides the `CsvTableSource` to read CSV files and various `TableSources` to read JSON or Avro objects from Kafka. A custom `TableSource` can be defined by implementing the `BatchTableSource` or `StreamTableSource` interface. ### Available Table Sources @@ -210,7 +210,11 @@ A custom `TableSource` can be defined by implementing the `BatchTableSource` or | **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description** | `CsvTableSource` | `flink-table` | Y | Y | A simple source for CSV files. | `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | A Kafka 0.8 source for JSON data.
+| `Kafka08AvroTableSource` | `flink-connector-kafka-0.8` | N | Y | A Kafka 0.8 source for Avro data. | `Kafka09JsonTableSource` | `flink-connector-kafka-0.9` | N | Y | A Kafka 0.9 source for JSON data. +| `Kafka09AvroTableSource` | `flink-connector-kafka-0.9` | N | Y | A Kafka 0.9 source for Avro data. +| `Kafka010JsonTableSource` | `flink-connector-kafka-0.10` | N | Y | A Kafka 0.10 source for JSON data. +| `Kafka010AvroTableSource` | `flink-connector-kafka-0.10` | N | Y | A Kafka 0.10 source for Avro data. All sources that come with the `flink-table` dependency can be directly used by your Table programs. For all other table sources, you have to add the respective dependency in addition to the `flink-table` dependency. @@ -218,22 +222,42 @@ All sources that come with the `flink-table` dependency can be directly used by To use the Kafka JSON source, you have to add the Kafka connector dependency to your project: - - `flink-connector-kafka-0.8` for Kafka 0.8, and - - `flink-connector-kafka-0.9` for Kafka 0.9, respectively. + - `flink-connector-kafka-0.8` for Kafka 0.8, + - `flink-connector-kafka-0.9` for Kafka 0.9, or + - `flink-connector-kafka-0.10` for Kafka 0.10.
You can then create the source as follows (example for Kafka 0.8): - -```java -// The JSON field names and types -String[] fieldNames = new String[] { "id", "name", "score"}; -Class<?>[] fieldTypes = new Class<?>[] { Integer.class, String.class, Double.class }; +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +// specify JSON field names and types +TypeInformation<Row> typeInfo = Types.ROW( + new String[] { "id", "name", "score" }, + new TypeInformation<?>[] { Types.INT(), Types.STRING(), Types.DOUBLE() } +); KafkaJsonTableSource kafkaTableSource = new Kafka08JsonTableSource( kafkaTopic, kafkaProperties, - fieldNames, - fieldTypes); -``` + typeInfo); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +// specify JSON field names and types +val typeInfo = Types.ROW( + Array("id", "name", "score"), + Array(Types.INT, Types.STRING, Types.DOUBLE) +) + +val kafkaTableSource = new Kafka08JsonTableSource( + kafkaTopic, + kafkaProperties, + typeInfo) +{% endhighlight %} +</div> +</div> By default, a missing JSON field does not fail the source. You can configure this via: @@ -249,6 +273,43 @@ tableEnvironment.registerTableSource("kafka-source", kafkaTableSource); Table result = tableEnvironment.ingest("kafka-source"); ``` +#### KafkaAvroTableSource + +The `KafkaAvroTableSource` allows you to read Avro's `SpecificRecord` objects from Kafka. + +To use the Kafka Avro source, you have to add the Kafka connector dependency to your project: + + - `flink-connector-kafka-0.8` for Kafka 0.8, + - `flink-connector-kafka-0.9` for Kafka 0.9, or + - `flink-connector-kafka-0.10` for Kafka 0.10, respectively. + +You can then create the source as follows (example for Kafka 0.8): +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +// pass the generated Avro class to the TableSource +Class<? 
extends SpecificRecord> clazz = MyAvroType.class; + +KafkaAvroTableSource kafkaTableSource = new Kafka08AvroTableSource( + kafkaTopic, + kafkaProperties, + clazz); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +// pass the generated Avro class to the TableSource +val clazz = classOf[MyAvroType] + +val kafkaTableSource = new Kafka08AvroTableSource( + kafkaTopic, + kafkaProperties, + clazz) +{% endhighlight %} +</div> +</div> + #### CsvTableSource The `CsvTableSource` is already included in `flink-table` without additional dependencies. @@ -1684,6 +1745,8 @@ The Table API is built on top of Flink's DataSet and DataStream API. Internally, | `Types.TIMESTAMP` | `TIMESTAMP(3)` | `java.sql.Timestamp` | | `Types.INTERVAL_MONTHS`| `INTERVAL YEAR TO MONTH` | `java.lang.Integer` | | `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long` | +| `Types.PRIMITIVE_ARRAY`| `ARRAY` | e.g. `int[]` | +| `Types.OBJECT_ARRAY` | `ARRAY` | e.g. `java.lang.Byte[]`| Advanced types such as generic types, composite types (e.g. POJOs or Tuples), and array types (object or primitive arrays) can be fields of a row. http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.10/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml index e061178..ab60946 100644 --- a/flink-connectors/flink-connector-kafka-0.10/pom.xml +++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml @@ -152,7 +152,6 @@ under the License.
<scope>test</scope> </dependency> - </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java new file mode 100644 index 0000000..1b2abcc --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.Properties; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.10. + */ +public class Kafka010AvroTableSource extends KafkaAvroTableSource { + + /** + * Creates a Kafka 0.10 Avro {@link StreamTableSource} using a given {@link SpecificRecord}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param record Avro specific record. + */ + public Kafka010AvroTableSource( + String topic, + Properties properties, + Class<? extends SpecificRecordBase> record) { + + super( + topic, + properties, + record); + } + + @Override + FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { + return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java index bfae1a9..78ef28e 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java +++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java @@ -28,40 +28,22 @@ import java.util.Properties; /** * Kafka {@link StreamTableSource} for Kafka 0.10. */ -public class Kafka010JsonTableSource extends Kafka09JsonTableSource { +public class Kafka010JsonTableSource extends KafkaJsonTableSource { /** * Creates a Kafka 0.10 JSON {@link StreamTableSource}. * * @param topic Kafka topic to consume. * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. + * @param typeInfo Type information describing the result type. The field names and types + * are used to parse the JSON records. */ public Kafka010JsonTableSource( String topic, Properties properties, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { + TypeInformation<Row> typeInfo) { - super(topic, properties, fieldNames, fieldTypes); - } - - /** - * Creates a Kafka 0.10 JSON {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types.
- */ - public Kafka010JsonTableSource( - String topic, - Properties properties, - String[] fieldNames, - Class<?>[] fieldTypes) { - - super(topic, properties, fieldNames, fieldTypes); + super(topic, properties, typeInfo); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java index e5254c0..03e9125 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java @@ -36,36 +36,16 @@ public class Kafka010TableSource extends Kafka09TableSource { * @param topic Kafka topic to consume. * @param properties Properties for the Kafka consumer. * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. + * @param typeInfo Type information describing the result type. The field names and types + * are used to parse the JSON records. */ public Kafka010TableSource( String topic, Properties properties, DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { + TypeInformation<Row> typeInfo) { - super(topic, properties, deserializationSchema, fieldNames, fieldTypes); - } - - /** - * Creates a Kafka 0.10 {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer.
- * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka010TableSource( - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - Class<?>[] fieldTypes) { - - super(topic, properties, deserializationSchema, fieldNames, fieldTypes); + super(topic, properties, deserializationSchema, typeInfo); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java new file mode 100644 index 0000000..ed93725 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.Properties; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.types.Row; + +public class Kafka010AvroTableSourceTest extends KafkaTableSourceTestBase { + + @Override + protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) { + + return new Kafka010AvroTableSource( + topic, + properties, + AvroSpecificRecord.class); + } + + @Override + @SuppressWarnings("unchecked") + protected Class<DeserializationSchema<Row>> getDeserializationSchema() { + return (Class) AvroRowDeserializationSchema.class; + } + + @Override + @SuppressWarnings("unchecked") + protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() { + return (Class) FlinkKafkaConsumer010.class; + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java new file mode 100644 index 0000000..55e8b9c --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.Properties; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; +import org.apache.flink.types.Row; + +public class Kafka010JsonTableSourceTest extends KafkaTableSourceTestBase { + + @Override + protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) { + return new Kafka010JsonTableSource(topic, properties, typeInfo); + } + + @Override + @SuppressWarnings("unchecked") + protected Class<DeserializationSchema<Row>> getDeserializationSchema() { + return (Class) JsonRowDeserializationSchema.class; + } + + @Override + @SuppressWarnings("unchecked") + protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() { + return (Class) FlinkKafkaConsumer010.class; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java new file mode 100644 index 0000000..1a68c05 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.Properties; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.8. + */ +public class Kafka08AvroTableSource extends KafkaAvroTableSource { + + /** + * Creates a Kafka 0.8 Avro {@link StreamTableSource} using a given {@link SpecificRecord}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. 
+ * @param record Avro specific record. + */ + public Kafka08AvroTableSource( + String topic, + Properties properties, + Class<? extends SpecificRecordBase> record) { + + super( + topic, + properties, + record); + } + + @Override + FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { + return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java index 27e7e6e..1555a3b 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java @@ -35,33 +35,15 @@ public class Kafka08JsonTableSource extends KafkaJsonTableSource { * * @param topic Kafka topic to consume. * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. + * @param typeInfo Type information describing the result type. The field names and types + * are used to parse the JSON records. */ public Kafka08JsonTableSource( String topic, Properties properties, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { + TypeInformation<Row> typeInfo) { - super(topic, properties, fieldNames, fieldTypes); - } - - /** - * Creates a Kafka 0.8 JSON {@link StreamTableSource}.
- * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka08JsonTableSource( - String topic, - Properties properties, - String[] fieldNames, - Class<?>[] fieldTypes) { - - super(topic, properties, fieldNames, fieldTypes); + super(topic, properties, typeInfo); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java index 19fd50d..e1e481c 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java @@ -36,36 +36,16 @@ public class Kafka08TableSource extends KafkaTableSource { * @param topic Kafka topic to consume. * @param properties Properties for the Kafka consumer. * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. + * @param typeInfo Type information describing the result type. The field names and types + * are used to parse the JSON records.
*/ public Kafka08TableSource( String topic, Properties properties, DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { + TypeInformation<Row> typeInfo) { - super(topic, properties, deserializationSchema, fieldNames, fieldTypes); - } - - /** - * Creates a Kafka 0.8 {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka08TableSource( - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - Class<?>[] fieldTypes) { - - super(topic, properties, deserializationSchema, fieldNames, fieldTypes); + super(topic, properties, deserializationSchema, typeInfo); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java new file mode 100644 index 0000000..2dedecb --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.Properties; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.types.Row; + +public class Kafka08AvroTableSourceTest extends KafkaTableSourceTestBase { + + @Override + protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) { + return new Kafka08AvroTableSource( + topic, + properties, + AvroSpecificRecord.class); + } + + @Override + @SuppressWarnings("unchecked") + protected Class<DeserializationSchema<Row>> getDeserializationSchema() { + return (Class) AvroRowDeserializationSchema.class; + } + + @Override + @SuppressWarnings("unchecked") + protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() { + return (Class) FlinkKafkaConsumer08.class; + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java index f9ef2ce..27faff4 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java @@ -27,8 +27,8 @@ import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchem public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase { @Override - protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) { - return new Kafka08JsonTableSource(topic, properties, fieldNames, typeInfo); + protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) { + return new Kafka08JsonTableSource(topic, properties, typeInfo); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java new file mode 100644 index 0000000..9e1172b --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.Properties; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.9. + */ +public class Kafka09AvroTableSource extends KafkaAvroTableSource { + + /** + * Creates a Kafka 0.9 Avro {@link StreamTableSource} using a given {@link SpecificRecord}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param record Avro specific record. + */ + public Kafka09AvroTableSource( + String topic, + Properties properties, + Class<? 
extends SpecificRecordBase> record) { + + super( + topic, + properties, + record); + } + + @Override + FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { + return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java index 5be09fb..26fffa5 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java @@ -35,33 +35,15 @@ public class Kafka09JsonTableSource extends KafkaJsonTableSource { * * @param topic Kafka topic to consume. * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. + * @param typeInfo Type information describing the result type. The field names are used + * to parse the JSON file and so are the types. */ public Kafka09JsonTableSource( String topic, Properties properties, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { + TypeInformation<Row> typeInfo) { - super(topic, properties, fieldNames, fieldTypes); - } - - /** - * Creates a Kafka 0.9 JSON {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. 
- * @param fieldTypes Row field types. - */ - public Kafka09JsonTableSource( - String topic, - Properties properties, - String[] fieldNames, - Class<?>[] fieldTypes) { - - super(topic, properties, fieldNames, fieldTypes); + super(topic, properties, typeInfo); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java index e226074..c581332 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java @@ -36,36 +36,16 @@ public class Kafka09TableSource extends KafkaTableSource { * @param topic Kafka topic to consume. * @param properties Properties for the Kafka consumer. * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. + * @param typeInfo Type information describing the result type. The field names are used + * to parse the JSON file and so are the types. */ public Kafka09TableSource( String topic, Properties properties, DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { + TypeInformation<Row> typeInfo) { - super(topic, properties, deserializationSchema, fieldNames, fieldTypes); - } - - /** - * Creates a Kafka 0.9 {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. 
- * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka09TableSource( - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - Class<?>[] fieldTypes) { - - super(topic, properties, deserializationSchema, fieldNames, fieldTypes); + super(topic, properties, deserializationSchema, typeInfo); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java new file mode 100644 index 0000000..eff8264 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.Properties; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.types.Row; + +public class Kafka09AvroTableSourceTest extends KafkaTableSourceTestBase { + + @Override + protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) { + + return new Kafka09AvroTableSource( + topic, + properties, + AvroSpecificRecord.class); + } + + @Override + @SuppressWarnings("unchecked") + protected Class<DeserializationSchema<Row>> getDeserializationSchema() { + return (Class) AvroRowDeserializationSchema.class; + } + + @Override + @SuppressWarnings("unchecked") + protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() { + return (Class) FlinkKafkaConsumer09.class; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java index 10b9acc..35cd9ce 
100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java @@ -27,8 +27,8 @@ import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchem public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase { @Override - protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) { - return new Kafka09JsonTableSource(topic, properties, fieldNames, typeInfo); + protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) { + return new Kafka09JsonTableSource(topic, properties, typeInfo); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml index fa401bd..263eb9a 100644 --- a/flink-connectors/flink-connector-kafka-base/pom.xml +++ b/flink-connectors/flink-connector-kafka-base/pom.xml @@ -116,7 +116,6 @@ under the License. <scope>test</scope> </dependency> - <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> @@ -163,6 +162,14 @@ under the License. </dependency> <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minikdc</artifactId> <version>${minikdc.version}</version> @@ -206,6 +213,24 @@ under the License. 
<inherited>true</inherited> <extensions>true</extensions> </plugin> + <!-- Add Avro generated classes for testing. --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>${project.basedir}/../flink-avro/src/test/java/org/apache/flink/api/io/avro/generated</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java new file mode 100644 index 0000000..b88fb83 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.List; +import java.util.Properties; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.avro.util.Utf8; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.AvroTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; + +/** + * A version-agnostic Kafka Avro {@link StreamTableSource}. + * + * <p>The version-specific Kafka consumers need to extend this class and + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}. + */ +public abstract class KafkaAvroTableSource extends KafkaTableSource { + + /** + * Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param avroClass Avro specific record. + */ + KafkaAvroTableSource( + String topic, + Properties properties, + Class<?
extends SpecificRecordBase> avroClass) { + + super( + topic, + properties, + createDeserializationSchema(avroClass), + convertToRowTypeInformation(avroClass)); + } + + private static AvroRowDeserializationSchema createDeserializationSchema(Class<? extends SpecificRecordBase> record) { + return new AvroRowDeserializationSchema(record); + } + + /** + * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order. + * Replaces generic Utf8 with basic String type information. + */ + @SuppressWarnings("unchecked") + private static <T extends SpecificRecordBase> TypeInformation<Row> convertToRowTypeInformation(Class<T> avroClass) { + final AvroTypeInfo<T> avroTypeInfo = new AvroTypeInfo<>(avroClass); + // determine schema to retrieve deterministic field order + final Schema schema = SpecificData.get().getSchema(avroClass); + return (TypeInformation<Row>) convertToTypeInformation(avroTypeInfo, schema); + } + + /** + * Recursively converts extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order. + * Replaces generic Utf8 with basic String type information. 
+ */ + private static TypeInformation<?> convertToTypeInformation(TypeInformation<?> extracted, Schema schema) { + if (schema.getType() == Schema.Type.RECORD) { + final List<Schema.Field> fields = schema.getFields(); + final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted; + + final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()]; + final String[] names = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + final Schema.Field field = fields.get(i); + types[i] = convertToTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema()); + names[i] = field.name(); + } + return new RowTypeInfo(types, names); + } else if (extracted instanceof GenericTypeInfo<?>) { + final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted; + if (genericTypeInfo.getTypeClass() == Utf8.class) { + return BasicTypeInfo.STRING_TYPE_INFO; + } + } + return extracted; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java index 3cdad0f..460f948 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; import java.util.Properties; +import org.apache.flink.types.Row; 
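The `convertToTypeInformation` recursion above mirrors an Avro record schema as a nested `RowTypeInfo` with deterministic field order, replacing Avro's `Utf8` with plain `String`. The same walk can be sketched without the Flink and Avro dependencies; the `Node` type and `describe` method below are illustrative stand-ins, not part of this patch:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class TypeWalkSketch {

    // Minimal stand-in for an Avro schema node: either a record with named
    // fields (LinkedHashMap keeps the deterministic field order) or a leaf type.
    static final class Node {
        final String leaf;
        final LinkedHashMap<String, Node> fields;
        Node(String leaf) { this.leaf = leaf; this.fields = null; }
        Node(LinkedHashMap<String, Node> fields) { this.leaf = null; this.fields = fields; }
    }

    // Recursively renders a node as a row-type description. Like the patch,
    // the Avro-specific string type ("utf8") is mapped to a plain String.
    static String describe(Node node) {
        if (node.fields != null) {
            StringJoiner joiner = new StringJoiner(", ", "Row(", ")");
            for (Map.Entry<String, Node> field : node.fields.entrySet()) {
                joiner.add(field.getKey() + ": " + describe(field.getValue()));
            }
            return joiner.toString();
        }
        return "utf8".equals(node.leaf) ? "String" : node.leaf;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Node> address = new LinkedHashMap<>();
        address.put("city", new Node("utf8"));
        LinkedHashMap<String, Node> user = new LinkedHashMap<>();
        user.put("name", new Node("utf8"));
        user.put("address", new Node(address));
        System.out.println(describe(new Node(user)));
        // prints: Row(name: String, address: Row(city: String))
    }
}
```

As in the patch, records recurse into a row type while non-record leaves pass through unchanged.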
/** * A version-agnostic Kafka JSON {@link StreamTableSource}. @@ -40,33 +41,15 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource { * * @param topic Kafka topic to consume. * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. + * @param typeInfo Type information describing the result type. The field names are used + * to parse the JSON file and so are the types. */ KafkaJsonTableSource( String topic, Properties properties, - String[] fieldNames, - Class<?>[] fieldTypes) { + TypeInformation<Row> typeInfo) { - super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes); - } - - /** - * Creates a generic Kafka JSON {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - KafkaJsonTableSource( - String topic, - Properties properties, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { - - super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes); + super(topic, properties, createDeserializationSchema(typeInfo), typeInfo); } /** @@ -81,17 +64,8 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource { deserializationSchema.setFailOnMissingField(failOnMissingField); } - private static JsonRowDeserializationSchema createDeserializationSchema( - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { - - return new JsonRowDeserializationSchema(fieldNames, fieldTypes); - } - - private static JsonRowDeserializationSchema createDeserializationSchema( - String[] fieldNames, - Class<?>[] fieldTypes) { + private static JsonRowDeserializationSchema createDeserializationSchema(TypeInformation<Row> typeInfo) { - return new JsonRowDeserializationSchema(fieldNames, fieldTypes); + return new 
JsonRowDeserializationSchema(typeInfo); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index 506358d..029aa45 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.connectors.kafka; +import java.util.Properties; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.serialization.DeserializationSchema; @@ -27,10 +27,6 @@ import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; -import java.util.Properties; - -import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo; - /** * A version-agnostic Kafka {@link StreamTableSource}. * @@ -48,30 +44,8 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> { /** Deserialization schema to use for Kafka records. */ private final DeserializationSchema<Row> deserializationSchema; - /** Row field names. */ - private final String[] fieldNames; - - /** Row field types. 
*/ - private final TypeInformation<?>[] fieldTypes; - - /** - * Creates a generic Kafka {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - KafkaTableSource( - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - Class<?>[] fieldTypes) { - - this(topic, properties, deserializationSchema, fieldNames, toTypeInfo(fieldTypes)); - } + /** Type information describing the result type. */ + private final TypeInformation<Row> typeInfo; /** * Creates a generic Kafka {@link StreamTableSource}. @@ -79,24 +53,18 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> { * @param topic Kafka topic to consume. * @param properties Properties for the Kafka consumer. * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. + * @param typeInfo Type information describing the result type. 
*/ KafkaTableSource( String topic, Properties properties, DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { + TypeInformation<Row> typeInfo) { this.topic = Preconditions.checkNotNull(topic, "Topic"); this.properties = Preconditions.checkNotNull(properties, "Properties"); this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema"); - this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names"); - this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types"); - - Preconditions.checkArgument(fieldNames.length == fieldTypes.length, - "Number of provided field names and types does not match."); + this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information"); } /** @@ -107,13 +75,12 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> { public DataStream<Row> getDataStream(StreamExecutionEnvironment env) { // Version-specific Kafka consumer FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema); - DataStream<Row> kafkaSource = env.addSource(kafkaConsumer); - return kafkaSource; + return env.addSource(kafkaConsumer); } @Override public TypeInformation<Row> getReturnType() { - return new RowTypeInfo(fieldTypes, fieldNames); + return typeInfo; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java deleted file mode 100644 index 7a41ade..0000000 --- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka.internals; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; - -public class TypeUtil { - private TypeUtil() {} - - /** - * Creates TypeInformation array for an array of Classes. 
- * @param fieldTypes classes to extract type information from - * @return type information - */ - public static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) { - TypeInformation<?>[] typeInfos = new TypeInformation[fieldTypes.length]; - for (int i = 0; i < fieldTypes.length; i++) { - typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]); - } - return typeInfos; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java new file mode 100644 index 0000000..37241f5 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.util.serialization; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.util.Utf8; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +/** + * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}. + * + * Deserializes the <code>byte[]</code> messages into (nested) Flink Rows. + * + * {@link Utf8} is converted to regular Java Strings. + */ +public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> { + + /** + * Schema for deterministic field order. + */ + private final Schema schema; + + /** + * Reader that deserializes byte array into a record. + */ + private final DatumReader<SpecificRecord> datumReader; + + /** + * Input stream to read message from. + */ + private final MutableByteArrayInputStream inputStream; + + /** + * Avro decoder that decodes binary data. + */ + private final Decoder decoder; + + /** + * Record to deserialize byte array to. + */ + private SpecificRecord record; + + /** + * Creates an Avro deserialization schema for the given record. + * + * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row. + */ + public AvroRowDeserializationSchema(Class<?
extends SpecificRecord> recordClazz) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.schema = SpecificData.get().getSchema(recordClazz); + this.datumReader = new SpecificDatumReader<>(schema); + this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema); + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + } + + @Override + public Row deserialize(byte[] message) throws IOException { + // read record + try { + inputStream.setBuffer(message); + this.record = datumReader.read(record, decoder); + } catch (IOException e) { + throw new RuntimeException("Failed to deserialize Row.", e); + } + + // convert to row + final Object row = convertToRow(schema, record); + return (Row) row; + } + + /** + * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type. + * Avro's {@link Utf8} fields are converted into regular Java strings. + */ + private static Object convertToRow(Schema schema, Object recordObj) { + if (recordObj instanceof GenericRecord) { + // records can be wrapped in a union + if (schema.getType() == Schema.Type.UNION) { + final List<Schema> types = schema.getTypes(); + if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) { + schema = types.get(1); + } + else { + throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema); + } + } else if (schema.getType() != Schema.Type.RECORD) { + throw new RuntimeException("Record type for row type expected. 
But is: " + schema); + } + final List<Schema.Field> fields = schema.getFields(); + final Row row = new Row(fields.size()); + final GenericRecord record = (GenericRecord) recordObj; + for (int i = 0; i < fields.size(); i++) { + final Schema.Field field = fields.get(i); + row.setField(i, convertToRow(field.schema(), record.get(field.pos()))); + } + return row; + } else if (recordObj instanceof Utf8) { + return recordObj.toString(); + } else { + return recordObj; + } + } + + /** + * An extension of the ByteArrayInputStream that allows to change a buffer that should be + * read without creating a new ByteArrayInputStream instance. This allows re-using the same + * InputStream instance and avoids copying the message and creating a new Decoder for every message. + */ + private static final class MutableByteArrayInputStream extends ByteArrayInputStream { + /** + * Creates a MutableByteArrayInputStream. + */ + public MutableByteArrayInputStream() { + super(new byte[0]); + } + + /** + * Set buffer that can be read via the InputStream interface and reset the input stream. + * This has the same effect as creating a new ByteArrayInputStream with a new buffer. + * + * @param buf the new buffer to read.
+		 */
+		public void setBuffer(byte[] buf) {
+			this.buf = buf;
+			this.pos = 0;
+			this.count = buf.length;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
new file mode 100644
index 0000000..8388ab5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Serialization schema that serializes {@link Row} via {@link SpecificRecord} into Avro bytes.
+ */
+public class AvroRowSerializationSchema implements SerializationSchema<Row> {
+
+	/**
+	 * Avro serialization schema.
+	 */
+	private final Schema schema;
+
+	/**
+	 * Writer to serialize an Avro record into a byte array.
+	 */
+	private final DatumWriter<GenericRecord> datumWriter;
+
+	/**
+	 * Output stream to serialize records into a byte array.
+	 */
+	private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+
+	/**
+	 * Low-level class for serialization of Avro values.
+	 */
+	private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+	/**
+	 * Creates an Avro serialization schema for the given Avro record class.
+	 *
+	 * @param recordClazz Avro record class used to serialize Flink's row to Avro's record
+	 */
+	public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+		this.schema = SpecificData.get().getSchema(recordClazz);
+		this.datumWriter = new SpecificDatumWriter<>(schema);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public byte[] serialize(Row row) {
+		// convert to record
+		final Object record = convertToRecord(schema, row);
+
+		// write
+		try {
+			arrayOutputStream.reset();
+			datumWriter.write((GenericRecord) record, encoder);
+			encoder.flush();
+			return arrayOutputStream.toByteArray();
+		} catch (IOException e) {
+			throw new RuntimeException("Failed to serialize Row.", e);
+		}
+	}
+
+	/**
+	 * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+	 * Strings are converted into Avro's {@link Utf8} fields.
+	 */
+	private static Object convertToRecord(Schema schema, Object rowObj) {
+		if (rowObj instanceof Row) {
+			// records can be wrapped in a union
+			if (schema.getType() == Schema.Type.UNION) {
+				final List<Schema> types = schema.getTypes();
+				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+					schema = types.get(1);
+				}
+				else if (types.size() == 2 && types.get(0).getType() == Schema.Type.RECORD && types.get(1).getType() == Schema.Type.NULL) {
+					schema = types.get(0);
+				}
+				else {
+					throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD] or UNION[RECORD, null]. Given: " + schema);
+				}
+			} else if (schema.getType() != Schema.Type.RECORD) {
+				throw new RuntimeException("Record type for row type expected. But is: " + schema);
+			}
+			final List<Schema.Field> fields = schema.getFields();
+			final GenericRecord record = new GenericData.Record(schema);
+			final Row row = (Row) rowObj;
+			for (int i = 0; i < fields.size(); i++) {
+				final Schema.Field field = fields.get(i);
+				record.put(field.pos(), convertToRecord(field.schema(), row.getField(i)));
+			}
+			return record;
+		} else if (rowObj instanceof String) {
+			return new Utf8((String) rowObj);
+		} else {
+			return rowObj;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
index b4b3341..be201fa 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
@@ -20,14 +20,12 @@ package org.apache.flink.streaming.util.serialization;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
-
 /**
  * Deserialization schema from JSON to {@link Row}.
 *
@@ -38,6 +36,9 @@ import java.io.IOException;
  */
 public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
 
+	/** Type information describing the result type. */
+	private final TypeInformation<Row> typeInfo;
+
 	/** Field names to parse. Indices match fieldTypes indices. */
 	private final String[] fieldNames;
 
@@ -51,35 +52,17 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 	private boolean failOnMissingField;
 
 	/**
-	 * Creates a JSON deserialization schema for the given fields and type classes.
-	 *
-	 * @param fieldNames Names of JSON fields to parse.
-	 * @param fieldTypes Type classes to parse JSON fields as.
-	 */
-	public JsonRowDeserializationSchema(String[] fieldNames, Class<?>[] fieldTypes) {
-		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
-
-		this.fieldTypes = new TypeInformation[fieldTypes.length];
-		for (int i = 0; i < fieldTypes.length; i++) {
-			this.fieldTypes[i] = TypeExtractor.getForClass(fieldTypes[i]);
-		}
-
-		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
-			"Number of provided field names and types does not match.");
-	}
-
-	/**
 	 * Creates a JSON deserialization schema for the given fields and types.
 	 *
-	 * @param fieldNames Names of JSON fields to parse.
-	 * @param fieldTypes Types to parse JSON fields as.
+	 * @param typeInfo Type information describing the result type. Both the field names and
+	 *                 the field types are used to parse the JSON record.
 	 */
-	public JsonRowDeserializationSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-		this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
-		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
+	public JsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
+		Preconditions.checkNotNull(typeInfo, "Type information");
+		this.typeInfo = typeInfo;
 
-		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
-			"Number of provided field names and types does not match.");
+		this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
+		this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
 	}
 
 	@Override
@@ -118,7 +101,7 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 
 	@Override
 	public TypeInformation<Row> getProducedType() {
-		return new RowTypeInfo(fieldTypes);
+		return typeInfo;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
new file mode 100644
index 0000000..e13968e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.io.IOException;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
+import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema;
+import org.apache.flink.types.Row;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+/**
+ * Tests for the Avro serialization and deserialization schema.
+ */
+public class AvroRowDeSerializationSchemaTest {
+
+	@Test
+	public void testSerializeDeserializeSimpleRow() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+
+		final byte[] bytes = serializationSchema.serialize(testData.f2);
+		final Row actual = deserializationSchema.deserialize(bytes);
+
+		assertEquals(testData.f2, actual);
+	}
+
+	@Test
+	public void testSerializeSimpleRowSeveralTimes() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+
+		serializationSchema.serialize(testData.f2);
+		serializationSchema.serialize(testData.f2);
+		final byte[] bytes = serializationSchema.serialize(testData.f2);
+		final Row actual = deserializationSchema.deserialize(bytes);
+
+		assertEquals(testData.f2, actual);
+	}
+
+	@Test
+	public void testDeserializeRowSeveralTimes() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+
+		final byte[] bytes = serializationSchema.serialize(testData.f2);
+		deserializationSchema.deserialize(bytes);
+		deserializationSchema.deserialize(bytes);
+		final Row actual = deserializationSchema.deserialize(bytes);
+
+		assertEquals(testData.f2, actual);
+	}
+
+	@Test
+	public void testSerializeDeserializeComplexRow() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+
+		final byte[] bytes = serializationSchema.serialize(testData.f2);
+		final Row actual = deserializationSchema.deserialize(bytes);
+
+		assertEquals(testData.f2, actual);
+	}
+
+	@Test
+	public void testSerializeComplexRowSeveralTimes() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+
+		serializationSchema.serialize(testData.f2);
+		serializationSchema.serialize(testData.f2);
+		final byte[] bytes = serializationSchema.serialize(testData.f2);
+		final Row actual = deserializationSchema.deserialize(bytes);
+
+		assertEquals(testData.f2, actual);
+	}
+
+	@Test
+	public void testDeserializeComplexRowSeveralTimes() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+
+		final byte[] bytes = serializationSchema.serialize(testData.f2);
+		deserializationSchema.deserialize(bytes);
+		deserializationSchema.deserialize(bytes);
+		final Row actual = deserializationSchema.deserialize(bytes);
+
+		assertEquals(testData.f2, actual);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbc5e29c/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
index 88f62f0..f03feeb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.junit.Test;
@@ -56,8 +58,11 @@ public class JsonRowDeserializationSchemaTest {
 		byte[] serializedJson = objectMapper.writeValueAsBytes(root);
 
 		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(
+			Types.ROW(
 				new String[] { "id", "name", "bytes" },
-				new Class<?>[] { Long.class, String.class, byte[].class });
+				new TypeInformation<?>[] { Types.LONG(), Types.STRING(), Types.PRIMITIVE_ARRAY(Types.BYTE()) }
+			)
+		);
 
 		Row deserialized = deserializationSchema.deserialize(serializedJson);
 
@@ -80,8 +85,11 @@ public class JsonRowDeserializationSchemaTest {
 		byte[] serializedJson = objectMapper.writeValueAsBytes(root);
 
 		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(
+			Types.ROW(
 				new String[] { "name" },
-				new Class<?>[] { String.class });
+				new TypeInformation<?>[] { Types.STRING() }
+			)
+		);
 
 		Row row = deserializationSchema.deserialize(serializedJson);
 
@@ -105,17 +113,11 @@ public class JsonRowDeserializationSchemaTest {
 	public void testNumberOfFieldNamesAndTypesMismatch() throws Exception {
 		try {
 			new JsonRowDeserializationSchema(
+				Types.ROW(
 					new String[] { "one", "two", "three" },
-					new Class<?>[] { Long.class });
-			fail("Did not throw expected Exception");
-		} catch (IllegalArgumentException ignored) {
-			// Expected
-		}
-
-		try {
-			new JsonRowDeserializationSchema(
-				new String[] { "one" },
-				new Class<?>[] { Long.class, String.class });
+					new TypeInformation<?>[] { Types.LONG() }
+				)
+			);
 			fail("Did not throw expected Exception");
 		} catch (IllegalArgumentException ignored) {
 			// Expected

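A note on the MutableByteArrayInputStream introduced above: the pattern of swapping the protected buf/pos/count fields inherited from ByteArrayInputStream is what lets the deserialization schema keep one stream (and one binary decoder bound to it) alive across messages instead of allocating per record. The following is a minimal, dependency-free sketch of that pattern; the class name mirrors the patch, but the demo class itself is hypothetical and not part of Flink.

```java
import java.io.ByteArrayInputStream;

public class MutableBufferDemo {

	// Same idea as the inner class in the patch: expose a setter that swaps
	// the protected buf/pos/count fields inherited from ByteArrayInputStream,
	// which has the same effect as constructing a fresh stream over the buffer.
	static final class MutableByteArrayInputStream extends ByteArrayInputStream {

		MutableByteArrayInputStream() {
			super(new byte[0]);
		}

		void setBuffer(byte[] buf) {
			this.buf = buf;
			this.pos = 0;
			this.count = buf.length;
		}
	}

	public static void main(String[] args) {
		MutableByteArrayInputStream in = new MutableByteArrayInputStream();

		// first "message"
		in.setBuffer(new byte[]{1, 2, 3});
		System.out.println(in.read());      // first byte of the buffer
		System.out.println(in.available()); // remaining bytes

		// re-use the same stream instance for a second message: no new allocation,
		// so a decoder wrapping this stream stays valid across messages
		in.setBuffer(new byte[]{42});
		System.out.println(in.read());
		System.out.println(in.read());      // -1 once the buffer is exhausted
	}
}
```

In the patch this stream is handed once to `DecoderFactory.get().binaryDecoder(inputStream, null)`; only the buffer is replaced per message, which is why resetting `pos` and `count` is sufficient.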