[ https://issues.apache.org/jira/browse/FLINK-21962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flink Jira Bot updated FLINK-21962:
-----------------------------------
    Labels: auto-unassigned stale-assigned  (was: auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 14 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress. If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it. If the "stale-assigned" label is not removed in 7 days, 
the issue will be automatically unassigned.


> SQL Group Windows do not work on Flink 1.11 
> --------------------------------------------
>
>                 Key: FLINK-21962
>                 URL: https://issues.apache.org/jira/browse/FLINK-21962
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.3
>            Reporter: Barak Ben-Nathan
>            Assignee: godfrey he
>            Priority: Major
>              Labels: auto-unassigned, stale-assigned
>             Fix For: 1.14.0
>
>
> I am running this on Flink version 1.11.3:
>  
> {code:java}
>     import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>     import org.apache.flink.table.api.EnvironmentSettings
>     import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
>     import org.apache.flink.table.catalog.GenericInMemoryCatalog
>
>     // Blink planner in streaming mode.
>     val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>     val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>     val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
>
>     // In-memory catalog and database holding the tables below.
>     val cat = new GenericInMemoryCatalog("custom_catalog", "customDB")
>     bsTableEnv.registerCatalog("custom_catalog", cat)
>     bsTableEnv.useCatalog("custom_catalog")
>     bsTableEnv.useDatabase("customDB")
>
>     // Kafka source table with an event-time attribute and watermark.
>     val createInputTableDDL =
>       """
>         |CREATE TABLE kafkaTable (
>         | user_id BIGINT,
>         | item_id BIGINT,
>         | category_id BIGINT,
>         | behavior STRING,
>         | timestamp1 TIMESTAMP(3),
>         | WATERMARK FOR timestamp1 AS timestamp1
>         |) WITH (
>         | 'connector' = 'kafka',
>         | 'topic' = 'test-in',
>         | 'properties.bootstrap.servers' = 'localhost:9092',
>         | 'properties.group.id' = 'testGroup',
>         | 'format' = 'json',
>         | 'scan.startup.mode' = 'earliest-offset'
>         |)
>         |""".stripMargin
>
>     // Kafka sink table for the windowed aggregate.
>     val createOutputTableDDL =
>       """
>         |CREATE TABLE kafkaOutTable (
>         | strt TIMESTAMP(3),
>         | sum_ijh BIGINT
>         |) WITH (
>         | 'connector' = 'kafka',
>         | 'topic' = 'test-out',
>         | 'properties.bootstrap.servers' = 'localhost:9092',
>         | 'properties.group.id' = 'testGroup',
>         | 'format' = 'json',
>         | 'scan.startup.mode' = 'earliest-offset'
>         |)
>         |""".stripMargin
>
>     bsTableEnv.executeSql(createInputTableDDL)
>     bsTableEnv.executeSql(createOutputTableDDL)
>
>     // Sliding (HOP) window: slide 10 seconds, size 2 minutes.
>     val result = bsTableEnv.sqlQuery(
>       "SELECT HOP_START(timestamp1, INTERVAL '10' SECOND, INTERVAL '2' MINUTE) as strt, sum(item_id) as sum_ijh FROM kafkaTable " +
>         "GROUP BY HOP(timestamp1, INTERVAL '10' SECOND, INTERVAL '2' MINUTE)"
>     )
>     result.executeInsert("kafkaOutTable")
> {code}
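> For comparison only, the same sliding (HOP) window can also be expressed with the Table API. This is an untested sketch, not part of the original reproduction; it assumes the Scala expression DSL is imported (org.apache.flink.table.api._) and uses the tables registered by the DDL above:
> {code:java}
> // Untested sketch: Table API equivalent of the HOP group window in the SQL query above.
> // Assumes: import org.apache.flink.table.api._  (for Slide, $"..." and the .seconds/.minutes implicits)
> val windowed = bsTableEnv
>   .from("kafkaTable")
>   .window(Slide over 2.minutes every 10.seconds on $"timestamp1" as $"w")
>   .groupBy($"w")
>   .select($"w".start.as("strt"), $"item_id".sum.as("sum_ijh"))
> windowed.executeInsert("kafkaOutTable")
> {code}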
> When the log level is set to TRACE, the job fails with this exception:
> {code:java}
> Exception in thread "main" java.lang.ClassCastException: org.apache.calcite.plan.RelOptCostImpl$Factory cannot be cast to org.apache.flink.table.planner.plan.cost.FlinkCostFactory
>   at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalExchange.computeSelfCost(CommonPhysicalExchange.scala:50)
>   at org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCost.getNonCumulativeCost(FlinkRelMdNonCumulativeCost.scala:41)
>   at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
>   at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
>   at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:284)
>   at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:38)
>   at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
>   at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
>   at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
>   at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
>   at org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
>   at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
>   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>   at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>   at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>   at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
>   at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
>   at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
>   at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
>   at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
>   at org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
>   at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
>   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>   at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>   at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>   at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
>   at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
>   at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
>   at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
>   at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
>   at org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
>   at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
>   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>   at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>   at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>   at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
>   at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
>   at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
>   at org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
>   at org.apache.calcite.plan.AbstractRelOptPlanner.getCost(AbstractRelOptPlanner.java:249)
>   at org.apache.calcite.plan.hep.HepPlanner.dumpGraph(HepPlanner.java:1045)
>   at org.apache.calcite.plan.hep.HepPlanner.setRoot(HepPlanner.java:162)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:60)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
>   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>   at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>   at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
>   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>   at scala.collection.immutable.Range.foreach(Range.scala:158)
>   at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>   at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>   at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>   at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>   at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>   at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
>   at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84)
>   at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>   at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
>   at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701)
>   at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
>   at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
> {code}
> It might be related to this:
>  https://issues.apache.org/jira/browse/FLINK-15333
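> Since the cast happens under org.apache.calcite.plan.hep.HepPlanner.dumpGraph, which per the report is only hit when TRACE logging is enabled, a possible workaround sketch is to keep the HepPlanner logger above TRACE. Untested; it assumes the default log4j2 properties configuration that Flink 1.11 ships with, and the logger key name below is arbitrary:
> {code}
> # conf/log4j.properties (log4j2 properties syntax) -- untested workaround sketch:
> # keep Calcite's HepPlanner at DEBUG so its dumpGraph() cost dump is not triggered.
> logger.calcite_hep.name = org.apache.calcite.plan.hep.HepPlanner
> logger.calcite_hep.level = DEBUG
> {code}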
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
