[ 
https://issues.apache.org/jira/browse/FLINK-17317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bianhaihua updated FLINK-17317:
-------------------------------
    Description: 
part of pom:
{quote}
h4. <dependency>

<groupId>org.apache.flink</groupId>
 <artifactId>flink-java</artifactId>
 <version>1.10.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-java_2.12</artifactId>
 <version>1.10.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_2.12</artifactId>
 <version>1.10.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-json</artifactId>
 <version>1.10.0</version>
 </dependency> <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-planner-blink_2.12</artifactId>
 <version>1.10.0</version>
 </dependency>
{quote}
 part of code:
{quote}Properties kafkaProps = (Properties) 
KafkaConfig.getInstance().getKafkaProps().clone();
 kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
this.getClass().getSimpleName());

StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
 bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


 EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);

ConnectorDescriptor kafkaConn = new Kafka().version("universal")
 .topic(this.topic)
 .startFromEarliest()
 .properties(kafkaProps);


 String jsonSchema = JsonUtils.getJsonSchema(SimplePeriodicPojo.class);
 jsonSchema = "{\"type\":\"object\","
 + "\"properties\":"
 + "\{\"PONRXPower\":{\"type\":\"integer\"},"
 + "\"actualTime\":\{\"type\": \"string\",\"format\": \"date-time\"},"
 + "\"deviceId\":\{\"type\":\"string\"}}}";
 FormatDescriptor jsonFormat = new Json().failOnMissingField(false)
 .jsonSchema(jsonSchema);
 Schema tableSchema = new Schema()
 .field("actualTime", DataTypes.TIMESTAMP(3))
 .rowtime(new 
Rowtime().timestampsFromField("actualTime").watermarksPeriodicAscending())
 .field("deviceId", DataTypes.STRING())
 .field("PONRXPower", DataTypes.BIGINT());


 bsTableEnv.connect(kafkaConn)
 .withFormat(jsonFormat)
 .withSchema(tableSchema)
 .inAppendMode()
 .createTemporaryTable("rxpower_detail");


 Table table2 = bsTableEnv.sqlQuery("select TUMBLE_ROWTIME(actualTime, INTERVAL 
'5' second) as win_end,"
 + " deviceId, count(deviceId) as lc from rxpower_detail "
 + " where PONRXPower< " + LOW_OPTICAL_POWER
 + " GROUP BY TUMBLE(actualTime, INTERVAL '5' second), deviceId ");
 DataStream<Tuple3<java.sql.Timestamp, String, Long>> resultStream = 
bsTableEnv.toAppendStream(table2,
 TypeInformation.of(new TypeHint<Tuple3<java.sql.Timestamp, String, Long>>() {
 }));


 resultStream.print();

try {
 bsTableEnv.execute("table api test");
 } catch (Exception e) {
 logger.error(e.getMessage(), e);
 }
{quote}
 

excetpions:
{quote}{{^Exception in thread "main" org.apache.flink.table.api.TableException: 
Window aggregate can only be defined over a time attribute column, but 
TIMESTAMP(3) encountered.Exception in thread "main" 
org.apache.flink.table.api.TableException: Window aggregate can only be defined 
over a time attribute column, but TIMESTAMP(3) encountered. at 
org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:51)
 at 
org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:79)
 at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
 at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560) at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419) at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256) 
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
 at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215) 
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
 at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) 
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
 at scala.collection.Iterator.foreach(Iterator.scala:937) at 
scala.collection.Iterator.foreach$(Iterator.scala:937) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at 
scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:69) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:94)
 at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
 at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
 at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)^}}
{quote}
 

 

 

 

 

 

  was:
part of pom:
{quote}
h4. <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-java</artifactId>
 <version>1.10.0</version>
</dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-java_2.12</artifactId>
 <version>1.10.0</version>
</dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_2.12</artifactId>
 <version>1.10.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-json</artifactId>
 <version>1.10.0</version>
 </dependency> <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-planner-blink_2.12</artifactId>
 <version>1.10.0</version>
 </dependency>
{quote}
 part of code:
{quote}{{StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();}}
{{ bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);}}

{{EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();}}
{{StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);}}

{{ConnectorDescriptor kafkaConn = new Kafka().version("universal")}}
{{ .topic(this.topic)}}
{{ .startFromEarliest()}}
{{ .properties(kafkaProps);}}

{{kafkaConn.toProperties().forEach((k, v) -> logger.info("'{}'='{}'", k, v));}}

{{String jsonSchema = }}{{"{\"type\":\"object\","}}
{{ + "\"properties\":"}}
{{ + "\{\"PONRXPower\":{\"type\":\"integer\"},"}}
{{ + "\"actualTime\":\{\"type\": \"string\",\"format\": \"date-time\"},"}}
{{ + "\"deviceId\":\{\"type\":\"string\"}}}";}}
{{ FormatDescriptor jsonFormat = new Json().failOnMissingField(false)}}
{{ .jsonSchema(jsonSchema);}}
{{ Schema tableSchema = new Schema()}}
{{ .field("actualTime", DataTypes.TIMESTAMP(3))}}
{{ .rowtime(new 
Rowtime().timestampsFromField("actualTime").watermarksPeriodicAscending())}}
{{ .field("deviceId", DataTypes.STRING())}}
{{ .field("PONRXPower", DataTypes.BIGINT());}}

{{bsTableEnv.connect(kafkaConn)}}
{{ .withFormat(jsonFormat)}}
{{ .withSchema(tableSchema)}}
{{ .inAppendMode()}}
{{ .createTemporaryTable("rxpower_detail");}}

{{Table table2 = bsTableEnv.sqlQuery("select TUMBLE_ROWTIME(actualTime, 
INTERVAL '5' second) as win_end,"}}
{{ + " deviceId, count(deviceId) as lc from rxpower_detail "}}
{{ + " where PONRXPower< " + LOW_OPTICAL_POWER}}
{{ + " GROUP BY TUMBLE(actualTime, INTERVAL '5' second), deviceId ");}}


{{ DataStream<Tuple3<java.sql.Timestamp, String, Long>> resultStream = 
bsTableEnv.toAppendStream(table2,}}
{{ TypeInformation.of(new TypeHint<Tuple3<java.sql.Timestamp, String, Long>>() 
{}}
{{ }));}}

{{resultStream.print();}}

{{try {}}
{{ bsTableEnv.execute("table api test");}}
{{ } catch (Exception e) {}}
{{ logger.error(e.getMessage(), e);}}
{{ }}}
{quote}
 

excetpions:
{quote}{{^Exception in thread "main" org.apache.flink.table.api.TableException: 
Window aggregate can only be defined over a time attribute column, but 
TIMESTAMP(3) encountered.Exception in thread "main" 
org.apache.flink.table.api.TableException: Window aggregate can only be defined 
over a time attribute column, but TIMESTAMP(3) encountered. at 
org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:51)
 at 
org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:79)
 at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
 at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560) at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419) at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256) 
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
 at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215) 
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
 at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) 
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
 at scala.collection.Iterator.foreach(Iterator.scala:937) at 
scala.collection.Iterator.foreach$(Iterator.scala:937) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at 
scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:69) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:94)
 at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
 at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
 at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)^}}
{quote}
 

 

 

 

 

 


