Ahmad Humayun created FLINK-38397:
-------------------------------------
Summary: Runtime Exception in Planner
Key: FLINK-38397
URL: https://issues.apache.org/jira/browse/FLINK-38397
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 2.0.0
Environment:
{code:java}
python 3.10.12
apache-flink 2.0.0
{code}
Reporter: Ahmad Humayun
The following program causes the planner to throw a RuntimeException. My best guess is that the planner/optimizer is incorrectly estimating the number of columns available at some point in the plan.
{code:java}
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.common import Configuration
from pyflink.table.expressions import col

cfg = Configuration()
settings = (
    EnvironmentSettings.new_instance()
    .in_batch_mode()
    .with_configuration(cfg)
    .build()
)
table_env = TableEnvironment.create(settings)

data = [
    (1, "AAAAAAAABAAAAAAA", "Jimmy Allen", "3rd", -5.00),
    (2, "AAAAAAAACAAAAAAA", "Jimmy Bullock", "Cedar Spruce", -5.00),
    (3, "AAAAAAAACAAAAAAA", "Floyd Christian", "8th", -5.00),
    (4, "AAAAAAAAEAAAAAAA", "James Lachance", "6th", -5.00),
    (5, "AAAAAAAAEAAAAAAA", "James Lachance", "9th 12th", -5.00),
    (6, "AAAAAAAAEAAAAAAA", "Joaquin Washington", "Adams", -5.00),
    (7, "AAAAAAAAHAAAAAAA", "Michael Burton", "3rd", -5.00)
]
schema = [
    "s_store_sk",
    "s_store_id",
    "s_manager",
    "s_street_name",
    "s_gmt_offset"
]

source_table = table_env.from_elements(data, schema=schema)
ordered = source_table.order_by(col('s_manager'))
aggregated = ordered.group_by(col('s_street_name')).select(
    col('s_gmt_offset').count.alias('s_gmt_offset')
)
print(aggregated.explain())
{code}
This code throws the following error:
{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling o70.explain.
: java.lang.RuntimeException: Error while applying rule FlinkExpandConversionRule, args [rel#219:AbstractConverter.BATCH_PHYSICAL.hash[0]true.[2](input=RelSubset#217,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[0]true,sort=[2]), rel#216:BatchPhysicalLocalHashAggregate.BATCH_PHYSICAL.any.[2](input=RelSubset#215,groupBy=s_street_name,select=s_street_name,Partial_COUNT(s_gmt_offset) AS count$0)]
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
    at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:59)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:523)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:317)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:390)
    at org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:625)
    at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:149)
    at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:49)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:753)
    at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:569)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkExpandConversionRule
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:157)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:273)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:288)
    at org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule.satisfyTraitsBySelf(FlinkExpandConversionRule.scala:72)
    at org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule.onMatch(FlinkExpandConversionRule.scala:52)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:223)
    ... 41 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for length 2
    at org.apache.flink.calcite.shaded.com.google.common.collect.RegularImmutableList.get(RegularImmutableList.java:77)
    at org.apache.calcite.util.Util$TransformingList.get(Util.java:2794)
    at scala.collection.convert.Wrappers$JListWrapper.apply(Wrappers.scala:100)
    at org.apache.flink.table.planner.plan.utils.RelExplainUtil$.$anonfun$collationToString$1(RelExplainUtil.scala:83)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.flink.table.planner.plan.utils.RelExplainUtil$.collationToString(RelExplainUtil.scala:83)
    at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort.explainTerms(BatchPhysicalSort.scala:61)
    at org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:414)
    at org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:396)
    at org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:448)
    at java.base/java.util.HashMap.hash(HashMap.java:338)
    at java.base/java.util.HashMap.getNode(HashMap.java:568)
    at java.base/java.util.HashMap.get(HashMap.java:556)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1289)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
    ... 46 more
{code}
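The innermost cause looks consistent with that guess: the AbstractConverter in the rule args carries a sort=[2] trait (s_manager is field index 2 in the five-column source), but the BatchPhysicalLocalHashAggregate it is matched against only outputs two fields (s_street_name and the partial count), so rendering the collation indexes past the end of the field list. As a minimal sketch of the failure mode, the Python helper below is hypothetical and only mimics what RelExplainUtil.collationToString appears to do when it maps collation field indices to field names; it is not Flink code:
{code:python}
# Hypothetical mirror of RelExplainUtil.collationToString: render a sort
# collation by resolving each collation field index against the field
# names of the node being explained. Nothing range-checks the index.
def collation_to_string(collation_field_indices, input_field_names):
    return ", ".join(
        f"{input_field_names[i]} ASC" for i in collation_field_indices
    )

# Against the five-column source, a collation on index 2 resolves fine:
five_cols = ["s_store_sk", "s_store_id", "s_manager",
             "s_street_name", "s_gmt_offset"]
print(collation_to_string([2], five_cols))  # -> "s_manager ASC"

# Against the two-column aggregate output, the same sort=[2] trait
# indexes out of bounds (IndexError here, AIOOBE in the Java code):
two_cols = ["s_street_name", "s_gmt_offset"]
print(collation_to_string([2], two_cols))
{code}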
However, if I remove the columns that are irrelevant to the query, the same query works without issue, i.e. replacing the data and schema with the following:
{code:java}
data = [
    ("Jimmy Allen", "3rd", -5.00),
    ("Jimmy Bullock", "Cedar Spruce", -5.00),
    ("Floyd Christian", "8th", -5.00),
    ("James Lachance", "6th", -5.00),
    ("James Lachance", "9th 12th", -5.00),
    ("Joaquin Washington", "Adams", -5.00),
    ("Michael Burton", "3rd", -5.00)
]
schema = [
    "s_manager",
    "s_street_name",
    "s_gmt_offset"
]
{code}
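Presumably the same effect can be achieved without touching the source data by projecting the irrelevant columns away before the sort, so that the sort collation is computed against the narrower row type. The following is an unverified sketch against the original five-column source_table from the repro above; I have not confirmed that it avoids the rule failure:
{code:python}
# Workaround sketch (unverified): select only the columns the query needs
# before ordering, so downstream nodes see a three-field row type.
projected = source_table.select(
    col('s_manager'), col('s_street_name'), col('s_gmt_offset')
)
ordered = projected.order_by(col('s_manager'))
aggregated = ordered.group_by(col('s_street_name')).select(
    col('s_gmt_offset').count.alias('s_gmt_offset')
)
print(aggregated.explain())
{code}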