[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15983206#comment-15983206 ]
ASF GitHub Bot commented on FLINK-3871:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3663#discussion_r113161806
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+ /**
+ * Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param record Avro specific record.
+ */
+ KafkaAvroTableSource(
+ String topic,
+ Properties properties,
+ Class<? extends SpecificRecordBase> record) {
+
+ super(
+ topic,
+ properties,
+ createDeserializationSchema(record),
+ createFieldNames(record),
+ createFieldTypes(record));
+ }
+
+ private static AvroRowDeserializationSchema createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
+ return new AvroRowDeserializationSchema(record);
+ }
+
+ /**
+ * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
+ * Replaces generic Utf8 with basic String type information.
+ */
+ private static TypeInformation<?> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {
--- End diff --
Can we change this to
```
private static TypeInformation<Row> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema)
```
and factor out the recursive logic into a separate method
```
convertToTypeInformation(TypeInformation<?> extracted, Schema schema)
```
?
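A minimal sketch of the suggested split, written against hypothetical stand-in classes (`TypeDesc`, `BasicDesc`, `RecordDesc`, `RowDesc`, `SchemaDesc`, `Utf8`) instead of Flink's `TypeInformation`/`RowTypeInfo` and Avro's `Schema`, so it compiles without any dependencies. The entry point can promise a row type, while the recursive helper keeps the general return type because nested fields may be leaves. It assumes the "deterministic field order" comes from the schema's declared field order, and it replaces `Utf8` with `String` as the Javadoc in the diff describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for Flink's TypeInformation classes and Avro's Schema/Utf8.
// They only model the idea of the refactoring; they are NOT the real APIs.
interface TypeDesc {}

// A leaf type such as Integer, String or Utf8.
final class BasicDesc implements TypeDesc {
    final Class<?> clazz;
    BasicDesc(Class<?> clazz) { this.clazz = clazz; }
    @Override public String toString() { return clazz.getSimpleName(); }
}

// The extracted record type: fields keyed by name, in no guaranteed order.
final class RecordDesc implements TypeDesc {
    final Map<String, TypeDesc> fields;
    RecordDesc(Map<String, TypeDesc> fields) { this.fields = fields; }
}

// The row type: field names and types in a fixed order.
final class RowDesc implements TypeDesc {
    final List<String> names;
    final List<TypeDesc> types;
    RowDesc(List<String> names, List<TypeDesc> types) { this.names = names; this.types = types; }
    @Override public String toString() {
        StringBuilder sb = new StringBuilder("Row(");
        for (int i = 0; i < names.size(); i++) {
            if (i > 0) sb.append(", ");
            sb.append(names.get(i)).append(": ").append(types.get(i));
        }
        return sb.append(')').toString();
    }
}

// A record schema: field names in declared order plus schemas of nested record fields.
final class SchemaDesc {
    final List<String> fieldNames;
    final Map<String, SchemaDesc> nested;
    SchemaDesc(List<String> fieldNames, Map<String, SchemaDesc> nested) {
        this.fieldNames = fieldNames;
        this.nested = nested;
    }
}

// Placeholder for Avro's Utf8 string class.
final class Utf8 {}

final class RowTypeConversion {
    // Entry point: the top-level result is always a row, so the return type says so.
    static RowDesc convertToRowTypeInformation(TypeDesc extracted, SchemaDesc schema) {
        TypeDesc converted = convertToTypeInformation(extracted, schema);
        if (!(converted instanceof RowDesc)) {
            throw new IllegalArgumentException("top-level type must be a record");
        }
        return (RowDesc) converted;
    }

    // Recursive helper: nested fields may be leaves, so it keeps the general return type.
    // Assumes the schema has a nested SchemaDesc for every record-typed field.
    static TypeDesc convertToTypeInformation(TypeDesc extracted, SchemaDesc schema) {
        if (extracted instanceof RecordDesc) {
            Map<String, TypeDesc> fields = ((RecordDesc) extracted).fields;
            List<String> names = new ArrayList<>();
            List<TypeDesc> types = new ArrayList<>();
            // Walk the fields in schema order so the row layout is deterministic.
            for (String name : schema.fieldNames) {
                TypeDesc fieldType = fields.get(name);
                if (fieldType == null) {
                    throw new IllegalArgumentException("missing field: " + name);
                }
                names.add(name);
                types.add(convertToTypeInformation(fieldType, schema.nested.get(name)));
            }
            return new RowDesc(names, types);
        }
        if (extracted instanceof BasicDesc && ((BasicDesc) extracted).clazz == Utf8.class) {
            return new BasicDesc(String.class); // replace Utf8 with String
        }
        return extracted;
    }
}
```

With this split, callers of the entry point get a row type without casting, and the only cast lives in one place that also rejects non-record inputs.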
> Add Kafka TableSource with Avro serialization
> ---------------------------------------------
>
> Key: FLINK-3871
> URL: https://issues.apache.org/jira/browse/FLINK-3871
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Ivan Mushketyk
>
> Add a Kafka TableSource which supports Avro serialized data.
> The KafkaAvroTableSource should support two modes:
> # SpecificRecord Mode: In this case the user specifies a class which was
> code-generated by Avro from a schema. Flink treats these classes as
> regular POJOs. Hence, they are also natively supported by the Table API and
> SQL. Classes generated by Avro contain their Schema in a static field. The
> schema should be used to automatically derive field names and types. Hence,
> no information other than the name of the class is required.
> # GenericRecord Mode: In this case the user specifies an Avro Schema. The
> schema is used to deserialize the data into a GenericRecord which must be
> translated into a possibly nested {{Row}} based on the schema information.
> Again, the Avro Schema is used to automatically derive the field names and
> types. This mode is less efficient than the SpecificRecord mode because the
> {{GenericRecord}} needs to be converted into {{Row}}.
> This feature depends on FLINK-5280, i.e., support for nested data in
> {{TableSource}}.
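For the GenericRecord mode, the value-level translation into a possibly nested row could look roughly like the following sketch. It assumes a record is modeled as a `Map<String, Object>`, a row as an `Object[]`, and the schema as a hypothetical `RecordSchema` class; none of these are the Avro `GenericRecord`, Avro `Schema`, or Flink `Row` APIs. Any `CharSequence` value (Avro's `Utf8` implements `CharSequence`) is converted to a `String`.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an Avro record schema (NOT org.apache.avro.Schema):
// field names in declared order, plus the schemas of nested record fields.
final class RecordSchema {
    final List<String> fieldNames;
    final Map<String, RecordSchema> nestedRecords;
    RecordSchema(List<String> fieldNames, Map<String, RecordSchema> nestedRecords) {
        this.fieldNames = fieldNames;
        this.nestedRecords = nestedRecords;
    }
}

final class GenericRowConverter {
    // Converts a generic record, modeled as a Map from field name to value, into a row,
    // modeled as an Object[] whose positions follow the schema's field order.
    static Object[] toRow(Map<String, Object> record, RecordSchema schema) {
        Object[] row = new Object[schema.fieldNames.size()];
        for (int i = 0; i < row.length; i++) {
            String name = schema.fieldNames.get(i);
            Object value = record.get(name);
            RecordSchema nested = schema.nestedRecords.get(name);
            if (nested != null && value != null) {
                // Nested record -> nested row, converted with the nested schema.
                @SuppressWarnings("unchecked")
                Map<String, Object> nestedRecord = (Map<String, Object>) value;
                row[i] = toRow(nestedRecord, nested);
            } else if (value instanceof CharSequence) {
                // Avro string values (e.g. Utf8) become java.lang.String.
                row[i] = value.toString();
            } else {
                // Other values (numbers, booleans, nulls) are copied as-is.
                row[i] = value;
            }
        }
        return row;
    }
}
```

This per-record copy is the extra work that makes the GenericRecord mode less efficient than the SpecificRecord mode.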
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)