Yaoxuan Wu created FLINK-39715:
----------------------------------

             Summary: [Table/Planner] IndexOutOfBoundsException in FlinkExpandConversionRule for ORDER BY followed by global aggregate in batch mode
                 Key: FLINK-39715
                 URL: https://issues.apache.org/jira/browse/FLINK-39715
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 2.2.1, 2.0.1
         Environment: - Batch mode
                      - PyFlink Table API
                      - parallelism.default = 1
            Reporter: Yaoxuan Wu


In Flink batch mode, running a global aggregate (an aggregate without GROUP BY, such as MAX, MIN, COUNT, or AVG) on a table that was previously sorted with ORDER BY can crash the query planner with an IndexOutOfBoundsException inside FlinkExpandConversionRule.

Minimal reproducer:
{code:python}
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.table import expressions as T
from pyflink.table.types import DataTypes

t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
t_env.get_config().set('parallelism.default', '1')

src = t_env.from_elements(
    [[1, 'x']],
    DataTypes.ROW([
        DataTypes.FIELD('a', DataTypes.INT()),
        DataTypes.FIELD('b', DataTypes.STRING()),
    ])
)

# ORDER BY b, then global MAX(a).
# The sort key refers to input field index 1, while the aggregate output has
# width 1 (a single column).
result = src.order_by(T.col('b').asc).select(T.col('a').max).execute()
with result.collect() as rows:
    print(list(rows))
{code}
Expected behavior:

The query should execute successfully and return:
{code:java}
[Row(1)]
{code}

Actual behavior:

The planner crashes with an IndexOutOfBoundsException while applying FlinkExpandConversionRule.

Relevant stack trace excerpt:
{code:java}
java.lang.RuntimeException: Error while applying rule FlinkExpandConversionRule,
  args [rel#266:AbstractConverter.BATCH_PHYSICAL.single.[1](...),
        rel#263:BatchPhysicalLocalHashAggregate.BATCH_PHYSICAL.any.[1](...,select=Partial_MAX(a) AS max$0)]
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
    at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:59)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:524)
    ...
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(...)
    ...
Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkExpandConversionRule
    at org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule.satisfyTraitsBySelf(FlinkExpandConversionRule.scala:72)
    at org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule.onMatch(FlinkExpandConversionRule.scala:52)
    ...
Caused by: java.lang.IndexOutOfBoundsException: index (1) must be less than size (1)
    at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:1372)
    at com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:47)
    at org.apache.calcite.util.Util$TransformingList.get(Util.java:2804)
    at org.apache.flink.table.planner.plan.utils.RelExplainUtil$.collationToString(RelExplainUtil.scala:85)
    at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort.explainTerms(BatchPhysicalSort.scala:61)
    at org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:398)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1292)
    ...
{code}
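The trace shows a collation [1] (from ORDER BY b) attached both to the AbstractConverter and to the BatchPhysicalLocalHashAggregate, whose output (select=Partial_MAX(a) AS max$0) has only one field. When BatchPhysicalSort.explainTerms renders that collation, index 1 is looked up in a one-element list. The following is a simplified, hypothetical Python model of that lookup. The helper collation_to_string and the field lists are illustrative assumptions, not Flink's RelExplainUtil code. It only shows that a collation index is meaningful relative to one specific row type.

{code:python}
# Hypothetical, simplified model of rendering a sort collation. This is NOT
# Flink's RelExplainUtil.collationToString; it only illustrates the index check.
from typing import List


def collation_to_string(collation: List[int], field_names: List[str]) -> str:
    # Each collation entry is a field index into the row type of the node
    # that carries the collation; resolve each index to a field name.
    return ", ".join(f"{field_names[i]} ASC" for i in collation)


source_fields = ["a", "b"]  # row type of the source table
agg_fields = ["max$0"]      # single-column output of Partial_MAX(a)
sort_collation = [1]        # ORDER BY b -> field index 1 of the source row

print(collation_to_string(sort_collation, source_fields))  # b ASC

try:
    # Reusing the same collation on top of the aggregate fails, because
    # index 1 does not exist in a row type of width 1.
    collation_to_string(sort_collation, agg_fields)
except IndexError as err:
    print(f"IndexError: {err}")  # IndexError: list index out of range
{code}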



--
This message was sent by Atlassian Jira
(v8.20.10#820010)