[
https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15983215#comment-15983215
]
ASF GitHub Bot commented on FLINK-3871:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3663#discussion_r113207375
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A version-agnostic Kafka Avro {@link StreamTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
+ */
+public abstract class KafkaAvroTableSource extends KafkaTableSource {
+
+ /**
+ * Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
+ *
+ * @param topic Kafka topic to consume.
+ * @param properties Properties for the Kafka consumer.
+ * @param record Avro specific record.
+ */
+ KafkaAvroTableSource(
+ String topic,
+ Properties properties,
+ Class<? extends SpecificRecordBase> record) {
+
+ super(
+ topic,
+ properties,
+ createDeserializationSchema(record),
+ createFieldNames(record),
+ createFieldTypes(record));
+ }
+
+ private static AvroRowDeserializationSchema createDeserializationSchema(Class<? extends SpecificRecordBase> record) {
+ return new AvroRowDeserializationSchema(record);
+ }
+
+ /**
+ * Converts the extracted AvroTypeInfo into a RowTypeInfo nested structure with deterministic field order.
+ * Replaces generic Utf8 with basic String type information.
+ */
+ private static TypeInformation<?> convertToRowTypeInformation(TypeInformation<?> extracted, Schema schema) {
+ if (schema.getType() == Schema.Type.RECORD) {
+ final List<Schema.Field> fields = schema.getFields();
+ final AvroTypeInfo<?> avroTypeInfo = (AvroTypeInfo<?>) extracted;
+
+ final TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
+ final String[] names = new String[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ final Schema.Field field = fields.get(i);
+ types[i] = convertToRowTypeInformation(avroTypeInfo.getTypeAt(field.name()), field.schema());
+ names[i] = field.name();
+ }
+ return new RowTypeInfo(types, names);
+ } else if (extracted instanceof GenericTypeInfo<?>) {
+ final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
+ if (genericTypeInfo.getTypeClass() == Utf8.class) {
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ }
+ }
+ return extracted;
+ }
+
+ private static <T extends SpecificRecordBase> TypeInformation<?>[] createFieldTypes(Class<T> record) {
+ final AvroTypeInfo<T> avroTypeInfo = new AvroTypeInfo<>(record);
+ // determine schema to retrieve deterministic field order
+ final Schema schema = SpecificData.get().getSchema(record);
+ final RowTypeInfo row = (RowTypeInfo) convertToRowTypeInformation(avroTypeInfo, schema);
+ final TypeInformation<?>[] types = new TypeInformation<?>[row.getArity()];
+ for (int i = 0; i < row.getArity(); i++) {
+ types[i] = row.getTypeAt(i);
+ }
+ return types;
+ }
+
+ private static String[] createFieldNames(Class<? extends SpecificRecord> record) {
--- End diff --
We could refactor `KafkaTableSource` so that its constructor takes a single
`TypeInformation<Row> rowType` instead of a `String[] fieldNames` and a
`TypeInformation[] fieldTypes`.
The separate field names are a leftover from the time when `TableSource` did
not expose the field names through its `TypeInformation`.
Once that refactoring is done, we can remove this method.
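A rough sketch of the shape such a refactoring could take. This is only an illustration with stand-in types so that it compiles without Flink on the classpath: `RowType` and `TableSourceSketch` are hypothetical names, not existing Flink classes, and the real change would use `TypeInformation<Row>` in `KafkaTableSource`.

```java
import java.util.Arrays;

public class RowTypeSketch {

    /** Hypothetical stand-in for a row type: ordered field names plus field types. */
    static final class RowType {
        private final String[] fieldNames;
        private final Class<?>[] fieldTypes;

        RowType(String[] fieldNames, Class<?>[] fieldTypes) {
            if (fieldNames.length != fieldTypes.length) {
                throw new IllegalArgumentException("Number of field names and types must match.");
            }
            // Defensive copies keep the row type immutable.
            this.fieldNames = fieldNames.clone();
            this.fieldTypes = fieldTypes.clone();
        }

        String[] getFieldNames() {
            return fieldNames.clone();
        }

        Class<?> getTypeAt(int pos) {
            return fieldTypes[pos];
        }

        int getArity() {
            return fieldNames.length;
        }
    }

    /** Hypothetical source whose constructor takes only the row type (assumption, not the Flink API). */
    static class TableSourceSketch {
        private final String topic;
        private final RowType rowType;

        TableSourceSketch(String topic, RowType rowType) {
            this.topic = topic;
            this.rowType = rowType;
        }

        String getTopic() {
            return topic;
        }

        // Field names are derived from the row type, so no separate array is passed in.
        String[] getFieldNames() {
            return rowType.getFieldNames();
        }

        RowType getReturnType() {
            return rowType;
        }
    }

    public static void main(String[] args) {
        RowType rowType = new RowType(
            new String[] {"id", "name"},
            new Class<?>[] {Long.class, String.class});
        TableSourceSketch source = new TableSourceSketch("events", rowType);
        System.out.println(Arrays.toString(source.getFieldNames()));
    }
}
```

With a constructor of this shape, a subclass would only need to build the row type once, and a helper such as `createFieldNames` would no longer be needed.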
> Add Kafka TableSource with Avro serialization
> ---------------------------------------------
>
> Key: FLINK-3871
> URL: https://issues.apache.org/jira/browse/FLINK-3871
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Ivan Mushketyk
>
> Add a Kafka TableSource which supports Avro serialized data.
> The KafkaAvroTableSource should support two modes:
> # SpecificRecord Mode: In this case the user specifies a class which was
> code-generated by Avro depending on a schema. Flink treats these classes as
> regular POJOs. Hence, they are also natively supported by the Table API and
> SQL. Classes generated by Avro contain their Schema in a static field. The
> schema should be used to automatically derive field names and types. Hence,
> no information is required beyond the name of the class.
> # GenericRecord Mode: In this case the user specifies an Avro Schema. The
> schema is used to deserialize the data into a GenericRecord which must be
> translated into a possibly nested {{Row}} based on the schema information.
> Again, the Avro Schema is used to automatically derive the field names and
> types. This mode is less efficient than the SpecificRecord mode because the
> {{GenericRecord}} needs to be converted into {{Row}}.
> This feature depends on FLINK-5280, i.e., support for nested data in
> {{TableSource}}.