[ https://issues.apache.org/jira/browse/FLINK-6862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mark You closed FLINK-6862. --------------------------- Resolution: Duplicate > Tumble window rowtime not resolve at logic plan validation > ---------------------------------------------------------- > > Key: FLINK-6862 > URL: https://issues.apache.org/jira/browse/FLINK-6862 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.3.0 > Reporter: Mark You > > Following code sample work in version 1.2.1, but failed at 1.3.0 > {code:title= TumblingWindow.java|borderStyle=solid} > public class TumblingWindow { > public static void main(String[] args) throws Exception { > List<Content> data = new ArrayList<Content>(); > data.add(new Content(1L, "Hi")); > data.add(new Content(2L, "Hallo")); > data.add(new Content(3L, "Hello")); > data.add(new Content(4L, "Hello")); > data.add(new Content(7L, "Hello")); > data.add(new Content(8L, "Hello world")); > data.add(new Content(16L, "Hello world")); > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > final StreamTableEnvironment tableEnv = > TableEnvironment.getTableEnvironment(env); > DataStream<Content> stream = env.fromCollection(data); > DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks( > new > BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) { > /** > * > */ > private static final long serialVersionUID = > 410512296011057717L; > @Override > public long extractTimestamp(Content element) { > return element.getRecordTime(); > } > }); > Table table = tableEnv.fromDataStream(stream2); > > table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w").select("w.start, > content.count"); > env.execute(); > } > public static class Content implements Serializable { > private long recordTime; > private String content; > public Content() { > super(); > } > public Content(long recordTime, String content) { > super(); > this.recordTime = recordTime; > this.content = content; > } > public long getRecordTime() { > return recordTime; > } > public void setRecordTime(long recordTime) { > this.recordTime = recordTime; > } > public String getContent() { > return content; > } > public void setContent(String content) { > this.content = content; > } > } > private class TimestampWithEqualWatermark implements > AssignerWithPunctuatedWatermarks<Object[]> { > /** > * > */ > private static final long serialVersionUID = 1L; > @Override > public long extractTimestamp(Object[] element, long > previousElementTimestamp) { > // TODO Auto-generated method stub > return (long) element[0]; > } > @Override > public Watermark checkAndGetNextWatermark(Object[] lastElement, long > extractedTimestamp) { > return new Watermark(extractedTimestamp); > } > } > } > {code} > Exception trace: > {noformat} > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Cannot resolve [rowtime] given input [content, recordTime]. > at > org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143) > at > org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:86) > at > org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:83) > at > org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72) > at > org.apache.flink.table.plan.logical.LogicalNode.org$apache$flink$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:119) > at > org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7$$anonfun$apply$1.apply(LogicalNode.scala:132) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:131) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:370) > at scala.collection.Iterator$class.foreach(Iterator.scala:742) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308) > at scala.collection.AbstractIterator.to(Iterator.scala:1194) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1194) > at > org.apache.flink.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:137) > at > org.apache.flink.table.plan.logical.LogicalNode.validate(LogicalNode.scala:83) > at > org.apache.flink.table.plan.logical.Project.validate(operators.scala:67) > at > org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1054) > at > org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1073) > at com.taiwanmobile.cep.noc.TumblingWindow.main(TumblingWindow.java:54) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)