[ 
https://issues.apache.org/jira/browse/FLINK-21962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Barak Ben-Nathan updated FLINK-21962:
-------------------------------------
    Description: 
I am running this on Flink ver.1.11.3: 

 
{code:java}
    val bsEnv: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings: EnvironmentSettings   = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

    val bsTableEnv: StreamTableEnvironment = 
StreamTableEnvironment.create(bsEnv, bsSettings)

    val cat = new GenericInMemoryCatalog("custom_catalog", "customDB")
    bsTableEnv.registerCatalog("custom_catalog", cat)
    bsTableEnv.useCatalog("custom_catalog")
    bsTableEnv.useDatabase("customDB")

    val createInputTableDDL =
      """
        |
        |CREATE TABLE kafkaTable (
        | user_id BIGINT,
        | item_id BIGINT,
        | category_id BIGINT,
        | behavior STRING,
        | timestamp1 TIMESTAMP(3),
        | WATERMARK FOR timestamp1 AS timestamp1
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'test-in',
        | 'properties.bootstrap.servers' = 'localhost:9092',
        | 'properties.group.id' = 'testGroup',
        | 'format' = 'json',
        | 'scan.startup.mode' = 'earliest-offset'
        |)
        |""".stripMargin

    val createOutputTableDDL =
      """
        |CREATE TABLE kafkaOutTable (
        | strt TIMESTAMP(3),
        | sum_ijh BIGINT
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'test-out',
        | 'properties.bootstrap.servers' = 'localhost:9092',
        | 'properties.group.id' = 'testGroup',
        | 'format' = 'json',
        | 'scan.startup.mode' = 'earliest-offset'
        |)
        |""".stripMargin

    bsTableEnv.executeSql(createInputTableDDL)
    bsTableEnv.executeSql(createOutputTableDDL)

    val result = bsTableEnv.sqlQuery(
      "SELECT HOP_START(timestamp1, INTERVAL '10' SECOND, INTERVAL '2' MINUTE) 
as strt, sum(item_id) as sum_ijh FROM kafkaTable " +
        "GROUP BY HOP(timestamp1, INTERVAL '10' SECOND, INTERVAL '2' MINUTE)"
    )

    result.executeInsert("kafkaOutTable")
{code}
When setting log level to TRACE, the job fails with this exception:
{code:java}
Exception in thread "main" java.lang.ClassCastException: 
org.apache.calcite.plan.RelOptCostImpl$Factory cannot be cast to 
org.apache.flink.table.planner.plan.cost.FlinkCostFactory
Exception in thread "main" java.lang.ClassCastException: 
org.apache.calcite.plan.RelOptCostImpl$Factory cannot be cast to 
org.apache.flink.table.planner.plan.cost.FlinkCostFactory at 
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalExchange.computeSelfCost(CommonPhysicalExchange.scala:50)
 at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCost.getNonCumulativeCost(FlinkRelMdNonCumulativeCost.scala:41)
 at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown 
Source) at 
GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source) 
at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:284)
 at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:38)
 at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) 
at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at 
GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) at 
GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
 at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
 at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) 
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) 
at scala.collection.Iterator.foreach(Iterator.scala:943) at 
scala.collection.Iterator.foreach$(Iterator.scala:943) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
 at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) 
at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at 
GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) at 
GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
 at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
 at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) 
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) 
at scala.collection.Iterator.foreach(Iterator.scala:943) at 
scala.collection.Iterator.foreach$(Iterator.scala:943) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
 at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) 
at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at 
GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) at 
GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
 at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
 at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) 
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) 
at scala.collection.Iterator.foreach(Iterator.scala:943) at 
scala.collection.Iterator.foreach$(Iterator.scala:943) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
 at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) 
at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
 at 
org.apache.calcite.plan.AbstractRelOptPlanner.getCost(AbstractRelOptPlanner.java:249)
 at org.apache.calcite.plan.hep.HepPlanner.dumpGraph(HepPlanner.java:1045) at 
org.apache.calcite.plan.hep.HepPlanner.setRoot(HepPlanner.java:162) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:60)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
 at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) 
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) 
at scala.collection.Iterator.foreach(Iterator.scala:943) at 
scala.collection.Iterator.foreach$(Iterator.scala:943) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
 at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) 
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) 
at scala.collection.immutable.Range.foreach(Range.scala:158) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
 at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) 
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) 
at scala.collection.Iterator.foreach(Iterator.scala:943) at 
scala.collection.Iterator.foreach$(Iterator.scala:943) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84)
 at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701)
 at 
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565) 
at 
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549){code}
It might be related to this:
 https://issues.apache.org/jira/browse/FLINK-15333
  

  was:
I am running this on Flink ver.1.11.3: 

 
{code:java}
    val bsEnv: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings: EnvironmentSettings   = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

    val bsTableEnv: StreamTableEnvironment = 
