GitHub user tragicjun commented on a diff in the pull request:
https://github.com/apache/flink/pull/6026#discussion_r189437087
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
---
@@ -81,7 +85,12 @@ public String explainSource() {
@Override
protected AvroRowDeserializationSchema getDeserializationSchema() {
- return new AvroRowDeserializationSchema(avroRecordClass);
+ return new AvroRowDeserializationSchema(avroRecordClass, tableSchemaToReturnType(schema));
+ }
+
+ /** Converts the table schema into the return type. */
+ private static RowTypeInfo tableSchemaToReturnType(TableSchema tableSchema) {
--- End diff --
The extra function (`tableSchemaToReturnType`) has been removed.
---