[ https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531307#comment-16531307 ]

ASF GitHub Bot commented on FLINK-9444:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6218#discussion_r199791671
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java ---
    @@ -201,71 +202,69 @@ private Object convert(Schema schema, TypeInformation<?> info, Object object) {
                switch (schema.getType()) {
                        case RECORD:
                                if (object instanceof IndexedRecord) {
    -                                   return convertRecord(schema, (RowTypeInfo) info, (IndexedRecord) object);
    +                                   return convertAvroRecordToRow(schema, (RowTypeInfo) info, (IndexedRecord) object);
                                }
                                throw new IllegalStateException("IndexedRecord expected but was: " + object.getClass());
                        case ENUM:
                        case STRING:
                                return object.toString();
                        case ARRAY:
                                if (info instanceof BasicArrayTypeInfo) {
    -                                   final BasicArrayTypeInfo<?, ?> bati = (BasicArrayTypeInfo<?, ?>) info;
    -                                   final TypeInformation<?> elementInfo = bati.getComponentInfo();
    -                                   return convertObjectArray(schema.getElementType(), elementInfo, object);
    +                                   final TypeInformation<?> elementInfo = ((BasicArrayTypeInfo<?, ?>) info).getComponentInfo();
    +                                   return convertToObjectArray(schema.getElementType(), elementInfo, object);
                                } else {
    -                                   final ObjectArrayTypeInfo<?, ?> oati = (ObjectArrayTypeInfo<?, ?>) info;
    -                                   final TypeInformation<?> elementInfo = oati.getComponentInfo();
    -                                   return convertObjectArray(schema.getElementType(), elementInfo, object);
    +                                   final TypeInformation<?> elementInfo = ((ObjectArrayTypeInfo<?, ?>) info).getComponentInfo();
    +                                   return convertToObjectArray(schema.getElementType(), elementInfo, object);
                                }
                        case MAP:
    -                           final MapTypeInfo<?, ?> mti = (MapTypeInfo<?, ?>) info;
    +                           final MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) info;
                                final Map<String, Object> convertedMap = new HashMap<>();
                                final Map<?, ?> map = (Map<?, ?>) object;
                                for (Map.Entry<?, ?> entry : map.entrySet()) {
                                        convertedMap.put(
                                                entry.getKey().toString(),
    -                                           convert(schema.getValueType(), mti.getValueTypeInfo(), entry.getValue()));
    +                                           convertAvroType(schema.getValueType(), mapTypeInfo.getValueTypeInfo(), entry.getValue()));
                                }
                                return convertedMap;
                        case UNION:
                                final List<Schema> types = schema.getTypes();
                                final int size = types.size();
                                final Schema actualSchema;
                                if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
    -                                   return convert(types.get(1), info, object);
    +                                   return convertAvroType(types.get(1), info, object);
                                } else if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
    -                                   return convert(types.get(0), info, object);
    +                                   return convertAvroType(types.get(0), info, object);
                                } else if (size == 1) {
    -                                   return convert(types.get(0), info, object);
    +                                   return convertAvroType(types.get(0), info, object);
                                } else {
                                        // generic type
                                        return object;
                                }
                        case FIXED:
                                final byte[] fixedBytes = ((GenericFixed) object).bytes();
                                if (info == Types.BIG_DEC) {
    -                                   return convertDecimal(schema, fixedBytes);
    +                                   return convertToDecimal(schema, fixedBytes);
                                }
                                return fixedBytes;
                        case BYTES:
    -                           final ByteBuffer bb = (ByteBuffer) object;
    -                           bb.position(0);
    -                           final byte[] bytes = new byte[bb.remaining()];
    -                           bb.get(bytes);
    +                           final ByteBuffer byteBuffer = (ByteBuffer) object;
    +                           byteBuffer.position(0);
    --- End diff --
    
    I will remove it. The tests succeed in both cases.
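
    For context, a minimal sketch of what the BYTES read looks like once the explicit rewind is dropped. This is an illustrative helper, not the committed code, and it assumes the Avro decoder hands over the ByteBuffer already positioned at the start of the payload:
    
        // Illustrative only: read the remaining bytes without calling position(0),
        // relying on the buffer's current position.
        private static byte[] readBytes(java.nio.ByteBuffer byteBuffer) {
            final byte[] bytes = new byte[byteBuffer.remaining()];
            byteBuffer.get(bytes);
            return bytes;
        }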


> KafkaAvroTableSource failed to work for map and array fields
> ------------------------------------------------------------
>
>                 Key: FLINK-9444
>                 URL: https://issues.apache.org/jira/browse/FLINK-9444
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Table API & SQL
>    Affects Versions: 1.6.0
>            Reporter: Jun Zhang
>            Assignee: Jun Zhang
>            Priority: Blocker
>              Labels: patch, pull-request-available
>             Fix For: 1.6.0
>
>         Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map<String, Integer> of table field 'event' does not match with type 
> GenericType<java.util.Map> of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)
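
To make the mismatch described above concrete, here is a small hypothetical sketch (not part of the patch; class and variable names are illustrative only) comparing the type the TableSchema declares for the 'event' field with the generic type the Avro-based source's return type used to expose for it:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    import org.apache.flink.api.java.typeutils.MapTypeInfo;

    import java.util.Map;

    public class MapFieldTypeMismatch {
        public static void main(String[] args) {
            // What the TableSchema declares for the 'event' field: Map<String, Integer>
            final TypeInformation<?> declared = new MapTypeInfo<>(Types.STRING, Types.INT);
            // What the source's return type exposed for the same field before the fix
            final TypeInformation<?> produced = new GenericTypeInfo<>(Map.class);

            System.out.println(declared);                  // Map<String, Integer>
            System.out.println(produced);                  // GenericType<java.util.Map>
            System.out.println(declared.equals(produced)); // false -> validation of the table source fails
        }
    }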



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