StreamTableEnvironment.create(bsEnv, bsSettings)

    val cat = new GenericInMemoryCatalog("custom_catalog", "customDB")
    bsTableEnv.registerCatalog("custom_catalog", cat)
    bsTableEnv.useCatalog("custom_catalog")
    bsTableEnv.useDatabase("customDB")

    val createInputTableDDL =
      """
        |
        |CREATE TABLE kafkaTable (
        | user_id BIGINT,
        | item_id BIGINT,
        | category_id BIGINT,
        | behavior STRING,
        | timestamp1 TIMESTAMP(3),
        | WATERMARK FOR timestamp1 AS timestamp1
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'test-in',
        | 'properties.bootstrap.servers' = 'localhost:9092',
        | 'properties.group.id' = 'testGroup',
        | 'format' = 'json',
        | 'scan.startup.mode' = 'earliest-offset'
        |)
        |""".stripMargin

    val createOutputTableDDL =
      """
        |CREATE TABLE kafkaOutTable (
        | strt TIMESTAMP(3),
        | sum_ijh BIGINT
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'test-out',
        | 'properties.bootstrap.servers' = 'localhost:9092',
        | 'properties.group.id' = 'testGroup',
        | 'format' = 'json',
        | 'scan.startup.mode' = 'earliest-offset'
        |)
        |""".stripMargin

    bsTableEnv.executeSql(createInputTableDDL)
    bsTableEnv.executeSql(createOutputTableDDL)

    val result = bsTableEnv.sqlQuery(
      "SELECT HOP_START(timestamp1, INTERVAL '10' SECOND, INTERVAL '2' MINUTE) 
as strt, sum(item_id) as sum_ijh FROM kafkaTable " +
        "GROUP BY HOP(timestamp1, INTERVAL '10' SECOND, INTERVAL '2' MINUTE)"
    )

    result.executeInsert("kafkaOutTable")
{code}

The job keeps running locally and does not stop, but it does not work: it 
produces no output.

When setting log level to TRACE, the job fails with this exception:


{code:java}

Exception in thread "main" java.lang.ClassCastException: 
org.apache.calcite.plan.RelOptCostImpl$Factory cannot be cast to 
org.apache.flink.table.planner.plan.cost.FlinkCostFactory
Exception in thread "main" java.lang.ClassCastException: 
org.apache.calcite.plan.RelOptCostImpl$Factory cannot be cast to 
org.apache.flink.table.planner.plan.cost.FlinkCostFactory at 
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalExchange.computeSelfCost(CommonPhysicalExchange.scala:50)
 at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCost.getNonCumulativeCost(FlinkRelMdNonCumulativeCost.scala:41)
 at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown 
Source) at 
GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source) 
at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:284)
 at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:38)
 at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) 
at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at 
GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) at 
GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
 at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
 at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) 
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) 
at scala.collection.Iterator.foreach(Iterator.scala:943) at 
scala.collection.Iterator.foreach$(Iterator.scala:943) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
 at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) 
at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at 
GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) at 
GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
 at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
 at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) 
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) 
at scala.collection.Iterator.foreach(Iterator.scala:943) at 
scala.collection.Iterator.foreach$(Iterator.scala:943) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
 at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) 
at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at 
GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) at 
GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
 at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
 at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) 
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) 
at scala.collection.Iterator.foreach(Iterator.scala:943) at 
scala.collection.Iterator.foreach$(Iterator.scala:943) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
 at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) 
at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at 
org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
 at 
org.apache.calcite.plan.AbstractRelOptPlanner.getCost(AbstractRelOptPlanner.java:249)
 at org.apache.calcite.plan.hep.HepPlanner.dumpGraph(HepPlanner.java:1045) at 
org.apache.calcite.plan.hep.HepPlanner.setRoot(HepPlanner.java:162) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:60)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
 at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) 
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) 
at scala.collection.Iterator.foreach(Iterator.scala:943) at 
scala.collection.Iterator.foreach$(Iterator.scala:943) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
 at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) 
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) 
at scala.collection.immutable.Range.foreach(Range.scala:158) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
 at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) 
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) 
at scala.collection.Iterator.foreach(Iterator.scala:943) at 
scala.collection.Iterator.foreach$(Iterator.scala:943) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84)
 at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701)
 at 
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565) 
at 
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549){code}

When I upgrade to version 1.12.2, the problem vanishes and there is output.

It might be related to this:
https://issues.apache.org/jira/browse/FLINK-15333
 


