[
https://issues.apache.org/jira/browse/FLINK-21962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-21962:
-----------------------------------
Labels: auto-unassigned stale-major (was: auto-unassigned)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issues has been marked as
Major but is unassigned and neither itself nor its Sub-Tasks have been updated
for 30 days. I have gone ahead and added a "stale-major" to the issue". If this
ticket is a Major, please either assign yourself or give an update. Afterwards,
please remove the label or in 7 days the issue will be deprioritized.
> SQL Group Windows do not work on Flink 1.11
> --------------------------------------------
>
> Key: FLINK-21962
> URL: https://issues.apache.org/jira/browse/FLINK-21962
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.3
> Reporter: Barak Ben-Nathan
> Priority: Major
> Labels: auto-unassigned, stale-major
> Fix For: 1.14.0
>
>
> I am running this on Flink ver.1.11.3:
>
> {code:java}
> val bsEnv: StreamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment
> val bsSettings: EnvironmentSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val bsTableEnv: StreamTableEnvironment =
> StreamTableEnvironment.create(bsEnv, bsSettings)
> val cat = new GenericInMemoryCatalog("custom_catalog", "customDB")
> bsTableEnv.registerCatalog("custom_catalog", cat)
> bsTableEnv.useCatalog("custom_catalog")
> bsTableEnv.useDatabase("customDB")
> val createInputTableDDL =
> """
> |
> |CREATE TABLE kafkaTable (
> | user_id BIGINT,
> | item_id BIGINT,
> | category_id BIGINT,
> | behavior STRING,
> | timestamp1 TIMESTAMP(3),
> | WATERMARK FOR timestamp1 AS timestamp1
> |) WITH (
> | 'connector' = 'kafka',
> | 'topic' = 'test-in',
> | 'properties.bootstrap.servers' = 'localhost:9092',
> | 'properties.group.id' = 'testGroup',
> | 'format' = 'json',
> | 'scan.startup.mode' = 'earliest-offset'
> |)
> |""".stripMargin
> val createOutputTableDDL =
> """
> |CREATE TABLE kafkaOutTable (
> | strt TIMESTAMP(3),
> | sum_ijh BIGINT
> |) WITH (
> | 'connector' = 'kafka',
> | 'topic' = 'test-out',
> | 'properties.bootstrap.servers' = 'localhost:9092',
> | 'properties.group.id' = 'testGroup',
> | 'format' = 'json',
> | 'scan.startup.mode' = 'earliest-offset'
> |)
> |""".stripMargin
> bsTableEnv.executeSql(createInputTableDDL)
> bsTableEnv.executeSql(createOutputTableDDL)
> val result = bsTableEnv.sqlQuery(
> "SELECT HOP_START(timestamp1, INTERVAL '10' SECOND, INTERVAL '2'
> MINUTE) as strt, sum(item_id) as sum_ijh FROM kafkaTable " +
> "GROUP BY HOP(timestamp1, INTERVAL '10' SECOND, INTERVAL '2' MINUTE)"
> )
> result.executeInsert("kafkaOutTable")
> {code}
> When setting log level to TRACE, the job fails with this exception:
> {code:java}
> Exception in thread "main" java.lang.ClassCastException:
> org.apache.calcite.plan.RelOptCostImpl$Factory cannot be cast to
> org.apache.flink.table.planner.plan.cost.FlinkCostFactoryException in thread
> "main" java.lang.ClassCastException:
> org.apache.calcite.plan.RelOptCostImpl$Factory cannot be cast to
> org.apache.flink.table.planner.plan.cost.FlinkCostFactory at
> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalExchange.computeSelfCost(CommonPhysicalExchange.scala:50)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCost.getNonCumulativeCost(FlinkRelMdNonCumulativeCost.scala:41)
> at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
> Source) at
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
> Source) at
> org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:284)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:38)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown
> Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown
> Source) at
> GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
> at
> org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at
> scala.collection.Iterator.foreach(Iterator.scala:943) at
> scala.collection.Iterator.foreach$(Iterator.scala:943) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown
> Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown
> Source) at
> GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
> at
> org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at
> scala.collection.Iterator.foreach(Iterator.scala:943) at
> scala.collection.Iterator.foreach$(Iterator.scala:943) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown
> Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown
> Source) at
> GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
> at
> org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
> at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at
> scala.collection.Iterator.foreach(Iterator.scala:943) at
> scala.collection.Iterator.foreach$(Iterator.scala:943) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown
> Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown
> Source) at
> org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.getCost(AbstractRelOptPlanner.java:249)
> at org.apache.calcite.plan.hep.HepPlanner.dumpGraph(HepPlanner.java:1045) at
> org.apache.calcite.plan.hep.HepPlanner.setRoot(HepPlanner.java:162) at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:60)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at
> scala.collection.Iterator.foreach(Iterator.scala:943) at
> scala.collection.Iterator.foreach$(Iterator.scala:943) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at
> scala.collection.immutable.Range.foreach(Range.scala:158) at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at
> scala.collection.Iterator.foreach(Iterator.scala:943) at
> scala.collection.Iterator.foreach$(Iterator.scala:943) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701)
> at
> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
> at
> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549){code}
> It might be related to this:
> https://issues.apache.org/jira/browse/FLINK-15333
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)