> Schema.rowtime() method not working correctly, throws  'Window aggregate can 
> only be defined over a time attribute column' exception
> ------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17317
>                 URL: https://issues.apache.org/jira/browse/FLINK-17317
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.10.0
>            Reporter: bianhaihua
>            Priority: Major
>
> part of pom:
> {quote}
> h4. <dependency>
> <groupId>org.apache.flink</groupId>
>  <artifactId>flink-java</artifactId>
>  <version>1.10.0</version>
>  </dependency>
>  <dependency>
>  <groupId>org.apache.flink</groupId>
>  <artifactId>flink-streaming-java_2.12</artifactId>
>  <version>1.10.0</version>
>  </dependency>
>  <dependency>
>  <groupId>org.apache.flink</groupId>
>  <artifactId>flink-connector-kafka_2.12</artifactId>
>  <version>1.10.0</version>
>  </dependency>
>  <dependency>
>  <groupId>org.apache.flink</groupId>
>  <artifactId>flink-json</artifactId>
>  <version>1.10.0</version>
>  </dependency> <dependency>
>  <groupId>org.apache.flink</groupId>
>  <artifactId>flink-table-planner-blink_2.12</artifactId>
>  <version>1.10.0</version>
>  </dependency>
> {quote}
>  part of code:
> {quote}Properties kafkaProps = (Properties) 
> KafkaConfig.getInstance().getKafkaProps().clone();
>  kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
> this.getClass().getSimpleName());
> StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>  EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>  StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
> bsSettings);
> ConnectorDescriptor kafkaConn = new Kafka().version("universal")
>  .topic(this.topic)
>  .startFromEarliest()
>  .properties(kafkaProps);
>  String jsonSchema = JsonUtils.getJsonSchema(SimplePeriodicPojo.class);
>  jsonSchema = "{\"type\":\"object\","
>  + "\"properties\":"
>  + "\{\"PONRXPower\":{\"type\":\"integer\"},"
>  + "\"actualTime\":\{\"type\": \"string\",\"format\": \"date-time\"},"
>  + "\"deviceId\":\{\"type\":\"string\"}}}";
>  FormatDescriptor jsonFormat = new Json().failOnMissingField(false)
>  .jsonSchema(jsonSchema);
>  Schema tableSchema = new Schema()
>  .field("actualTime", DataTypes.TIMESTAMP(3))
>  .rowtime(new 
> Rowtime().timestampsFromField("actualTime").watermarksPeriodicAscending())
>  .field("deviceId", DataTypes.STRING())
>  .field("PONRXPower", DataTypes.BIGINT());
>  bsTableEnv.connect(kafkaConn)
>  .withFormat(jsonFormat)
>  .withSchema(tableSchema)
>  .inAppendMode()
>  .createTemporaryTable("rxpower_detail");
>  Table table2 = bsTableEnv.sqlQuery("select TUMBLE_ROWTIME(actualTime, 
> INTERVAL '5' second) as win_end,"
>  + " deviceId, count(deviceId) as lc from rxpower_detail "
>  + " where PONRXPower< " + LOW_OPTICAL_POWER
>  + " GROUP BY TUMBLE(actualTime, INTERVAL '5' second), deviceId ");
>  DataStream<Tuple3<java.sql.Timestamp, String, Long>> resultStream = 
> bsTableEnv.toAppendStream(table2,
>  TypeInformation.of(new TypeHint<Tuple3<java.sql.Timestamp, String, Long>>() {
>  }));
>  resultStream.print();
> try {
>  bsTableEnv.execute("table api test");
>  } catch (Exception e) {
>  logger.error(e.getMessage(), e);
>  }
> {quote}
>  
> excetpions:
> {quote}{{^Exception in thread "main" 
> org.apache.flink.table.api.TableException: Window aggregate can only be 
> defined over a time attribute column, but TIMESTAMP(3) encountered.Exception 
> in thread "main" org.apache.flink.table.api.TableException: Window aggregate 
> can only be defined over a time attribute column, but TIMESTAMP(3) 
> encountered. at 
> org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:51)
>  at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:79)
>  at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>  at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560) at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419) at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
>  at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>  at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215) at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>  at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>  at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>  at scala.collection.Iterator.foreach(Iterator.scala:937) at 
> scala.collection.Iterator.foreach$(Iterator.scala:937) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:69) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>  at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>  at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:94)
>  at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>  at 
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>  at 
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)^}}
> {quote}
>  
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to