[
https://issues.apache.org/jira/browse/FLINK-38397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ahmad Humayun updated FLINK-38397:
----------------------------------
Summary: Stale Sort Trait causes BatchPhysicalSort.explainTerms to read a
non-existent index (was: Runtime Exception in Planner)
> Stale Sort Trait causes BatchPhysicalSort.explainTerms to read a non-existent
> index
> -----------------------------------------------------------------------------------
>
> Key: FLINK-38397
> URL: https://issues.apache.org/jira/browse/FLINK-38397
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 2.0.0
> Environment: python 3.10.12
> apache-flink 2.0.0
> Reporter: Ahmad Humayun
> Priority: Major
>
>
> The following program causes the planner to throw a RuntimeException. My best
> guess is that the optimizer carries a stale sort trait whose collation still
> references a column index that no longer exists once the plan has been
> narrowed to fewer columns.
> {code:python}
> from pyflink.table import EnvironmentSettings, TableEnvironment, Table
> from pyflink.common import Configuration
> from pyflink.table.expressions import col
>
> cfg = Configuration()
>
> settings = (
>     EnvironmentSettings.new_instance()
>     .in_batch_mode()
>     .with_configuration(cfg)
>     .build()
> )
>
> table_env = TableEnvironment.create(settings)
>
> # =============
> data = [
>     (1, "AAAAAAAABAAAAAAA", "Jimmy Allen", "3rd", -5.00),
>     (2, "AAAAAAAACAAAAAAA", "Jimmy Bullock", "Cedar Spruce", -5.00),
>     (3, "AAAAAAAACAAAAAAA", "Floyd Christian", "8th", -5.00),
>     (4, "AAAAAAAAEAAAAAAA", "James Lachance", "6th", -5.00),
>     (5, "AAAAAAAAEAAAAAAA", "James Lachance", "9th 12th", -5.00),
>     (6, "AAAAAAAAEAAAAAAA", "Joaquin Washington", "Adams", -5.00),
>     (7, "AAAAAAAAHAAAAAAA", "Michael Burton", "3rd", -5.00)
> ]
>
> schema = [
>     "s_store_sk",
>     "s_store_id",
>     "s_manager",
>     "s_street_name",
>     "s_gmt_offset"
> ]
> # =====================
>
> source_table = table_env.from_elements(
>     data,
>     schema=schema
> )
>
> ordered = source_table.order_by(col('s_manager'))
> aggregated = ordered.group_by(col('s_street_name')).select(col('s_gmt_offset').count.alias('s_gmt_offset'))
> print(aggregated.explain())
> {code}
> This code throws the following error:
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling o70.explain.
> : java.lang.RuntimeException: Error while applying rule FlinkExpandConversionRule, args [rel#219:AbstractConverter.BATCH_PHYSICAL.hash[0]true.[2](input=RelSubset#217,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[0]true,sort=[2]), rel#216:BatchPhysicalLocalHashAggregate.BATCH_PHYSICAL.any.[2](input=RelSubset#215,groupBy=s_street_name,select=s_street_name, Partial_COUNT(s_gmt_offset) AS count$0)]
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
>     at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:59)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:523)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:317)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
>     at scala.collection.immutable.List.foreach(List.scala:431)
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:390)
>     at org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:625)
>     at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:149)
>     at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:49)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:753)
>     at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:569)
>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkExpandConversionRule
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:157)
>     at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:273)
>     at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:288)
>     at org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule.satisfyTraitsBySelf(FlinkExpandConversionRule.scala:72)
>     at org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule.onMatch(FlinkExpandConversionRule.scala:52)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:223)
>     ... 41 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for length 2
>     at org.apache.flink.calcite.shaded.com.google.common.collect.RegularImmutableList.get(RegularImmutableList.java:77)
>     at org.apache.calcite.util.Util$TransformingList.get(Util.java:2794)
>     at scala.collection.convert.Wrappers$JListWrapper.apply(Wrappers.scala:100)
>     at org.apache.flink.table.planner.plan.utils.RelExplainUtil$.$anonfun$collationToString$1(RelExplainUtil.scala:83)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.utils.RelExplainUtil$.collationToString(RelExplainUtil.scala:83)
>     at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort.explainTerms(BatchPhysicalSort.scala:61)
>     at org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:414)
>     at org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:396)
>     at org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:448)
>     at java.base/java.util.HashMap.hash(HashMap.java:338)
>     at java.base/java.util.HashMap.getNode(HashMap.java:568)
>     at java.base/java.util.HashMap.get(HashMap.java:556)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1289)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
>     ... 46 more
> {code}
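> The final {{ArrayIndexOutOfBoundsException}} comes from
> {{RelExplainUtil.collationToString}}, which maps each collation field index
> to a field name of the node's row type. The mechanism can be sketched in
> plain Python (illustration only, not Flink code; field names taken from the
> digest in the trace above): the stale trait still carries {{sort=[2]}}, but
> the aggregate's output row has only two fields, so the index-2 lookup fails.
> {code:python}
> # Illustration only: a plain-Python analogue of the failing lookup in
> # RelExplainUtil.collationToString. The aggregate's output row type has
> # two fields, but the stale sort trait still references field index 2.
> field_names = ["s_street_name", "count$0"]  # aggregate output (length 2)
> stale_collation = [2]                       # sort trait left over from order_by
>
> def collation_to_string(collation, names):
>     # Maps each collation field index to its field name, like the Scala code.
>     return ", ".join(names[i] for i in collation)
>
> try:
>     collation_to_string(stale_collation, field_names)
> except IndexError:
>     # Analogous to: ArrayIndexOutOfBoundsException: Index 2 out of bounds for length 2
>     print("index 2 out of bounds for a row of length 2")
> {code}
> A collation derived after the aggregation would only reference indices
> within the two-field row, which is presumably why dropping the unused
> columns (below) avoids the crash.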
> However, if I simply remove the columns that are irrelevant to the query,
> the same pipeline runs without issues, i.e., with the data and schema
> replaced by the following:
>
> {code:python}
> data = [
>     ("Jimmy Allen", "3rd", -5.00),
>     ("Jimmy Bullock", "Cedar Spruce", -5.00),
>     ("Floyd Christian", "8th", -5.00),
>     ("James Lachance", "6th", -5.00),
>     ("James Lachance", "9th 12th", -5.00),
>     ("Joaquin Washington", "Adams", -5.00),
>     ("Michael Burton", "3rd", -5.00)
> ]
> schema = [
>     "s_manager",
>     "s_street_name",
>     "s_gmt_offset"
> ]
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)