GitHub user tragicjun opened a pull request:

    https://github.com/apache/flink/pull/6082

    [FLINK-9444] KafkaAvroTableSource failed to work for map fields

    ## What is the purpose of the change
    
    When an Avro schema contains map fields, registering the
    KafkaAvroTableSource throws an exception such as:
    
    ```
    Exception in thread "main" org.apache.flink.table.api.ValidationException: Type Map<String, String> of table field 'event' does not match with type GenericType<java.util.Map> of the field 'event' of the TableSource return type.
        at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
        at org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
        at org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
        at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
        at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
        at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)
    ```
    
    This pull request adds a new unit test to expose the issue and then fixes 
it. 
    
    *Note: In this implementation, the following Avro primitive value types
    are supported: string, int, long, float, double and boolean, which should
    cover most use cases.*
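    
    To illustrate the supported value types listed in the note, here is a
    minimal, standalone sketch of a lookup from Avro primitive type names to
    Java classes. It is not the code in this pull request: the class
    `AvroMapValueTypes` and its methods are hypothetical, it uses only the JDK
    rather than Flink's type information classes, and the Java class chosen
    for each Avro type is an assumption.
    
    ```java
    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;
    
    /** Hypothetical helper for illustration; not part of Flink or of this PR. */
    final class AvroMapValueTypes {
    
        // Avro primitive type name -> assumed Java class of the map value.
        private static final Map<String, Class<?>> SUPPORTED;
    
        static {
            Map<String, Class<?>> m = new LinkedHashMap<>();
            m.put("string", String.class);
            m.put("int", Integer.class);
            m.put("long", Long.class);
            m.put("float", Float.class);
            m.put("double", Double.class);
            m.put("boolean", Boolean.class);
            SUPPORTED = Collections.unmodifiableMap(m);
        }
    
        private AvroMapValueTypes() {
        }
    
        /** Returns true if the Avro type name is one of the six listed types. */
        static boolean isSupported(String avroTypeName) {
            return SUPPORTED.containsKey(avroTypeName);
        }
    
        /** Returns the assumed Java class, or throws for any other type name. */
        static Class<?> javaClassFor(String avroTypeName) {
            Class<?> javaClass = SUPPORTED.get(avroTypeName);
            if (javaClass == null) {
                throw new IllegalArgumentException(
                    "Unsupported Avro map value type: " + avroTypeName);
            }
            return javaClass;
        }
    }
    ```
    
    In this sketch, any value type outside the six listed ones (for example
    `bytes`) is rejected with an `IllegalArgumentException`; how the actual fix
    treats such types is not stated here.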
    
    ## Brief change log
    
      - Add a new unit test "testHasMapFieldsAvroClass()" in
        KafkaAvroTableSourceTestBase
      - Add some logic in "AvroTestUtils.createFlatAvroSchema()" to create Avro
        MapSchema
      - Add some logic in "AvroRecordClassConverter.convertType()" to convert
        "GenericType<java.util.Map>" into "MapTypeInfo" with matching value types.
    
    ## Verifying this change
    
    This change can be verified as follows:
    - Run the unit test "testHasMapFieldsAvroClass()" added to
      KafkaAvroTableSourceTestBase by this fix.
    - Without the fix, the unit test fails with an exception similar to the
      one shown above.
    - Merge this fix and run the unit test again; it should pass.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tragicjun/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6082.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6082
    
----
commit 802e7e211b7bea6fd17b88a058591272f0fb215f
Author: jerryjzhang <zhangjun2915@...>
Date:   2018-05-16T16:27:32Z

    [FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch

commit b731f98ff3ca920883bc3c9daebb599c25049c0d
Author: jerryjzhang <zhangjun2915@...>
Date:   2018-05-17T03:00:58Z

    [FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch

commit f291a34debca992ea675b75ffdb4358dfbfa3b47
Author: jerryjzhang <zhangjun2915@...>
Date:   2018-05-19T07:06:24Z

    [FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch

commit 61d2081ef7f8aa3669d9774da6149d4020d9581c
Author: jerryjzhang <zhangjun2915@...>
Date:   2018-05-19T07:34:20Z

    [FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch

commit d3d1afb710858b8f3cce988541ef3e805bd75b03
Author: jerryjzhang <zhangjun2915@...>
Date:   2018-05-23T15:01:10Z

    [FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch

commit 112873fd9cca097db7948d8454a3d66c5dd2b32f
Author: tragicjun <zhangjun2915@...>
Date:   2018-05-23T15:06:32Z

    Merge branch 'master' into master

commit e450e8b64c066339331e158e6d599b2599636d55
Author: jerryjzhang <zhangjun2915@...>
Date:   2018-05-23T15:55:51Z

    [FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch

commit 33349a82b3547e09e845f3d1d844d80a0ed0c091
Author: jerryjzhang <zhangjun2915@...>
Date:   2018-05-24T13:22:13Z

    revert FLINK-9384 changes

commit 8cc12a112ea673c3ec2794949b1a6ab63e855195
Author: jerryjzhang <zhangjun2915@...>
Date:   2018-05-24T13:24:49Z

    [FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch

commit d91a475a328e051f4717ec8b95be7adff92a3913
Author: jerryjzhang <zhangjun2915@...>
Date:   2018-05-26T08:15:17Z

    Sync with upstream

commit 84ac010f0480342fa5fdf912d7fb10ff1f444900
Author: jerryjzhang <zhangjun2915@...>
Date:   2018-05-26T08:17:03Z

    Sync with upstream

commit 5940fcbf7988a898a3e961f65d34f5711c17a5c4
Author: jerryjzhang <zhangjun2915@...>
Date:   2018-05-26T12:29:05Z

    [FLINK-9444]KafkaAvroTableSource failed to work for map fields

----

