[ https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16525103#comment-16525103 ]

ASF GitHub Bot commented on FLINK-9444:
---------------------------------------

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/6218

    [FLINK-9444] [formats] Add full SQL support for Avro formats

    ## What is the purpose of the change
    
This PR adds full support for Apache Avro records to the Table API &
SQL. It adds (de)serialization schemas between Avro records and the row
type for both specific and generic records. It converts all Avro types to
Flink types and vice versa, covering both physical and logical Avro types.
Either an Avro class or an Avro schema string can be used for format
initialization.
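
    As a rough illustration of the schema-string initialization mode, here
is a minimal sketch (the constructor shape follows this PR and may differ
in the merged version; the `User` schema is made up for the example):

        // Minimal sketch: initialize the Avro row format for generic
        // records from an Avro schema string, as proposed in this PR.
        import org.apache.flink.api.common.typeinfo.TypeInformation;
        import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
        import org.apache.flink.types.Row;

        public class AvroFormatSketch {
            public static void main(String[] args) {
                String avroSchema =
                    "{\"type\": \"record\", \"name\": \"User\", \"fields\": ["
                    + " {\"name\": \"name\", \"type\": \"string\"},"
                    + " {\"name\": \"favoriteNumber\","
                    + "  \"type\": [\"null\", \"int\"], \"default\": null}]}";
                AvroRowDeserializationSchema deser =
                    new AvroRowDeserializationSchema(avroSchema);

                // The produced type is a structured Row type derived from
                // the Avro schema rather than a generic fallback type.
                TypeInformation<Row> rowType = deser.getProducedType();
                System.out.println(rowType);
            }
        }

    For specific records, the same class can instead be initialized with a
generated Avro class (e.g. `new AvroRowDeserializationSchema(User.class)`).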
    
    ## Brief change log
    
    - Reworked the SerializationSchema and DeserializationSchema for Avro
    - Updated old tests for the new Avro types introduced with Avro 1.8;
general code cleanup
    
    ## Verifying this change
    
    - Reworked AvroRowDeSerializationTest
    - Added AvroSchemaConverterTest
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): yes
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? docs


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-9444

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6218.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6218
    
----
commit 3a4c5e6b6313648e532307d59082f1671b0695d5
Author: Timo Walther <twalthr@...>
Date:   2018-06-26T09:46:06Z

    [FLINK-9444] [formats] Add full SQL support for Avro formats
    
This PR adds full support for Apache Avro records to the Table API &
SQL. It adds (de)serialization schemas between Avro records and the row
type for both specific and generic records. It converts all Avro types to
Flink types and vice versa, covering both physical and logical Avro types.
Either an Avro class or an Avro schema string can be used for format
initialization.

----


> KafkaAvroTableSource failed to work for map and array fields
> ------------------------------------------------------------
>
>                 Key: FLINK-9444
>                 URL: https://issues.apache.org/jira/browse/FLINK-9444
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Table API & SQL
>    Affects Versions: 1.6.0
>            Reporter: Jun Zhang
>            Assignee: Jun Zhang
>            Priority: Blocker
>              Labels: patch, pull-request-available
>             Fix For: 1.6.0
>
>         Attachments: flink-9444.patch
>
>
> When an Avro schema has map/array fields and the corresponding TableSchema
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception is
> thrown when registering the *KafkaAvroTableSource*, with a message like
> the following (a minimal sketch reproducing the mismatch follows the
> stack trace):
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Type Map<String, Integer> of table field 'event' does not match with type
> GenericType<java.util.Map> of the field 'event' of the TableSource return
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
>  at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)
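
A minimal sketch of the type comparison behind this error (the 'event'
field and its types mirror the message above; the snippet itself is
illustrative, not taken from the Flink sources):

    // Sketch of the mismatch: before this fix, the Avro-derived return
    // type carried a GenericType for map fields instead of MapTypeInfo.
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    import org.apache.flink.api.java.typeutils.MapTypeInfo;

    public class TypeMismatchSketch {
        public static void main(String[] args) {
            // What the TableSchema declares for the 'event' field:
            TypeInformation<?> declared =
                new MapTypeInfo<>(Types.STRING, Types.INT);
            // What the TableSource return type contains for the same field:
            TypeInformation<?> produced =
                new GenericTypeInfo<>(java.util.Map.class);
            // validateTableSource compares the two and throws the
            // ValidationException above because they are not equal.
            System.out.println(declared);                  // Map<String, Integer>
            System.out.println(produced);                  // GenericType<java.util.Map>
            System.out.println(declared.equals(produced)); // false
        }
    }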



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
