[
https://issues.apache.org/jira/browse/FLINK-21200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17276941#comment-17276941
]
Timo Walther commented on FLINK-21200:
--------------------------------------
String can handle all CHAR types because we introduce a logical implicit cast.
We could do the same for MAP and MULTISET. But logically, MAP and MULTISET are
two different entities that have nothing in common. Since MAP is not part of
the SQL standard, we could allow implicit casting here. Not sure if Calcite
allows this though.
> User defined functions refuse to accept multi-set arguments without type hints
> ------------------------------------------------------------------------------
>
> Key: FLINK-21200
> URL: https://issues.apache.org/jira/browse/FLINK-21200
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.11.3, 1.12.2, 1.13.0
> Reporter: Caizhi Weng
> Priority: Major
>
> The
> [document|https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#type-inference]
> states that the default conversion Java class of the {{t MULTISET}} type
> should be {{java.util.Map<t, java.lang.Integer>}}. However user defined
> functions with this type of argument refuses to accept {{MULTISET}} as
> argument.
> To reproduce this bug, add the following test case to
> {{org.apache.flink.table.planner.runtime.batch.sql.agg.WindowAggregateITCase}}
> {code:scala}
> @Test
> def myTest(): Unit = {
> tEnv.executeSql("CREATE TEMPORARY FUNCTION myFun AS
> 'org.apache.flink.table.planner.GetMultisetValue'")
> checkResult(
> s"""
> |SELECT myFun(c, '10') FROM
> |(SELECT
> | TUMBLE_START(ts, INTERVAL '3' SECOND) AS win_start,
> | COLLECT(c) AS c
> |FROM Table3WithTimestamp
> |GROUP BY TUMBLE(ts, INTERVAL '3' SECOND))
> |""".stripMargin,
> Seq())
> }
> {code}
> and add this user defined function to {{org.apache.flink.table.planner}}
> package
> {code:java}
> package org.apache.flink.table.planner;
> import org.apache.flink.table.functions.ScalarFunction;
> import java.util.Map;
> public class GetMultisetValue extends ScalarFunction {
> public static Integer eval(Map<String, Integer> data, String key) {
> return data.getOrDefault(key, 0);
> }
> }
> {code}
> The following exception occurs when running the test:
> {code}
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> Invalid function call:
> default_catalog.default_database.myFun(MULTISET<STRING> NOT NULL, CHAR(2) NOT
> NULL)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:649)
> at
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.parseQuery(BatchTestBase.scala:295)
> at
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:137)
> at
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:104)
> at
> org.apache.flink.table.planner.runtime.batch.sql.agg.WindowAggregateITCase.myTest(WindowAggregateITCase.scala:52)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> Caused by: org.apache.flink.table.api.ValidationException: Invalid function
> call:
> default_catalog.default_database.myFun(MULTISET<STRING> NOT NULL, CHAR(2) NOT
> NULL)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:190)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:90)
> at
> org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
> at
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1756)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1741)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:440)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4205)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3474)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
> ... 37 more
> Caused by: org.apache.flink.table.api.ValidationException: Invalid input
> arguments. Expected signatures are:
> default_catalog.default_database.myFun(data => MAP<STRING, INT>, key =>
> STRING)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:177)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:125)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:87)
> ... 57 more
> Caused by: org.apache.flink.table.api.ValidationException: Invalid argument
> type at position 0. Data type MAP<STRING, INT> expected but MULTISET<STRING>
> NOT NULL passed.
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:133)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:101)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:123)
> ... 58 more
> {code}
> This SQL and the user defined function only works with {{DataTypeHint}}.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)