GitHub user tragicjun opened a pull request:
https://github.com/apache/flink/pull/6082
[FLINK-9444] KafkaAvroTableSource failed to work for map fields
## What is the purpose of the change
When an Avro schema contains map fields, registering the
KafkaAvroTableSource throws an exception like the following:
```
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type Map<String, String> of table field 'event' does not match with type GenericType<java.util.Map> of the field 'event' of the TableSource return type.
    at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
    at org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
    at org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
    at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
    at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
    at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)
```
This pull request adds a new unit test that exposes the issue and then
fixes it.
*Note: In this implementation, the following Avro primitive value types are
supported: string, int, long, float, double and boolean, which should cover
most use cases.*
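
To make the note above concrete, here is a minimal sketch of the value-type lookup in plain Java. It is not the code from this pull request and uses no Flink or Avro classes: the class `MapFieldTypeSketch`, the method `convertMapType`, the string-based type descriptions, and the fallback to the generic type for unsupported value types are all assumptions made for illustration. Keys are always `String` because Avro map keys are strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in, not Flink code: describes the table type for an Avro map field. */
final class MapFieldTypeSketch {

    // Avro primitive value types listed in the note, paired with the Java boxed type name.
    private static final Map<String, String> SUPPORTED = new LinkedHashMap<>();

    static {
        SUPPORTED.put("string", "String");
        SUPPORTED.put("int", "Integer");
        SUPPORTED.put("long", "Long");
        SUPPORTED.put("float", "Float");
        SUPPORTED.put("double", "Double");
        SUPPORTED.put("boolean", "Boolean");
    }

    /**
     * Returns a typed map description for a supported value type.
     * Assumption: unsupported value types keep the generic map type.
     */
    static String convertMapType(String avroValueType) {
        String valueType = SUPPORTED.get(avroValueType);
        if (valueType == null) {
            return "GenericType<java.util.Map>";
        }
        return "Map<String, " + valueType + ">";
    }

    public static void main(String[] args) {
        // Print the description for one supported and one unsupported value type.
        System.out.println(convertMapType("string"));
        System.out.println(convertMapType("bytes"));
    }
}
```

In the actual fix, the result is a Flink `MapTypeInfo` rather than a string; the sketch only shows the lookup from value type to a typed map.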
## Brief change log
- Add a new unit test "testHasMapFieldsAvroClass()" in
KafkaAvroTableSourceTestBase
- Add some logic in "AvroTestUtils.createFlatAvroSchema()" to create Avro
MapSchema
- Add some logic in "AvroRecordClassConverter.convertType()" to convert
"GenericType<java.util.Map>" into "MapTypeInfo" with matching value types.
## Verifying this change
This change can be verified as follows:
- Run the unit test "testHasMapFieldsAvroClass()" added to
KafkaAvroTableSourceTestBase by this fix.
- Without the fix, the unit test fails with an exception similar to the one shown above.
- Merge this fix and run the unit test again; it should pass.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tragicjun/flink master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6082.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6082
----
commit 802e7e211b7bea6fd17b88a058591272f0fb215f
Author: jerryjzhang <zhangjun2915@...>
Date: 2018-05-16T16:27:32Z
[FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch
commit b731f98ff3ca920883bc3c9daebb599c25049c0d
Author: jerryjzhang <zhangjun2915@...>
Date: 2018-05-17T03:00:58Z
[FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch
commit f291a34debca992ea675b75ffdb4358dfbfa3b47
Author: jerryjzhang <zhangjun2915@...>
Date: 2018-05-19T07:06:24Z
[FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch
commit 61d2081ef7f8aa3669d9774da6149d4020d9581c
Author: jerryjzhang <zhangjun2915@...>
Date: 2018-05-19T07:34:20Z
[FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch
commit d3d1afb710858b8f3cce988541ef3e805bd75b03
Author: jerryjzhang <zhangjun2915@...>
Date: 2018-05-23T15:01:10Z
[FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch
commit 112873fd9cca097db7948d8454a3d66c5dd2b32f
Author: tragicjun <zhangjun2915@...>
Date: 2018-05-23T15:06:32Z
Merge branch 'master' into master
commit e450e8b64c066339331e158e6d599b2599636d55
Author: jerryjzhang <zhangjun2915@...>
Date: 2018-05-23T15:55:51Z
[FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch
commit 33349a82b3547e09e845f3d1d844d80a0ed0c091
Author: jerryjzhang <zhangjun2915@...>
Date: 2018-05-24T13:22:13Z
revert FLINK-9384 changes
commit 8cc12a112ea673c3ec2794949b1a6ab63e855195
Author: jerryjzhang <zhangjun2915@...>
Date: 2018-05-24T13:24:49Z
[FLINK-9384]KafkaAvroTableSource failed to work due to type mismatch
commit d91a475a328e051f4717ec8b95be7adff92a3913
Author: jerryjzhang <zhangjun2915@...>
Date: 2018-05-26T08:15:17Z
Sync with upstream
commit 84ac010f0480342fa5fdf912d7fb10ff1f444900
Author: jerryjzhang <zhangjun2915@...>
Date: 2018-05-26T08:17:03Z
Sync with upstream
commit 5940fcbf7988a898a3e961f65d34f5711c17a5c4
Author: jerryjzhang <zhangjun2915@...>
Date: 2018-05-26T12:29:05Z
[FLINK-9444]KafkaAvroTableSource failed to work for map fields
----