Ahmad Humayun created FLINK-38397:
-------------------------------------

             Summary: Runtime Exception in Planner
                 Key: FLINK-38397
                 URL: https://issues.apache.org/jira/browse/FLINK-38397
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 2.0.0
         Environment:  
{code:java}
python 3.10.12
apache-flink 2.0.0
{code}
 
            Reporter: Ahmad Humayun


 

The following code causes the planner to throw a RuntimeException. My best guess is that the planner/optimizer incorrectly estimates the number of columns available at a certain point in the plan.
{code:python}
from pyflink.table import EnvironmentSettings, TableEnvironment, Table
from pyflink.common import Configuration
from pyflink.table.expressions import col
 
cfg = Configuration()
 
settings = (
  EnvironmentSettings.new_instance()
  .in_batch_mode()
  .with_configuration(cfg)
  .build()
)
 
table_env = TableEnvironment.create(settings)
 
# =============
data = [
   (1, "AAAAAAAABAAAAAAA", "Jimmy Allen", "3rd", -5.00),
   (2, "AAAAAAAACAAAAAAA", "Jimmy Bullock", "Cedar Spruce", -5.00),
   (3, "AAAAAAAACAAAAAAA", "Floyd Christian", "8th", -5.00),
   (4, "AAAAAAAAEAAAAAAA", "James Lachance", "6th", -5.00),
   (5, "AAAAAAAAEAAAAAAA", "James Lachance", "9th 12th", -5.00),
   (6, "AAAAAAAAEAAAAAAA", "Joaquin Washington", "Adams", -5.00),
   (7, "AAAAAAAAHAAAAAAA", "Michael Burton", "3rd", -5.00)
]
 
schema = [
   "s_store_sk",
   "s_store_id",
   "s_manager",
   "s_street_name",
   "s_gmt_offset"
]
# =====================
 
source_table = table_env.from_elements(
  data,
  schema=schema
)
 
ordered = source_table.order_by(col('s_manager'))
aggregated = ordered.group_by(col('s_street_name')).select(
    col('s_gmt_offset').count.alias('s_gmt_offset')
)
print(aggregated.explain())
{code}
This code throws the following error:
{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling o70.explain.
: java.lang.RuntimeException: Error while applying rule FlinkExpandConversionRule, args [rel#219:AbstractConverter.BATCH_PHYSICAL.hash[0]true.[2](input=RelSubset#217,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=hash[0]true,sort=[2]), rel#216:BatchPhysicalLocalHashAggregate.BATCH_PHYSICAL.any.[2](input=RelSubset#215,groupBy=s_street_name,select=s_street_name,Partial_COUNT(s_gmt_offset) AS count$0)]
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
    at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:59)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:523)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:317)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:390)
    at org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:625)
    at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:149)
    at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:49)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:753)
    at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:569)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkExpandConversionRule
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:157)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:273)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:288)
    at org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule.satisfyTraitsBySelf(FlinkExpandConversionRule.scala:72)
    at org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule.onMatch(FlinkExpandConversionRule.scala:52)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:223)
    ... 41 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for length 2
    at org.apache.flink.calcite.shaded.com.google.common.collect.RegularImmutableList.get(RegularImmutableList.java:77)
    at org.apache.calcite.util.Util$TransformingList.get(Util.java:2794)
    at scala.collection.convert.Wrappers$JListWrapper.apply(Wrappers.scala:100)
    at org.apache.flink.table.planner.plan.utils.RelExplainUtil$.$anonfun$collationToString$1(RelExplainUtil.scala:83)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.flink.table.planner.plan.utils.RelExplainUtil$.collationToString(RelExplainUtil.scala:83)
    at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort.explainTerms(BatchPhysicalSort.scala:61)
    at org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:414)
    at org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:396)
    at org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:448)
    at java.base/java.util.HashMap.hash(HashMap.java:338)
    at java.base/java.util.HashMap.getNode(HashMap.java:568)
    at java.base/java.util.HashMap.get(HashMap.java:556)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1289)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:598)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
    ... 46 more
{code}
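The root cause in the trace is an ArrayIndexOutOfBoundsException raised while {{RelExplainUtil.collationToString}} renders the sort collation of a {{BatchPhysicalSort}}: the collation references field index 2, but the node's row type has only 2 fields. A minimal, purely illustrative Python sketch of what that guess would mean (this is not Flink code; the field lists and the collation index {{[2]}} are taken from the repro and the trace above):
{code:python}
# Illustrative sketch only: a collation that stores field indices from the
# 5-column input row type breaks once it is rendered against the 2-column
# aggregate output, matching "Index 2 out of bounds for length 2".
input_fields = ["s_store_sk", "s_store_id", "s_manager",
                "s_street_name", "s_gmt_offset"]
agg_output_fields = ["s_street_name", "count$0"]
collation = [2]  # sort key: s_manager's index in the input row type

def collation_to_string(field_names, sort_keys):
    # Analogous to rendering a collation: look up each sort-key index
    # in the node's own field-name list.
    return ", ".join(field_names[k] for k in sort_keys)

print(collation_to_string(input_fields, collation))  # s_manager
try:
    collation_to_string(agg_output_fields, collation)
except IndexError as exc:
    print("IndexError:", exc)
{code}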


However, if I remove the columns that are irrelevant to the query, the code works without issues, i.e., just replace the data and schema with the following:

 
{code:python}
data = [
   ("Jimmy Allen", "3rd", -5.00),
   ("Jimmy Bullock", "Cedar Spruce", -5.00),
   ("Floyd Christian", "8th", -5.00),
   ("James Lachance", "6th", -5.00),
   ("James Lachance", "9th 12th", -5.00),
   ("Joaquin Washington", "Adams", -5.00),
   ("Michael Burton", "3rd", -5.00)
] 

schema = [
   "s_manager",
   "s_street_name",
   "s_gmt_offset"
]

{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
