[ https://issues.apache.org/jira/browse/FLINK-26334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
realdengziqi updated FLINK-26334:
---------------------------------
    Affects Version/s: 1.15.0

> when the (timestamp - offset + windowSize) is less than 0 the calculation result of TimeWindow.getWindowSTartWithOffset is incorrect
> ------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26334
>                 URL: https://issues.apache.org/jira/browse/FLINK-26334
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.15.0, 1.14.3
>         Environment: flink version 1.14.3
>            Reporter: realdengziqi
>            Priority: Major
>   Original Estimate: 16h
>  Remaining Estimate: 16h
>
>
> source code
> {code:java}
> //Method to get the window start for a timestamp.
> //Params:
> //timestamp – epoch millisecond to get the window start.
> //offset – The offset which window start would be shifted by.
> //windowSize – The size of the generated windows.
> //Returns:
> //window start
> public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     return timestamp - (timestamp - offset + windowSize) % windowSize;
> } {code}
> If windowSize is 6 seconds, an element with a timestamp of -7000L should be
> assigned to the window with a start time of -12000L, but this code assigns
> it to the window whose start time is -6000L.
> With the current calculation, when (timestamp - offset + windowSize) is
> less than 0, the computed window start is shifted by one windowSize toward 0.
> I discussed this with a friend, and we think the cause is that the current
> calculation logic rounds toward 0. It should round toward -∞.
> Do you think this is a bug? We would like to submit a pull request on GitHub
> to fix it.
> Below is a sample program for a tumbling window.
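> {code:java}
> // Imports added for completeness. JSONObject is assumed to be fastjson's,
> // since the program calls toJSONString(); the report does not say.
> import com.alibaba.fastjson.JSONObject;
> import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> import java.sql.Timestamp;
>
> public class Test01 {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         env
>                 .fromElements(
>                         Tuple2.of("a",-7L*1000L), // start time should be -12s
>                         Tuple2.of("b",-1L*1000L),
>                         Tuple2.of("c",1L*1000L),
>                         Tuple2.of("d",7L*1000L)
>                 )
>                 .assignTimestampsAndWatermarks(
>                         WatermarkStrategy.<Tuple2<String,Long>>forMonotonousTimestamps()
>                                 .withTimestampAssigner(
>                                         new SerializableTimestampAssigner<Tuple2<String, Long>>() {
>                                             @Override
>                                             public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
>                                                 // The second tuple field carries the event time in milliseconds.
>                                                 return element.f1;
>                                             }
>                                         }
>                                 )
>                 )
>                 .keyBy(r->1)
>                 .window(TumblingEventTimeWindows.of(Time.seconds(6)))
>                 .process(
>                         new ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>() {
>                             @Override
>                             public void process(Integer integer, ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
>                                 // Emit each element together with the bounds of the window it was assigned to.
>                                 for (Tuple2<String, Long> element : elements) {
>                                     JSONObject item = new JSONObject();
>                                     item.put("data",element.toString());
>                                     item.put("windowStartTime",new Timestamp(context.window().getStart()).toString() );
>                                     item.put("windowEndTime",new Timestamp(context.window().getEnd()).toString() );
>                                     out.collect(item.toJSONString());
>                                 }
>                             }
>                         }
>                 )
>                 .print();
>         env.execute();
>     }
> } {code}
>

--
This message was sent by Atlassian Jira
(v8.20.1#820001)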
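
The rounding behaviour described in the report can also be checked without running a Flink job. The following is a minimal sketch, not the project's fix: `currentWindowStart` copies the formula quoted in the report, while the class name `WindowStartDemo` and the method `flooredWindowStart` are hypothetical names introduced here. The sketch assumes that a floor-based modulo (`Math.floorMod`) is one reasonable way to make the result round toward -∞; whether the eventual patch takes that form is not stated in the report.

```java
class WindowStartDemo {

    // Formula as quoted in the report. Java's % takes the sign of the dividend,
    // so a negative (timestamp - offset + windowSize) yields a negative remainder.
    static long currentWindowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Hypothetical alternative: Math.floorMod returns a remainder in [0, windowSize)
    // for a positive windowSize, so the window start rounds toward negative infinity.
    static long flooredWindowStart(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    public static void main(String[] args) {
        long windowSize = 6000L; // 6-second tumbling window, as in the report
        long offset = 0L;
        long[] timestamps = {-7000L, -1000L, 1000L, 7000L};

        for (long ts : timestamps) {
            System.out.println(
                    "timestamp=" + ts
                            + " current=" + currentWindowStart(ts, offset, windowSize)
                            + " floored=" + flooredWindowStart(ts, offset, windowSize));
        }
    }
}
```

For the four timestamps from the sample program, the two methods disagree only at -7000L, where the quoted formula returns -6000 and the floor-based variant returns -12000. This matches the report: the discrepancy appears only when (timestamp - offset + windowSize) is negative.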