> SQL Group Windows do not work on Flink 1.11 
> --------------------------------------------
>
>                 Key: FLINK-21962
>                 URL: https://issues.apache.org/jira/browse/FLINK-21962
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Planner
>    Affects Versions: 1.11.3
>            Reporter: Barak Ben-Nathan
>            Priority: Major
>
> I am running this on Flink ver.1.11.3: 
>  
> {code:java}
>     val bsEnv: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
>     val bsSettings: EnvironmentSettings   = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>     val bsTableEnv: StreamTableEnvironment = 
> StreamTableEnvironment.create(bsEnv, bsSettings)
>     val cat = new GenericInMemoryCatalog("custom_catalog", "customDB")
>     bsTableEnv.registerCatalog("custom_catalog", cat)
>     bsTableEnv.useCatalog("custom_catalog")
>     bsTableEnv.useDatabase("customDB")
>     val createInputTableDDL =
>       """
>         |
>         |CREATE TABLE kafkaTable (
>         | user_id BIGINT,
>         | item_id BIGINT,
>         | category_id BIGINT,
>         | behavior STRING,
>         | timestamp1 TIMESTAMP(3),
>         | WATERMARK FOR timestamp1 AS timestamp1
>         |) WITH (
>         | 'connector' = 'kafka',
>         | 'topic' = 'test-in',
>         | 'properties.bootstrap.servers' = 'localhost:9092',
>         | 'properties.group.id' = 'testGroup',
>         | 'format' = 'json',
>         | 'scan.startup.mode' = 'earliest-offset'
>         |)
>         |""".stripMargin
>     val createOutputTableDDL =
>       """
>         |CREATE TABLE kafkaOutTable (
>         | strt TIMESTAMP(3),
>         | sum_ijh BIGINT
>         |) WITH (
>         | 'connector' = 'kafka',
>         | 'topic' = 'test-out',
>         | 'properties.bootstrap.servers' = 'localhost:9092',
>         | 'properties.group.id' = 'testGroup',
>         | 'format' = 'json',
>         | 'scan.startup.mode' = 'earliest-offset'
>         |)
>         |""".stripMargin
>     bsTableEnv.executeSql(createInputTableDDL)
>     bsTableEnv.executeSql(createOutputTableDDL)
>     val result = bsTableEnv.sqlQuery(
>       "SELECT HOP_START(timestamp1, INTERVAL '10' SECOND, INTERVAL '2' 
> MINUTE) as strt, sum(item_id) as sum_ijh FROM kafkaTable " +
>         "GROUP BY HOP(timestamp1, INTERVAL '10' SECOND, INTERVAL '2' MINUTE)"
>     )
>     result.executeInsert("kafkaOutTable")
> {code}
> When setting log level to TRACE, the job fails with this exception:
> {code:java}
> Exception in thread "main" java.lang.ClassCastException: 
> org.apache.calcite.plan.RelOptCostImpl$Factory cannot be cast to 
> org.apache.flink.table.planner.plan.cost.FlinkCostFactory
> Exception in thread "main" java.lang.ClassCastException: 
> org.apache.calcite.plan.RelOptCostImpl$Factory cannot be cast to 
> org.apache.flink.table.planner.plan.cost.FlinkCostFactory at 
> org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalExchange.computeSelfCost(CommonPhysicalExchange.scala:50)
>  at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCost.getNonCumulativeCost(FlinkRelMdNonCumulativeCost.scala:41)
>  at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown 
> Source) at 
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown 
> Source) at 
> org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:284)
>  at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:38)
>  at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown 
> Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown 
> Source) at 
> GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) 
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) 
> at 
> org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
>  at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
>  at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at 
> scala.collection.Iterator.foreach(Iterator.scala:943) at 
> scala.collection.Iterator.foreach$(Iterator.scala:943) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
>  at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown 
> Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown 
> Source) at 
> GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) 
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) 
> at 
> org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
>  at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
>  at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at 
> scala.collection.Iterator.foreach(Iterator.scala:943) at 
> scala.collection.Iterator.foreach$(Iterator.scala:943) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
>  at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown 
> Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown 
> Source) at 
> GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) 
> at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) 
> at 
> org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
>  at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41)
>  at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at 
> scala.collection.Iterator.foreach(Iterator.scala:943) at 
> scala.collection.Iterator.foreach$(Iterator.scala:943) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40)
>  at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown 
> Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown 
> Source) at 
> org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265)
>  at 
> org.apache.calcite.plan.AbstractRelOptPlanner.getCost(AbstractRelOptPlanner.java:249)
>  at org.apache.calcite.plan.hep.HepPlanner.dumpGraph(HepPlanner.java:1045) at 
> org.apache.calcite.plan.hep.HepPlanner.setRoot(HepPlanner.java:162) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:60)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
>  at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at 
> scala.collection.Iterator.foreach(Iterator.scala:943) at 
> scala.collection.Iterator.foreach$(Iterator.scala:943) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
>  at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at 
> scala.collection.immutable.Range.foreach(Range.scala:158) at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>  at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at 
> scala.collection.Iterator.foreach(Iterator.scala:943) at 
> scala.collection.Iterator.foreach$(Iterator.scala:943) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>  at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
>  at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84)
>  at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701)
>  at 
> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
>  at 
> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549){code}
> This might be related to FLINK-15333:
>  https://issues.apache.org/jira/browse/FLINK-15333
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to