Github user tragicjun commented on a diff in the pull request:
https://github.com/apache/flink/pull/6082#discussion_r193039834
--- Diff:
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java
---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
         final GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) extracted;
         if (genericTypeInfo.getTypeClass() == Utf8.class) {
             return BasicTypeInfo.STRING_TYPE_INFO;
+        } else if (genericTypeInfo.getTypeClass() == Map.class) {
+            // avro map key is always string
+            return Types.MAP(Types.STRING,
+                convertPrimitiveType(schema.getValueType().getType()));
+        } else if (genericTypeInfo.getTypeClass() == List.class &&
+            schema.getType() == Schema.Type.ARRAY) {
+            return Types.LIST(convertPrimitiveType(schema.getElementType().getType()));
--- End diff --
Yes, org.apache.flink.table.api.Types doesn't support LIST, but
org.apache.flink.api.common.typeinfo.Types does. An Avro array is converted
to a Java List, so the converter needs a list type here. Can we add LIST to
org.apache.flink.table.api.Types to support Avro arrays?
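
To make the intended mapping concrete, here is a minimal, dependency-free
sketch of what the new branches do: an Avro map becomes a Java Map with
String keys, and an Avro array becomes a Java List of its element type. It
deliberately avoids Flink and Avro classes; Kind, SimpleSchema and
javaTypeFor are hypothetical names invented for illustration, and unlike the
diff (which only converts primitive element types) this sketch recurses into
nested types.

```java
/**
 * Illustrative sketch only. None of these types exist in Flink or Avro;
 * they are hypothetical stand-ins for Schema and TypeInformation.
 */
public class AvroTypeMappingSketch {

    /** Stand-in for the subset of Avro schema types relevant here. */
    enum Kind { STRING, INT, LONG, DOUBLE, BOOLEAN, ARRAY, MAP }

    /** Minimal stand-in for an Avro schema: a kind plus an optional nested type. */
    static final class SimpleSchema {
        final Kind kind;
        final SimpleSchema nested; // element type for ARRAY, value type for MAP

        private SimpleSchema(Kind kind, SimpleSchema nested) {
            this.kind = kind;
            this.nested = nested;
        }

        static SimpleSchema of(Kind kind) {
            return new SimpleSchema(kind, null);
        }

        static SimpleSchema arrayOf(SimpleSchema element) {
            return new SimpleSchema(Kind.ARRAY, element);
        }

        static SimpleSchema mapOf(SimpleSchema value) {
            return new SimpleSchema(Kind.MAP, value);
        }
    }

    /** Returns the name of the Java type that the given schema maps to. */
    static String javaTypeFor(SimpleSchema schema) {
        switch (schema.kind) {
            case STRING:  return "String";
            case INT:     return "Integer";
            case LONG:    return "Long";
            case DOUBLE:  return "Double";
            case BOOLEAN: return "Boolean";
            case ARRAY:
                // An Avro array is represented as a java.util.List of its element type.
                return "List<" + javaTypeFor(schema.nested) + ">";
            case MAP:
                // Avro map keys are always strings; only the value type varies.
                return "Map<String, " + javaTypeFor(schema.nested) + ">";
            default:
                throw new IllegalArgumentException("Unsupported kind: " + schema.kind);
        }
    }

    public static void main(String[] args) {
        SimpleSchema intArray = SimpleSchema.arrayOf(SimpleSchema.of(Kind.INT));
        SimpleSchema longMap = SimpleSchema.mapOf(SimpleSchema.of(Kind.LONG));
        System.out.println(javaTypeFor(intArray)); // prints "List<Integer>"
        System.out.println(javaTypeFor(longMap));  // prints "Map<String, Long>"
    }
}
```

In the actual converter the ARRAY case would return a type descriptor rather
than a string, which is why a LIST factory is needed on whichever Types class
the converter uses.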
---