[ 
https://issues.apache.org/jira/browse/FLINK-17317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bianhaihua updated FLINK-17317:
-------------------------------
    Description: 
part of pom:
{quote}
<dependency>

<groupId>org.apache.flink</groupId>
 <artifactId>flink-java</artifactId>
 <version>1.10.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-java_2.12</artifactId>
 <version>1.10.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_2.12</artifactId>
 <version>1.10.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-json</artifactId>
 <version>1.10.0</version>
 </dependency> <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-planner-blink_2.12</artifactId>
 <version>1.10.0</version>
 </dependency>
{quote}
 part of code:
{quote}Properties kafkaProps = (Properties) 
KafkaConfig.getInstance().getKafkaProps().clone();
 kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
this.getClass().getSimpleName());

StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
 bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);

ConnectorDescriptor kafkaConn = new Kafka().version("universal")
 .topic(this.topic)
 .startFromEarliest()
 .properties(kafkaProps);

String jsonSchema = JsonUtils.getJsonSchema(SimplePeriodicPojo.class);
 jsonSchema = "{\"type\":\"object\","
 + "\"properties\":"
 + "{\"PONRXPower\":{\"type\":\"string\"},"
 + "\"actualTime\":{\"type\": \"string\",\"format\": \"date-time\"},"
 + "\"deviceId\":{\"type\":\"string\"}}}";
 FormatDescriptor jsonFormat = new Json().failOnMissingField(false)
 .jsonSchema(jsonSchema);
 Schema tableSchema = new Schema()
 .field("actualTime", DataTypes.TIMESTAMP(3))
 .rowtime(new 
Rowtime().timestampsFromField("actualTime").watermarksPeriodicAscending())
 .field("deviceId", DataTypes.STRING())
 .field("PONRXPower", DataTypes.BIGINT());

bsTableEnv.connect(kafkaConn)
 .withFormat(jsonFormat)
 .withSchema(tableSchema)
 .inAppendMode()
 .createTemporaryTable("rxpower_detail");

Table table2 = bsTableEnv.sqlQuery("select TUMBLE_ROWTIME(actualTime, INTERVAL 
'5' second) as win_end,"
 + " deviceId, count(deviceId) as lc from rxpower_detail "
 + " where PONRXPower< " + LOW_OPTICAL_POWER
 + " GROUP BY TUMBLE(actualTime, INTERVAL '5' second), deviceId ");
 DataStream<Tuple3<java.sql.Timestamp, String, Long>> resultStream = 
bsTableEnv.toAppendStream(table2,
 TypeInformation.of(new TypeHint<Tuple3<java.sql.Timestamp, String, Long>>() {
 }));

resultStream.print();

try { bsTableEnv.execute("table api test"); }
 catch (Exception e) { logger.error(e.getMessage(), e); }
{quote}
 

exceptions:
{quote}{{^Exception in thread "main" org.apache.flink.table.api.TableException: 
Window aggregate can only be defined over a time attribute column, but 
TIMESTAMP(3) encountered.Exception in thread "main" 
org.apache.flink.table.api.TableException: Window aggregate can only be defined 
over a time attribute column, but TIMESTAMP(3) encountered. at 
org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:51)
 at 
org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:79)
 at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
 at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560) at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419) at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256) 
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
 at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215) 
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
 at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) 
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
 at scala.collection.Iterator.foreach(Iterator.scala:937) at 
scala.collection.Iterator.foreach$(Iterator.scala:937) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at 
scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:69) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:94)
 at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
 at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
 at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)^}}
{quote}
 

 

 

 

 

 

  was:
part of pom:
{quote}
<dependency>

<groupId>org.apache.flink</groupId>
 <artifactId>flink-java</artifactId>
 <version>1.10.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-java_2.12</artifactId>
 <version>1.10.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_2.12</artifactId>
 <version>1.10.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-json</artifactId>
 <version>1.10.0</version>
 </dependency> <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-planner-blink_2.12</artifactId>
 <version>1.10.0</version>
 </dependency>
{quote}
 part of code:
{quote}Properties kafkaProps = (Properties) 
KafkaConfig.getInstance().getKafkaProps().clone();
 kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
this.getClass().getSimpleName());

StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
 bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);

ConnectorDescriptor kafkaConn = new Kafka().version("universal")
 .topic(this.topic)
 .startFromEarliest()
 .properties(kafkaProps);

String jsonSchema = JsonUtils.getJsonSchema(SimplePeriodicPojo.class);
 jsonSchema = "{\"type\":\"object\","
 + "\"properties\":"
 + "{\"PONRXPower\":"+"{\"type\":\"integer\"},"
 + "\"actualTime\":{\"type\": \"string\",\"format\": \"date-time\"},"
 + "\"deviceId\":{\"type\":\"string\"}}}";
 FormatDescriptor jsonFormat = new Json().failOnMissingField(false)
 .jsonSchema(jsonSchema);
 Schema tableSchema = new Schema()
 .field("actualTime", DataTypes.TIMESTAMP(3))
 .rowtime(new 
Rowtime().timestampsFromField("actualTime").watermarksPeriodicAscending())
 .field("deviceId", DataTypes.STRING())
 .field("PONRXPower", DataTypes.BIGINT());

bsTableEnv.connect(kafkaConn)
 .withFormat(jsonFormat)
 .withSchema(tableSchema)
 .inAppendMode()
 .createTemporaryTable("rxpower_detail");

Table table2 = bsTableEnv.sqlQuery("select TUMBLE_ROWTIME(actualTime, INTERVAL 
'5' second) as win_end,"
 + " deviceId, count(deviceId) as lc from rxpower_detail "
 + " where PONRXPower< " + LOW_OPTICAL_POWER
 + " GROUP BY TUMBLE(actualTime, INTERVAL '5' second), deviceId ");
 DataStream<Tuple3<java.sql.Timestamp, String, Long>> resultStream = 
