[ https://issues.apache.org/jira/browse/FLINK-21962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317600#comment-17317600 ]
godfrey he edited comment on FLINK-21962 at 4/9/21, 3:04 AM:
-------------------------------------------------------------
Thanks for reporting this.
The cause is that HepPlanner calculates the cost of the optimized plan when
TRACE logging is enabled, but the cost factory in HepPlanner is
RelOptCostImpl.Factory, not a subclass of FlinkCostFactory.
The fix is to create the HepPlanner with the cost factory from the
RelOptCluster. I created a PR to fix it, see
https://github.com/apache/flink/pull/14588, but it changes some TPC-DS plans,
and I need more time to verify whether those changes are acceptable.
I think this is a minor issue, because the TRACE level is rarely used, so I
have changed the fix version to 1.14.
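To make the failure mode described above concrete, here is a small, dependency-free Scala model of it. Every name in it (`CostFactory`, `DefaultCostFactory`, `FlinkLikeCostFactory`, `ToyHepPlanner`, `ToyExchange`, `run`) is a hypothetical stand-in, not Calcite's or Flink's actual API. The toy planner only asks for a cost when TRACE is enabled (as `HepPlanner.dumpGraph` does in the reported stack trace), and the toy exchange node downcasts the planner's cost factory, which is what appears to fail in `CommonPhysicalExchange.computeSelfCost`.

```scala
// Toy model of FLINK-21962. All names are hypothetical stand-ins,
// NOT Calcite's or Flink's real classes.
object CostFactoryModel {

  // Stand-in for a generic cost factory interface.
  trait CostFactory { def makeCost(rows: Double): Double }

  // Stand-in for the generic default factory (RelOptCostImpl.Factory in the report).
  object DefaultCostFactory extends CostFactory {
    def makeCost(rows: Double): Double = rows
  }

  // Stand-in for a Flink-specific factory with an extra method
  // that only Flink's physical nodes know about.
  class FlinkLikeCostFactory extends CostFactory {
    def makeCost(rows: Double): Double = rows
    def makeNetworkCost(rows: Double, bytesPerRow: Double): Double = rows * bytesPerRow
  }

  // Stand-in for a planner that, like the HEP planner's graph dump,
  // only computes costs when TRACE logging is on.
  final class ToyHepPlanner(val costFactory: CostFactory, traceEnabled: Boolean) {
    def setRoot(root: ToyExchange): Unit =
      if (traceEnabled) root.computeSelfCost(this) // cost is needed only for the trace dump
  }

  // Stand-in for an exchange node that assumes the planner's factory
  // is the Flink-specific one and downcasts it.
  final class ToyExchange(rows: Double, bytesPerRow: Double) {
    def computeSelfCost(planner: ToyHepPlanner): Double = {
      val factory = planner.costFactory.asInstanceOf[FlinkLikeCostFactory] // throws if it is not
      factory.makeNetworkCost(rows, bytesPerRow)
    }
  }

  // Stand-in for the factory held by the cluster, which the proposed fix
  // passes to the HEP planner instead of the default one.
  val clusterCostFactory: CostFactory = new FlinkLikeCostFactory

  // Returns None if planning succeeds, or the simple name of the exception thrown.
  def run(factory: CostFactory, traceEnabled: Boolean): Option[String] =
    try {
      new ToyHepPlanner(factory, traceEnabled).setRoot(new ToyExchange(100, 8))
      None
    } catch {
      case e: ClassCastException => Some(e.getClass.getSimpleName)
    }
}
```

With the default factory and TRACE on, the downcast throws `ClassCastException`; with TRACE off the cost is never computed, which is why only the TRACE level is affected. Handing the planner the cluster's Flink-specific factory, as the fix proposes, lets the cast succeed.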
> SQL Group Windows do not work on Flink 1.11
> --------------------------------------------
>
> Key: FLINK-21962
> URL: https://issues.apache.org/jira/browse/FLINK-21962
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.3
> Reporter: Barak Ben-Nathan
> Assignee: godfrey he
> Priority: Major
> Fix For: 1.14.0
>
>
> I am running the following on Flink 1.11.3:
>
> {code:scala}
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.EnvironmentSettings
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.catalog.GenericInMemoryCatalog
>
> val bsEnv: StreamExecutionEnvironment =
>   StreamExecutionEnvironment.getExecutionEnvironment
> val bsSettings: EnvironmentSettings =
>   EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val bsTableEnv: StreamTableEnvironment =
>   StreamTableEnvironment.create(bsEnv, bsSettings)
>
> // Register an in-memory catalog and make it the current catalog/database
> val cat = new GenericInMemoryCatalog("custom_catalog", "customDB")
> bsTableEnv.registerCatalog("custom_catalog", cat)
> bsTableEnv.useCatalog("custom_catalog")
> bsTableEnv.useDatabase("customDB")
>
> // Kafka source table with an event-time watermark on timestamp1
> val createInputTableDDL =
>   """
>     |CREATE TABLE kafkaTable (
>     |  user_id BIGINT,
>     |  item_id BIGINT,
>     |  category_id BIGINT,
>     |  behavior STRING,
>     |  timestamp1 TIMESTAMP(3),
>     |  WATERMARK FOR timestamp1 AS timestamp1
>     |) WITH (
>     |  'connector' = 'kafka',
>     |  'topic' = 'test-in',
>     |  'properties.bootstrap.servers' = 'localhost:9092',
>     |  'properties.group.id' = 'testGroup',
>     |  'format' = 'json',
>     |  'scan.startup.mode' = 'earliest-offset'
>     |)
>     |""".stripMargin
>
> // Kafka sink table for the windowed aggregate
> val createOutputTableDDL =
>   """
>     |CREATE TABLE kafkaOutTable (
>     |  strt TIMESTAMP(3),
>     |  sum_ijh BIGINT
>     |) WITH (
>     |  'connector' = 'kafka',
>     |  'topic' = 'test-out',
>     |  'properties.bootstrap.servers' = 'localhost:9092',
>     |  'properties.group.id' = 'testGroup',
>     |  'format' = 'json',
>     |  'scan.startup.mode' = 'earliest-offset'
>     |)
>     |""".stripMargin
>
> bsTableEnv.executeSql(createInputTableDDL)
> bsTableEnv.executeSql(createOutputTableDDL)
>
> // Sliding (HOP) window: 10-second slide, 2-minute size
> val result = bsTableEnv.sqlQuery(
>   "SELECT HOP_START(timestamp1, INTERVAL '10' SECOND, INTERVAL '2' MINUTE) as strt, " +
>     "sum(item_id) as sum_ijh FROM kafkaTable " +
>     "GROUP BY HOP(timestamp1, INTERVAL '10' SECOND, INTERVAL '2' MINUTE)"
> )
> result.executeInsert("kafkaOutTable")
> {code}
> When the log level is set to TRACE, the job fails with this exception:
> {code:java}
> Exception in thread "main" java.lang.ClassCastException: org.apache.calcite.plan.RelOptCostImpl$Factory cannot be cast to org.apache.flink.table.planner.plan.cost.FlinkCostFactory
>     at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalExchange.computeSelfCost(CommonPhysicalExchange.scala:50)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCost.getNonCumulativeCost(FlinkRelMdNonCumulativeCost.scala:41)
>     at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
>     at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
>     at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:284)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:38)
>     at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
>     at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
>     at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
>     at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
>     at org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
>     at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
>     at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
>     at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
>     at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
>     at org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
>     at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
>     at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
>     at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
>     at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
>     at org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
>     at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
>     at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
>     at org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
>     at org.apache.calcite.plan.AbstractRelOptPlanner.getCost(AbstractRelOptPlanner.java:249)
>     at org.apache.calcite.plan.hep.HepPlanner.dumpGraph(HepPlanner.java:1045)
>     at org.apache.calcite.plan.hep.HepPlanner.setRoot(HepPlanner.java:162)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:60)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>     at scala.collection.immutable.Range.foreach(Range.scala:158)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>     at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701)
>     at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
>     at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
> {code}
> It might be related to this:
> https://issues.apache.org/jira/browse/FLINK-15333
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)