[
https://issues.apache.org/jira/browse/FLINK-39715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18082919#comment-18082919
]
Arvind Kandpal commented on FLINK-39715:
----------------------------------------
Hi [~thaddywu], please review my patch whenever you have time.
> [Table/Planner] IndexOutOfBoundsException in FlinkExpandConversionRule for ORDER BY followed by global aggregate in batch mode
> ------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39715
> URL: https://issues.apache.org/jira/browse/FLINK-39715
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 2.0.1, 2.2.1
> Environment: - Batch mode
> - PyFlink Table API
> - parallelism.default = 1
> Reporter: Yaoxuan Wu
> Priority: Major
> Labels: pull-request-available
>
> In Flink batch mode, executing a global aggregate, such as MAX, MIN, COUNT,
> or AVG, on a table that was previously sorted with ORDER BY can cause the
> query planner to crash with an IndexOutOfBoundsException inside
> FlinkExpandConversionRule.
>
> Minimal reproducer:
> {code:python}
> from pyflink.table import TableEnvironment, EnvironmentSettings
> from pyflink.table import expressions as T
> from pyflink.table.types import DataTypes
>
> t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
> t_env.get_config().set('parallelism.default', '1')
>
> src = t_env.from_elements(
>     [[1, 'x']],
>     DataTypes.ROW([
>         DataTypes.FIELD('a', DataTypes.INT()),
>         DataTypes.FIELD('b', DataTypes.STRING()),
>     ])
> )
>
> # ORDER BY b, then global MAX(a).
> # The sort key refers to input field index 1, while the aggregate output
> # has width 1.
> print(list(src.order_by(T.col('b').asc).select(T.col('a').max).execute().collect()))
> {code}
> Expected behavior:
> The query should execute successfully and return:
> {code}
> [Row(1)]
> {code}
>
> Actual behavior:
> The planner crashes with an IndexOutOfBoundsException while applying
> FlinkExpandConversionRule.
>
> Relevant stack trace excerpt:
> {code:java}
> java.lang.RuntimeException: Error while applying rule FlinkExpandConversionRule, args [rel#266:AbstractConverter.BATCH_PHYSICAL.single.[1](...), rel#263:BatchPhysicalLocalHashAggregate.BATCH_PHYSICAL.any.[1](...,select=Partial_MAX(a) AS max$0)]
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
>     at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:59)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:524)
>     ...
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(...)
>     ...
> Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkExpandConversionRule
>     at org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule.satisfyTraitsBySelf(FlinkExpandConversionRule.scala:72)
>     at org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule.onMatch(FlinkExpandConversionRule.scala:52)
>     ...
> Caused by: java.lang.IndexOutOfBoundsException: index (1) must be less than size (1)
>     at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:1372)
>     at com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:47)
>     at org.apache.calcite.util.Util$TransformingList.get(Util.java:2804)
>     at org.apache.flink.table.planner.plan.utils.RelExplainUtil$.collationToString(RelExplainUtil.scala:85)
>     at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort.explainTerms(BatchPhysicalSort.scala:61)
>     at org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:398)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1292)
>     ...
> {code}
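The stack trace appears to match the comment in the reproducer: a BatchPhysicalSort carrying collation [1] (from ORDER BY b) is described against a row type that has only one field (the Partial_MAX(a) output), so the per-field lookup in collationToString goes out of range. Below is a simplified, hypothetical Python model of that lookup, not Flink or Calcite code. The names `collation_to_string` and `collation_fits` are illustrative only, and collations are reduced to plain lists of field indices.

```python
# Hypothetical, simplified model of the FLINK-39715 failure (not Flink/Calcite code).
# A row type is modelled as a list of field names; a collation as a list of field indices.

def collation_to_string(field_names, collation):
    """Render a collation such as [1] as "b ASC" by looking up each sort key
    by field index, loosely mirroring what collationToString appears to do."""
    return ", ".join(f"{field_names[i]} ASC" for i in collation)


def collation_fits(field_names, collation):
    """Return True if every sort key index refers to an existing field."""
    return all(0 <= i < len(field_names) for i in collation)


sort_input = ["a", "b"]  # row type of src: (a INT, b STRING)
agg_output = ["max$0"]   # row type of the partial MAX aggregate (width 1)
collation = [1]          # ORDER BY b -> sort key is field index 1

print(collation_to_string(sort_input, collation))  # valid against the 2-field input
print(collation_fits(agg_output, collation))       # index 1 does not exist in a 1-field row

try:
    collation_to_string(agg_output, collation)
except IndexError as exc:
    # Python analogue of "index (1) must be less than size (1)"
    print("IndexError:", exc)
```

The model only illustrates why index 1 cannot be resolved against a one-field row; it says nothing about how the planner should avoid propagating that collation.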
--
This message was sent by Atlassian Jira
(v8.20.10#820010)