bsTableEnv.toAppendStream(table2,
 TypeInformation.of(new TypeHint<Tuple3<java.sql.Timestamp, String, Long>>() {
 }));

resultStream.print();

try { bsTableEnv.execute("table api test"); }
 catch (Exception e) { logger.error(e.getMessage(), e); }
{quote}
 

exceptions:
{quote}{{^Exception in thread "main" org.apache.flink.table.api.TableException: 
Window aggregate can only be defined over a time attribute column, but 
TIMESTAMP(3) encountered.Exception in thread "main" 
org.apache.flink.table.api.TableException: Window aggregate can only be defined 
over a time attribute column, but TIMESTAMP(3) encountered. at 
org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:51)
 at 
org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:79)
 at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
 at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560) at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419) at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256) 
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
 at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215) 
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
 at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) 
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
 at scala.collection.Iterator.foreach(Iterator.scala:937) at 
scala.collection.Iterator.foreach$(Iterator.scala:937) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at 
scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:69) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at 
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
 at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:94)
 at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
 at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
 at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)^}}
{quote}
 

 

 

 

 

 


> Schema.rowtime() method not working correctly, throws  'Window aggregate can 
> only be defined over a time attribute column' exception
> ------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17317
>                 URL: https://issues.apache.org/jira/browse/FLINK-17317
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.10.0
>            Reporter: bianhaihua
>            Priority: Major
>
> part of pom:
> {quote}
> <dependency>
> <groupId>org.apache.flink</groupId>
>  <artifactId>flink-java</artifactId>
>  <version>1.10.0</version>
>  </dependency>
>  <dependency>
>  <groupId>org.apache.flink</groupId>
>  <artifactId>flink-streaming-java_2.12</artifactId>
>  <version>1.10.0</version>
>  </dependency>
>  <dependency>
>  <groupId>org.apache.flink</groupId>
>  <artifactId>flink-connector-kafka_2.12</artifactId>
>  <version>1.10.0</version>
>  </dependency>
>  <dependency>
>  <groupId>org.apache.flink</groupId>
>  <artifactId>flink-json</artifactId>
>  <version>1.10.0</version>
>  </dependency> <dependency>
>  <groupId>org.apache.flink</groupId>
>  <artifactId>flink-table-planner-blink_2.12</artifactId>
>  <version>1.10.0</version>
>  </dependency>
> {quote}
>  part of code:
> {quote}Properties kafkaProps = (Properties) 
> KafkaConfig.getInstance().getKafkaProps().clone();
>  kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
> this.getClass().getSimpleName());
> StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>  StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
> bsSettings);
> ConnectorDescriptor kafkaConn = new Kafka().version("universal")
>  .topic(this.topic)
>  .startFromEarliest()
>  .properties(kafkaProps);
> String jsonSchema = JsonUtils.getJsonSchema(SimplePeriodicPojo.class);
>  jsonSchema = "{\"type\":\"object\","
>  + "\"properties\":"
>  + "{\"PONRXPower\":{\"type\":\"string\"},"
>  + "\"actualTime\":{\"type\": \"string\",\"format\": \"date-time\"},"
>  + "\"deviceId\":{\"type\":\"string\"}}}";
>  FormatDescriptor jsonFormat = new Json().failOnMissingField(false)
>  .jsonSchema(jsonSchema);
>  Schema tableSchema = new Schema()
>  .field("actualTime", DataTypes.TIMESTAMP(3))
>  .rowtime(new 
> Rowtime().timestampsFromField("actualTime").watermarksPeriodicAscending())
>  .field("deviceId", DataTypes.STRING())
>  .field("PONRXPower", DataTypes.BIGINT());
> bsTableEnv.connect(kafkaConn)
>  .withFormat(jsonFormat)
>  .withSchema(tableSchema)
>  .inAppendMode()
>  .createTemporaryTable("rxpower_detail");
> Table table2 = bsTableEnv.sqlQuery("select TUMBLE_ROWTIME(actualTime, 
> INTERVAL '5' second) as win_end,"
>  + " deviceId, count(deviceId) as lc from rxpower_detail "
>  + " where PONRXPower< " + LOW_OPTICAL_POWER
>  + " GROUP BY TUMBLE(actualTime, INTERVAL '5' second), deviceId ");
>  DataStream<Tuple3<java.sql.Timestamp, String, Long>> resultStream = 
> bsTableEnv.toAppendStream(table2,
>  TypeInformation.of(new TypeHint<Tuple3<java.sql.Timestamp, String, Long>>() {
>  }));
> resultStream.print();
> try { bsTableEnv.execute("table api test"); }
>  catch (Exception e) { logger.error(e.getMessage(), e); }
> {quote}
>  
> exceptions:
> {quote}{{^Exception in thread "main" 
> org.apache.flink.table.api.TableException: Window aggregate can only be 
> defined over a time attribute column, but TIMESTAMP(3) encountered.Exception 
> in thread "main" org.apache.flink.table.api.TableException: Window aggregate 
> can only be defined over a time attribute column, but TIMESTAMP(3) 
> encountered. at 
> org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:51)
>  at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:79)
>  at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>  at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560) at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419) at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
>  at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>  at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215) at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>  at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>  at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>  at scala.collection.Iterator.foreach(Iterator.scala:937) at 
> scala.collection.Iterator.foreach$(Iterator.scala:937) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:69) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>  at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>  at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:94)
>  at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>  at 
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>  at 
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)^}}
> {quote}
>  
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to