[ https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153205#comment-17153205 ]

mzz commented on FLINK-18511:
-----------------------------

Thanks [~jark].
I register the table with DDL; the CREATE TABLE statement is:
{code:java}
val createTableSql =
    """
      |CREATE TABLE aggs_test(
      | data_date VARCHAR,
      | data_min VARCHAR,
      | data_hour VARCHAR,
      | server_timestamp TIMESTAMP(3),
      | access VARCHAR,
      | WATERMARK FOR server_timestamp AS server_timestamp - INTERVAL '5' SECOND
      |)
      |WITH(
      | 'connector.type' = 'kafka',
      | 'connector.version' = 'universal',
      | 'connector.topic' = 'xxx',
      | 'connector.startup-mode' = 'earliest-offset',
      | 'connector.properties.0.key' = 'bootstrap.servers',
      | 'connector.properties.0.value' = 'xxx',
      | 'update-mode' = 'append',
      | 'format.type' = 'json',
      | 'format.derive-schema' = 'true'
      |)
    """.stripMargin
{code}
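
For completeness, this is roughly how the DDL is wired up on my side (a minimal sketch, assuming Flink 1.10 with the Blink planner in streaming mode, which matches the stack trace below; the variable names are just placeholders):

{code:java}
// Sketch only: Flink 1.10 Scala API, Blink planner, streaming mode (assumptions).
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Use event time so the WATERMARK defined in the DDL drives the windows.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)

// Register the Kafka-backed table defined by the DDL above.
tableEnv.sqlUpdate(createTableSql)
{code}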

The windowed GROUP BY query (groupSql):

{code:java}
val groupSql =
    """
      |select access,
      |TUMBLE_START(server_timestamp,INTERVAL '10' second),
      |count(access)
      |from aggs_test
      |group by
      |TUMBLE(server_timestamp ,INTERVAL '10' second),access
    """.stripMargin
{code}
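
The query is then executed like this (again only a sketch, continuing from the snippet above; I convert the windowed aggregate to a retract stream and print it, the real sink is omitted):

{code:java}
import org.apache.flink.streaming.api.scala._ // implicit TypeInformation for Row
import org.apache.flink.types.Row

// Run the tumbling-window aggregation defined above.
val result = tableEnv.sqlQuery(groupSql)

// toRetractStream matches the stack trace in the issue description; toAppendStream[Row]
// would also work here, since a TUMBLE window aggregate only emits final results.
tableEnv.toRetractStream[Row](result).print()

env.execute("aggs_test")
{code}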

The error:

{code:java}
WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - Error while canceling task.
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
        at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
        at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
        at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
        at java.lang.Thread.run(Thread.java:748)
{code}




> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> ------------------------------------------------------------------
>
>                 Key: FLINK-18511
>                 URL: https://issues.apache.org/jira/browse/FLINK-18511
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.10.0
>            Reporter: mzz
>            Priority: Critical
>
> I use a Schema like this:
> {code:java}
> val schema = new Schema()
>         .field("rowtimes", DataTypes.TIME(3)).rowtime(new Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
>     """
>       |select access,
>       |count(access),
>       |TUMBLE_END(rowtimes,INTERVAL '10' second)
>       |from log_test_table
>       |group by
>       |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
>     """.stripMargin
> {code}
> It reports the following error:
> {code:java}
> Exception in thread "main" org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> If you think this function should be supported, you can create an issue and start a discussion for it.
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>       at scala.Option.getOrElse(Option.scala:121)
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791)
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796)
>       at scala.Option.getOrElse(Option.scala:121)
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785)
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485)
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
>       at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
>       at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
>       at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>       at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152)
>       at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179)
>       at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>       at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
>       at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:139)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:55)
>       at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlan(StreamExecGroupAggregate.scala:55)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>       at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>       at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>       at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>       at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>       at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>       at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
>       at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:127)
>       at com.km.flinkApp.AggsParser$.main(AggsParser.scala:50)
>       at com.km.flinkApp.AggsParser.main(AggsParser.scala)
> {code}
> Related documentation:
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html]
> How should the type be defined in the Schema? Thanks!


