[
https://issues.apache.org/jira/browse/FLINK-28764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-28764:
-----------------------------------
Labels: pull-request-available stale-assigned (was: pull-request-available)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue is assigned but has not
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a
comment updating the community on your progress. If this issue is waiting on
feedback, please consider this a reminder to the committer/reviewer. Flink is a
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone
else may work on it.
> Support more than 64 distinct aggregate function calls in one aggregate SQL
> query
> ---------------------------------------------------------------------------------
>
> Key: FLINK-28764
> URL: https://issues.apache.org/jira/browse/FLINK-28764
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.13.6, 1.14.5, 1.15.1
> Reporter: Wei Zhong
> Assignee: Wei Zhong
> Priority: Major
> Labels: pull-request-available, stale-assigned
> Fix For: 1.18.0
>
>
> Currently, Flink SQL does not support more than 64 distinct aggregate function
> calls in a single aggregate SQL query. We encountered this limitation while
> migrating batch jobs from Spark to Flink; one Spark job contains 79 distinct
> aggregate function calls in a single aggregate SQL query.
> Reproduce code:
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class Test64Distinct {
>     public static void main(String[] args) {
>         TableEnvironment tableEnv =
>                 TableEnvironment.create(EnvironmentSettings.inBatchMode());
>         tableEnv.executeSql(
>                 "create table datagen_source(id BIGINT, val BIGINT) with "
>                         + "('connector'='datagen', 'number-of-rows'='1000')");
>         // Build 65 distinct aggregate calls:
>         // count(distinct val * 1), ..., count(distinct val * 65).
>         StringBuilder sql = new StringBuilder("select ");
>         for (int i = 1; i <= 65; i++) {
>             sql.append("count(distinct val * ").append(i).append(")");
>             sql.append(i < 65 ? ", " : " from datagen_source");
>         }
>         tableEnv.executeSql(sql.toString()).print();
>     }
> } {code}
> Exception:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: Sql
> optimization: Cannot generate a valid execution plan for the given query:
> LogicalSink(table=[*anonymous_collect$1*], fields=[EXPR$0, EXPR$1, EXPR$2,
> EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7, EXPR$8, EXPR$9, EXPR$10, EXPR$11,
> EXPR$12, EXPR$13, EXPR$14, EXPR$15, EXPR$16, EXPR$17, EXPR$18, EXPR$19,
> EXPR$20, EXPR$21, EXPR$22, EXPR$23, EXPR$24, EXPR$25, EXPR$26, EXPR$27,
> EXPR$28, EXPR$29, EXPR$30, EXPR$31, EXPR$32, EXPR$33, EXPR$34, EXPR$35,
> EXPR$36, EXPR$37, EXPR$38, EXPR$39, EXPR$40, EXPR$41, EXPR$42, EXPR$43,
> EXPR$44, EXPR$45, EXPR$46, EXPR$47, EXPR$48, EXPR$49, EXPR$50, EXPR$51,
> EXPR$52, EXPR$53, EXPR$54, EXPR$55, EXPR$56, EXPR$57, EXPR$58, EXPR$59,
> EXPR$60, EXPR$61, EXPR$62, EXPR$63, EXPR$64])
> +- LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)],
> EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT(DISTINCT $2)],
> EXPR$3=[COUNT(DISTINCT $3)], EXPR$4=[COUNT(DISTINCT $4)],
> EXPR$5=[COUNT(DISTINCT $5)], EXPR$6=[COUNT(DISTINCT $6)],
> EXPR$7=[COUNT(DISTINCT $7)], EXPR$8=[COUNT(DISTINCT $8)],
> EXPR$9=[COUNT(DISTINCT $9)], EXPR$10=[COUNT(DISTINCT $10)],
> EXPR$11=[COUNT(DISTINCT $11)], EXPR$12=[COUNT(DISTINCT $12)],
> EXPR$13=[COUNT(DISTINCT $13)], EXPR$14=[COUNT(DISTINCT $14)],
> EXPR$15=[COUNT(DISTINCT $15)], EXPR$16=[COUNT(DISTINCT $16)],
> EXPR$17=[COUNT(DISTINCT $17)], EXPR$18=[COUNT(DISTINCT $18)],
> EXPR$19=[COUNT(DISTINCT $19)], EXPR$20=[COUNT(DISTINCT $20)],
> EXPR$21=[COUNT(DISTINCT $21)], EXPR$22=[COUNT(DISTINCT $22)],
> EXPR$23=[COUNT(DISTINCT $23)], EXPR$24=[COUNT(DISTINCT $24)],
> EXPR$25=[COUNT(DISTINCT $25)], EXPR$26=[COUNT(DISTINCT $26)],
> EXPR$27=[COUNT(DISTINCT $27)], EXPR$28=[COUNT(DISTINCT $28)],
> EXPR$29=[COUNT(DISTINCT $29)], EXPR$30=[COUNT(DISTINCT $30)],
> EXPR$31=[COUNT(DISTINCT $31)], EXPR$32=[COUNT(DISTINCT $32)],
> EXPR$33=[COUNT(DISTINCT $33)], EXPR$34=[COUNT(DISTINCT $34)],
> EXPR$35=[COUNT(DISTINCT $35)], EXPR$36=[COUNT(DISTINCT $36)],
> EXPR$37=[COUNT(DISTINCT $37)], EXPR$38=[COUNT(DISTINCT $38)],
> EXPR$39=[COUNT(DISTINCT $39)], EXPR$40=[COUNT(DISTINCT $40)],
> EXPR$41=[COUNT(DISTINCT $41)], EXPR$42=[COUNT(DISTINCT $42)],
> EXPR$43=[COUNT(DISTINCT $43)], EXPR$44=[COUNT(DISTINCT $44)],
> EXPR$45=[COUNT(DISTINCT $45)], EXPR$46=[COUNT(DISTINCT $46)],
> EXPR$47=[COUNT(DISTINCT $47)], EXPR$48=[COUNT(DISTINCT $48)],
> EXPR$49=[COUNT(DISTINCT $49)], EXPR$50=[COUNT(DISTINCT $50)],
> EXPR$51=[COUNT(DISTINCT $51)], EXPR$52=[COUNT(DISTINCT $52)],
> EXPR$53=[COUNT(DISTINCT $53)], EXPR$54=[COUNT(DISTINCT $54)],
> EXPR$55=[COUNT(DISTINCT $55)], EXPR$56=[COUNT(DISTINCT $56)],
> EXPR$57=[COUNT(DISTINCT $57)], EXPR$58=[COUNT(DISTINCT $58)],
> EXPR$59=[COUNT(DISTINCT $59)], EXPR$60=[COUNT(DISTINCT $60)],
> EXPR$61=[COUNT(DISTINCT $61)], EXPR$62=[COUNT(DISTINCT $62)],
> EXPR$63=[COUNT(DISTINCT $63)], EXPR$64=[COUNT(DISTINCT $64)])
> +- LogicalProject(exprs=[[*($1, 1), *($1, 2), *($1, 3), *($1, 4), *($1,
> 5), *($1, 6), *($1, 7), *($1, 8), *($1, 9), *($1, 10), *($1, 11), *($1, 12),
> *($1, 13), *($1, 14), *($1, 15), *($1, 16), *($1, 17), *($1, 18), *($1, 19),
> *($1, 20), *($1, 21), *($1, 22), *($1, 23), *($1, 24), *($1, 25), *($1, 26),
> *($1, 27), *($1, 28), *($1, 29), *($1, 30), *($1, 31), *($1, 32), *($1, 33),
> *($1, 34), *($1, 35), *($1, 36), *($1, 37), *($1, 38), *($1, 39), *($1, 40),
> *($1, 41), *($1, 42), *($1, 43), *($1, 44), *($1, 45), *($1, 46), *($1, 47),
> *($1, 48), *($1, 49), *($1, 50), *($1, 51), *($1, 52), *($1, 53), *($1, 54),
> *($1, 55), *($1, 56), *($1, 57), *($1, 58), *($1, 59), *($1, 60), *($1, 61),
> *($1, 62), *($1, 63), *($1, 64), *($1, 65)]])
> +- LogicalTableScan(table=[[default_catalog, default_database,
> datagen_source]])
> group count must be less than 64.
> Please check the documentation for the set of currently supported SQL
> features.
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:86)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:92)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:44)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:44)
> at scala.collection.immutable.List.foreach(List.scala:388)
> at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:44)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:312)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:192)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1688)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:840)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1342)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:708)
> at com.shopee.di.Test64Distinct.main(Test64Distinct.java:11)
> Caused by: org.apache.flink.table.api.TableException: group count must be
> less than 64.
> at
> org.apache.flink.table.planner.plan.rules.logical.DecomposeGroupingSetsRule.onMatch(DecomposeGroupingSetsRule.scala:177)
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
> at
> org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
> at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
> ... 27 more {code}
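The "group count must be less than 64" check thrown by DecomposeGroupingSetsRule is consistent with the planner tracking grouping fields as bits of a single 64-bit long (Calcite-style bitmaps). As an illustration only, not the actual Flink or Calcite code, the following sketch shows why such an encoding caps out at exactly 64 fields:

```java
public class GroupCountLimit {
    public static void main(String[] args) {
        // With one bit per grouping field, a 64-bit long holds
        // at most 64 distinct flags (bit indices 0..63).
        long mask = 0L;
        for (int field = 0; field < 64; field++) {
            mask |= 1L << field;
        }
        System.out.println(Long.bitCount(mask)); // 64 fields fit
        // In Java, shift distances for long are taken mod 64,
        // so 1L << 64 == 1L: a hypothetical 65th field would
        // collide with field 0 instead of getting its own bit.
        System.out.println((1L << 64) == 1L);
    }
}
```

Under this encoding, supporting more than 64 distinct aggregates requires a wider bitmap representation rather than a single long.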
--
This message was sent by Atlassian Jira
(v8.20.10#820010)