[
https://issues.apache.org/jira/browse/FLINK-17317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
bianhaihua updated FLINK-17317:
-------------------------------
Description:
part of pom:
{quote}
h4. <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.10.0</version>
</dependency> <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.10.0</version>
</dependency>
{quote}
part of code:
{quote}Properties kafkaProps = (Properties)
KafkaConfig.getInstance().getKafkaProps().clone();
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG,
this.getClass().getSimpleName());
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
bsSettings);
ConnectorDescriptor kafkaConn = new Kafka().version("universal")
.topic(this.topic)
.startFromEarliest()
.properties(kafkaProps);
String jsonSchema = JsonUtils.getJsonSchema(SimplePeriodicPojo.class);
jsonSchema = "{\"type\":\"object\","
+ "\"properties\":"
+ "\{\"PONRXPower\":{\"type\":\"integer\"},"
+ "\"actualTime\":\{\"type\": \"string\",\"format\": \"date-time\"},"
+ "\"deviceId\":\{\"type\":\"string\"}}}";
FormatDescriptor jsonFormat = new Json().failOnMissingField(false)
.jsonSchema(jsonSchema);
Schema tableSchema = new Schema()
.field("actualTime", DataTypes.TIMESTAMP(3))
.rowtime(new
Rowtime().timestampsFromField("actualTime").watermarksPeriodicAscending())
.field("deviceId", DataTypes.STRING())
.field("PONRXPower", DataTypes.BIGINT());
bsTableEnv.connect(kafkaConn)
.withFormat(jsonFormat)
.withSchema(tableSchema)
.inAppendMode()
.createTemporaryTable("rxpower_detail");
Table table2 = bsTableEnv.sqlQuery("select TUMBLE_ROWTIME(actualTime, INTERVAL
'5' second) as win_end,"
+ " deviceId, count(deviceId) as lc from rxpower_detail "
+ " where PONRXPower< " + LOW_OPTICAL_POWER
+ " GROUP BY TUMBLE(actualTime, INTERVAL '5' second), deviceId ");
DataStream<Tuple3<java.sql.Timestamp, String, Long>> resultStream =
bsTableEnv.toAppendStream(table2,
TypeInformation.of(new TypeHint<Tuple3<java.sql.Timestamp, String, Long>>() {
}));
resultStream.print();
try {
bsTableEnv.execute("table api test");
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
{quote}
exceptions:
{quote}{{^Exception in thread "main" org.apache.flink.table.api.TableException:
Window aggregate can only be defined over a time attribute column, but
TIMESTAMP(3) encountered. Exception in thread "main"
org.apache.flink.table.api.TableException: Window aggregate can only be defined
over a time attribute column, but TIMESTAMP(3) encountered. at
org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:51)
at
org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:79)
at
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560) at
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419) at
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
at
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202) at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937) at
scala.collection.Iterator.foreach$(Iterator.scala:937) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at
scala.collection.IterableLike.foreach(IterableLike.scala:70) at
scala.collection.IterableLike.foreach$(IterableLike.scala:69) at
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:94)
at
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
at
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
at
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)^}}
{quote}
was:
part of pom:
{quote}
h4. <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.10.0</version>
</dependency> <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.10.0</version>
</dependency>
{quote}
part of code:
{quote}{{StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();}}
{{ bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);}}
{{EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();}}
{{StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
bsSettings);}}
{{ConnectorDescriptor kafkaConn = new Kafka().version("universal")}}
{{ .topic(this.topic)}}
{{ .startFromEarliest()}}
{{ .properties(kafkaProps);}}
{{kafkaConn.toProperties().forEach((k, v) -> logger.info("'{}'='{}'", k, v));}}
{{String jsonSchema = }}{{"{\"type\":\"object\","}}
{{ + "\"properties\":"}}
{{ + "\{\"PONRXPower\":{\"type\":\"integer\"},"}}
{{ + "\"actualTime\":\{\"type\": \"string\",\"format\": \"date-time\"},"}}
{{ + "\"deviceId\":\{\"type\":\"string\"}}}";}}
{{ FormatDescriptor jsonFormat = new Json().failOnMissingField(false)}}
{{ .jsonSchema(jsonSchema);}}
{{ Schema tableSchema = new Schema()}}
{{ .field("actualTime", DataTypes.TIMESTAMP(3))}}
{{ .rowtime(new
Rowtime().timestampsFromField("actualTime").watermarksPeriodicAscending())}}
{{ .field("deviceId", DataTypes.STRING())}}
{{ .field("PONRXPower", DataTypes.BIGINT());}}
{{bsTableEnv.connect(kafkaConn)}}
{{ .withFormat(jsonFormat)}}
{{ .withSchema(tableSchema)}}
{{ .inAppendMode()}}
{{ .createTemporaryTable("rxpower_detail");}}
{{Table table2 = bsTableEnv.sqlQuery("select TUMBLE_ROWTIME(actualTime,
INTERVAL '5' second) as win_end,"}}
{{ + " deviceId, count(deviceId) as lc from rxpower_detail "}}
{{ + " where PONRXPower< " + LOW_OPTICAL_POWER}}
{{ + " GROUP BY TUMBLE(actualTime, INTERVAL '5' second), deviceId ");}}
{{ DataStream<Tuple3<java.sql.Timestamp, String, Long>> resultStream =
bsTableEnv.toAppendStream(table2,}}
{{ TypeInformation.of(new TypeHint<Tuple3<java.sql.Timestamp, String, Long>>()
{}}
{{ }));}}
{{resultStream.print();}}
{{try {}}
{{ bsTableEnv.execute("table api test");}}
{{ } catch (Exception e) {}}
{{ logger.error(e.getMessage(), e);}}
{{ }}}
{quote}
exceptions:
{quote}{{^Exception in thread "main" org.apache.flink.table.api.TableException:
Window aggregate can only be defined over a time attribute column, but
TIMESTAMP(3) encountered.Exception in thread "main"
org.apache.flink.table.api.TableException: Window aggregate can only be defined
over a time attribute column, but TIMESTAMP(3) encountered. at
org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:51)
at
org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:79)
at
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560) at
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419) at
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
at
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202) at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937) at
scala.collection.Iterator.foreach$(Iterator.scala:937) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at
scala.collection.IterableLike.foreach(IterableLike.scala:70) at
scala.collection.IterableLike.foreach$(IterableLike.scala:69) at
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:94)
at
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
at
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
at
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)^}}
{quote}
> Schema.rowtime() method not working correctly, throws 'Window aggregate can
> only be defined over a time attribute column' exception
> ------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-17317
> URL: https://issues.apache.org/jira/browse/FLINK-17317
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Reporter: bianhaihua
> Priority: Major
>
> part of pom:
> {quote}
> h4. <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-java</artifactId>
> <version>1.10.0</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-java_2.12</artifactId>
> <version>1.10.0</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-kafka_2.12</artifactId>
> <version>1.10.0</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-json</artifactId>
> <version>1.10.0</version>
> </dependency> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-planner-blink_2.12</artifactId>
> <version>1.10.0</version>
> </dependency>
> {quote}
> part of code:
> {quote}Properties kafkaProps = (Properties)
> KafkaConfig.getInstance().getKafkaProps().clone();
> kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG,
> this.getClass().getSimpleName());
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
> bsSettings);
> ConnectorDescriptor kafkaConn = new Kafka().version("universal")
> .topic(this.topic)
> .startFromEarliest()
> .properties(kafkaProps);
> String jsonSchema = JsonUtils.getJsonSchema(SimplePeriodicPojo.class);
> jsonSchema = "{\"type\":\"object\","
> + "\"properties\":"
> + "\{\"PONRXPower\":{\"type\":\"integer\"},"
> + "\"actualTime\":\{\"type\": \"string\",\"format\": \"date-time\"},"
> + "\"deviceId\":\{\"type\":\"string\"}}}";
> FormatDescriptor jsonFormat = new Json().failOnMissingField(false)
> .jsonSchema(jsonSchema);
> Schema tableSchema = new Schema()
> .field("actualTime", DataTypes.TIMESTAMP(3))
> .rowtime(new
> Rowtime().timestampsFromField("actualTime").watermarksPeriodicAscending())
> .field("deviceId", DataTypes.STRING())
> .field("PONRXPower", DataTypes.BIGINT());
> bsTableEnv.connect(kafkaConn)
> .withFormat(jsonFormat)
> .withSchema(tableSchema)
> .inAppendMode()
> .createTemporaryTable("rxpower_detail");
> Table table2 = bsTableEnv.sqlQuery("select TUMBLE_ROWTIME(actualTime,
> INTERVAL '5' second) as win_end,"
> + " deviceId, count(deviceId) as lc from rxpower_detail "
> + " where PONRXPower< " + LOW_OPTICAL_POWER
> + " GROUP BY TUMBLE(actualTime, INTERVAL '5' second), deviceId ");
> DataStream<Tuple3<java.sql.Timestamp, String, Long>> resultStream =
> bsTableEnv.toAppendStream(table2,
> TypeInformation.of(new TypeHint<Tuple3<java.sql.Timestamp, String, Long>>() {
> }));
> resultStream.print();
> try {
> bsTableEnv.execute("table api test");
> } catch (Exception e) {
> logger.error(e.getMessage(), e);
> }
> {quote}
>
> exceptions:
> {quote}{{^Exception in thread "main"
> org.apache.flink.table.api.TableException: Window aggregate can only be
> defined over a time attribute column, but TIMESTAMP(3) encountered.Exception
> in thread "main" org.apache.flink.table.api.TableException: Window aggregate
> can only be defined over a time attribute column, but TIMESTAMP(3)
> encountered. at
> org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:51)
> at
> org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:79)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560) at
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419) at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215) at
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202) at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
> at
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
> at scala.collection.Iterator.foreach(Iterator.scala:937) at
> scala.collection.Iterator.foreach$(Iterator.scala:937) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at
> scala.collection.IterableLike.foreach(IterableLike.scala:70) at
> scala.collection.IterableLike.foreach$(IterableLike.scala:69) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:94)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)^}}
> {quote}
>
>
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)