[
https://issues.apache.org/jira/browse/FLINK-4542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512187#comment-16512187
]
Sergey Nuyanzin commented on FLINK-4542:
----------------------------------------
Hello [~twalthr],
could you please clarify a few questions here? I ran into the problem that most
of the mentioned operations are not supported in Calcite, so I added support
for them in CALCITE-2355 (I guess it will be available from Calcite 1.17.0).
At the same time, at least two operations are available now: CARDINALITY and
ELEMENT. For example, the tests in this branch pass:
https://github.com/apache/flink/compare/master...snuyanzin:FLINK_4542
# Am I right that multisets are not supported in Flink SQL and that it would be
nice to add support for them (see the example below)? I have not found any
related ticket in Jira.
# As I understand it, Multiset in Flink extends Map; at least it works for both
MapTypeInfo and MultisetTypeInfo. What about supporting these operations for
maps? Calcite does not allow them for maps, and SQL:2003 says nothing about it
either. On the other hand, at least CARDINALITY and ELEMENT could work for maps.
# Am I right that, for now, the 2 functions plus multiset support could be
implemented within this ticket, and everything else could be moved under
FLINK-9134, since it depends on a Calcite update because of CALCITE-2355?
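To make question 2 more concrete, here is a minimal sketch of what CARDINALITY
and ELEMENT could mean if a multiset is stored as a map from element to
occurrence count. This is only an illustration under that assumption: the names
MultisetSketch, cardinality and element are hypothetical and are not Flink or
Calcite APIs, and the ELEMENT behaviour (no result for an empty multiset, error
for more than one element) is my reading of the SQL semantics.

```scala
object MultisetSketch {
  // Assumption: a multiset is a map "element -> number of occurrences",
  // with every stored count being positive.
  type Multiset[T] = Map[T, Int]

  // CARDINALITY: total number of elements, duplicates included.
  def cardinality[T](m: Multiset[T]): Int = m.values.sum

  // ELEMENT: the sole element of a multiset that holds exactly one element.
  // Returns None for an empty multiset (standing in for SQL NULL) and
  // throws if the multiset holds more than one element.
  def element[T](m: Multiset[T]): Option[T] = {
    if (m.isEmpty) {
      None
    } else if (cardinality(m) == 1) {
      Some(m.keys.head)
    } else {
      throw new IllegalArgumentException(
        "ELEMENT requires a multiset with at most one element")
    }
  }

  def main(args: Array[String]): Unit = {
    val m: Multiset[String] = Map("a" -> 2, "b" -> 1)
    println(cardinality(m))                  // 2 + 1 occurrences
    println(element(Map("x" -> 1)))          // single element "x"
    println(element(Map.empty[String, Int])) // empty multiset
  }
}
```

With this representation, the same two functions would apply unchanged to any
Map[T, Int], which is why they seem feasible for maps as well.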
Below are some details about the issue with multisets in Flink.
I was able to add this to the Table API, but ran into another issue with SQL:
multisets are not supported in Flink SQL. For example, a simple query
{code}select multiset[1]{code}
or the following test
{code}
@Test
def testUnboundedElement1(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  env.setStateBackend(getStateBackend)
  StreamITCase.clear

  // The MULTISET value constructor is what fails during SQL validation.
  val sqlQuery = "SELECT MULTISET[1] FROM MyTable"

  // Register the standard 3-tuple test stream as MyTable.
  val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
  tEnv.registerTable("MyTable", t)

  val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
  result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
  env.execute()

  val expected = List(
    "1,1", "10,1", "11,1", "12,1", "13,1", "14,1", "15,1", "16,1", "17,1",
    "18,1", "19,1",
    "2,1", "20,1", "21,1", "3,1", "4,1", "5,1", "6,1", "7,1", "8,1", "9,1")
  assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
}
{code}
fails with
{noformat}
org.apache.flink.table.api.ValidationException: SQL validation failed. null
    at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93)
    at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:610)
    at org.apache.flink.table.runtime.stream.sql.SqlITCase.testUnboundedElement1(SqlITCase.scala:303)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.NullPointerException
    at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:212)
    at org.apache.calcite.sql.SqlBasicCall.setOperator(SqlBasicCall.java:68)
    at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:508)
    at org.apache.calcite.sql.validate.CollectNamespace.validateImpl(CollectNamespace.java:68)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:947)
    at org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
    at org.apache.calcite.sql.validate.CollectNamespace.getRowType(CollectNamespace.java:39)
    at org.apache.calcite.sql.validate.AbstractNamespace.getType(AbstractNamespace.java:126)
    at org.apache.calcite.sql.validate.CollectNamespace.getType(CollectNamespace.java:39)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1589)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:460)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3969)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3232)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:947)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:928)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:226)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:903)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:613)
    at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:89)
    ... 30 more
{noformat}
> Add MULTISET operations
> -----------------------
>
> Key: FLINK-4542
> URL: https://issues.apache.org/jira/browse/FLINK-4542
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Sergey Nuyanzin
> Priority: Minor
>
> Umbrella issue for MULTISET operations like:
> MULTISET UNION, MULTISET UNION ALL, MULTISET EXCEPT, MULTISET EXCEPT ALL,
> MULTISET INTERSECT, MULTISET INTERSECT ALL, CARDINALITY, ELEMENT, MEMBER OF,
> SUBMULTISET OF, IS A SET, FUSION
> At the moment we only support COLLECT.