[
https://issues.apache.org/jira/browse/FLINK-15333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17886280#comment-17886280
]
Jim Hughes commented on FLINK-15333:
------------------------------------
[~yiyutian] and I hit this issue recently and learned a bit about its cause.
To reproduce it, we enabled Calcite's TRACE-level logging with a configuration
like this:
```
loggers = testlogger
logger.testlogger.name = org.apache.calcite.plan.RelOptPlanner
logger.testlogger.level = TRACE
logger.testlogger.appenderRefs = TestLogger
```
With TRACE enabled, Calcite calls `HepPlanner.dumpGraph`, which needs to
calculate the cost. There is a small issue with the CostFactory; once that is
resolved, a second issue with costs shows up. At various points in the
optimization process, there are both Calcite / logical RelNodes and Flink /
physical nodes. The two use different cost classes, which cannot be combined.
The options are:
1. Use DEBUG logging instead of TRACE.
2. To support TRACE logging, we'd need to sort out the CostFactory and also
extend methods like FlinkCost.add to accept the Calcite cost class.
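
To illustrate option 2, here is a minimal, self-contained sketch of the
failure mode and of one possible way to relax it. None of these classes are
Calcite or Flink code: `Cost`, `GenericCost`, `StrictPhysicalCost` and
`LenientPhysicalCost` are hypothetical stand-ins, and `plus` only plays the
role of methods like FlinkCost.add. The choice to leave the `network`
dimension untouched for a foreign cost is an assumption made for the example.

```java
public class CostMixSketch {

    /** Hypothetical, heavily reduced stand-in for a planner cost interface. */
    public interface Cost {
        double rows();
        double cpu();
        double io();
        Cost plus(Cost other);
    }

    /** Stand-in for a generic, logical-side cost backed by a single value. */
    public static class GenericCost implements Cost {
        private final double value;
        public GenericCost(double value) { this.value = value; }
        public double rows() { return value; }
        public double cpu() { return value; }
        public double io() { return value; }
        public Cost plus(Cost other) { return new GenericCost(value + other.rows()); }
    }

    /** Stand-in for a physical-side cost that only accepts its own class. */
    public static class StrictPhysicalCost implements Cost {
        protected final double rows, cpu, io, network;
        public StrictPhysicalCost(double rows, double cpu, double io, double network) {
            this.rows = rows; this.cpu = cpu; this.io = io; this.network = network;
        }
        public double rows() { return rows; }
        public double cpu() { return cpu; }
        public double io() { return io; }
        public double network() { return network; }
        public Cost plus(Cost other) {
            // Unchecked downcast: throws ClassCastException for a GenericCost.
            StrictPhysicalCost that = (StrictPhysicalCost) other;
            return new StrictPhysicalCost(
                rows + that.rows, cpu + that.cpu, io + that.io, network + that.network);
        }
    }

    /** Variant whose plus() also accepts costs of a foreign class. */
    public static class LenientPhysicalCost extends StrictPhysicalCost {
        public LenientPhysicalCost(double rows, double cpu, double io, double network) {
            super(rows, cpu, io, network);
        }
        @Override
        public Cost plus(Cost other) {
            if (other instanceof StrictPhysicalCost) {
                StrictPhysicalCost that = (StrictPhysicalCost) other;
                return new LenientPhysicalCost(
                    rows + that.rows, cpu + that.cpu, io + that.io, network + that.network);
            }
            // Foreign cost: use only what the shared interface exposes and
            // leave network unchanged (an assumption of this sketch).
            return new LenientPhysicalCost(
                rows + other.rows(), cpu + other.cpu(), io + other.io(), network);
        }
    }

    public static void main(String[] args) {
        Cost generic = new GenericCost(5.0);
        try {
            new StrictPhysicalCost(1, 2, 3, 4).plus(generic);
        } catch (ClassCastException e) {
            System.out.println("strict plus failed with ClassCastException");
        }
        Cost sum = new LenientPhysicalCost(1, 2, 3, 4).plus(generic);
        System.out.println("lenient plus rows = " + sum.rows());
    }
}
```

Whether a real fix should fold a foreign cost in like this, or convert it to
the Flink cost class first, is a design question this sketch does not settle.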
> Using FlinkCostFactory rather than RelOptCostImpl in blink planner
> -------------------------------------------------------------------
>
> Key: FLINK-15333
> URL: https://issues.apache.org/jira/browse/FLINK-15333
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0
> Reporter: Leonard Xu
> Assignee: godfrey he
> Priority: Major
> Fix For: 1.11.0
>
>
> When I tested Flink SQL in Flink 1.10, I hit the following exception, which
> looks like a bug that needs to be fixed.
> {code:java}
> Exception in thread "main" java.lang.ClassCastException:
> org.apache.calcite.plan.RelOptCostImpl$Factory cannot be cast to
> org.apache.flink.table.planner.plan.cost.FlinkCostFactory
> at
> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalExchange.computeSelfCost(CommonPhysicalExchange.scala:50)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCost.getNonCumulativeCost(FlinkRelMdNonCumulativeCost.scala:41)
> at
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
> Source)
> at
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
> Source)
> at
> org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:301)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:38)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown
> Source)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown
> Source)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown
> Source)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown
> Source)
> at
> org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:282)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost$$anonfun$getCumulativeCost$1.apply(FlinkRelMdCumulativeCost.scala:41)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost$$anonfun$getCumulativeCost$1.apply(FlinkRelMdCumulativeCost.scala:40)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:39)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown
> Source)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown
> Source)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown
> Source)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown
> Source)
> at
> org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:282)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost$$anonfun$getCumulativeCost$1.apply(FlinkRelMdCumulativeCost.scala:41)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost$$anonfun$getCumulativeCost$1.apply(FlinkRelMdCumulativeCost.scala:40)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:39)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown
> Source)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown
> Source)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown
> Source)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown
> Source)
> at
> org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:282)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost$$anonfun$getCumulativeCost$1.apply(FlinkRelMdCumulativeCost.scala:41)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost$$anonfun$getCumulativeCost$1.apply(FlinkRelMdCumulativeCost.scala:40)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:39)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown
> Source)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown
> Source)
> at
> org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:282)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.getCost(AbstractRelOptPlanner.java:246)
> at
> org.apache.calcite.plan.hep.HepPlanner.dumpGraph(HepPlanner.java:1016)
> at org.apache.calcite.plan.hep.HepPlanner.setRoot(HepPlanner.java:154)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:60)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.immutable.Range.foreach(Range.scala:160)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:167)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:89)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:223)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:641)
> {code}