[
https://issues.apache.org/jira/browse/FLINK-20256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17236287#comment-17236287
]
Timo Walther commented on FLINK-20256:
--------------------------------------
[~rmetzger] I will investigate that now and come back shortly. But map views
are more of an internal feature anyway. We recently updated the docs to mention
them in one sentence but without examples.
> UDAF type inference will fail if accumulator contains MapView with Pojo value
> type
> ----------------------------------------------------------------------------------
>
> Key: FLINK-20256
> URL: https://issues.apache.org/jira/browse/FLINK-20256
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Reporter: Caizhi Weng
> Assignee: Timo Walther
> Priority: Blocker
> Fix For: 1.12.0
>
>
> To reproduce this bug, add the following test to {{FunctionITCase.java}}.
> {code:java}
> public static class MyPojo implements Serializable {
> public String a;
> public int b;
> public MyPojo(String s) {
> this.a = s;
> this.b = s.length();
> }
> }
> public static class MyAcc implements Serializable {
> public MapView<String, MyPojo> view = new MapView<>();
> public MyAcc() {}
> public void add(String a, String b) {
> try {
> view.put(a, new MyPojo(b));
> } catch (Exception e) {
> throw new RuntimeException(e);
> }
> }
> }
> public static class TestUDAF extends AggregateFunction<String, MyAcc> {
> @Override
> public MyAcc createAccumulator() {
> return new MyAcc();
> }
> public void accumulate(MyAcc acc, String value) {
> if (value != null) {
> acc.add(value, value);
> }
> }
> @Override
> public String getValue(MyAcc acc) {
> return "test";
> }
> }
> @Test
> public void myTest() throws Exception {
> String ddl = "create function MyACC as '" + TestUDAF.class.getName() +
> "'";
> tEnv().executeSql(ddl).await();
> try (CloseableIterator<Row> it = tEnv().executeSql("SELECT
> MyACC('123')").collect()) {
> while (it.hasNext()) {
> System.out.println(it.next());
> }
> }
> }
> {code}
> And we'll get the following exception stack
> {code}
> java.lang.ClassCastException: org.apache.flink.table.types.AtomicDataType
> cannot be cast to org.apache.flink.table.types.KeyValueDataType
> at
> org.apache.flink.table.planner.typeutils.DataViewUtils$MapViewSpec.getKeyDataType(DataViewUtils.java:257)
> at
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$addReusableStateDataViews$1$$anonfun$22.apply(AggsHandlerCodeGenerator.scala:1231)
> at
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$addReusableStateDataViews$1$$anonfun$22.apply(AggsHandlerCodeGenerator.scala:1231)
> at
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$.org$apache$flink$table$planner$codegen$agg$AggsHandlerCodeGenerator$$addReusableDataViewSerializer(AggsHandlerCodeGenerator.scala:1294)
> at
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$addReusableStateDataViews$1.apply(AggsHandlerCodeGenerator.scala:1228)
> at
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$addReusableStateDataViews$1.apply(AggsHandlerCodeGenerator.scala:1211)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$.addReusableStateDataViews(AggsHandlerCodeGenerator.scala:1211)
> at
> org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.<init>(ImperativeAggCodeGen.scala:112)
> at
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$3.apply(AggsHandlerCodeGenerator.scala:233)
> at
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$3.apply(AggsHandlerCodeGenerator.scala:214)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> at
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.initialAggregateInformation(AggsHandlerCodeGenerator.scala:214)
> at
> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:325)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:143)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:52)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlan(StreamExecGroupAggregate.scala:52)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:163)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:83)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:49)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:49)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:78)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:77)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:77)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.translate(StreamPlanner.scala:69)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1261)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:702)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1065)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:664)
> at
> org.apache.flink.table.planner.runtime.stream.sql.FunctionITCase.myTest(FunctionITCase.java:197)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {code}
> However {{MapView<String, String>}} will be alright.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)