Hi Deng Ziqi & Lin Wanni & Guo Yuanfang,

First of all, I wanted to let you know that I think the ticket you've created is one of the most extensive and complete tickets I've seen. Thank you very much for the effort on this!

Based on your input, I think this does indeed look like something that should be addressed. Perhaps other maintainers who are more familiar with this code can give a more in-depth answer.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82

On Fri, 4 Mar 2022 at 08:00, 邓子琦 <dzq3210...@gmail.com> wrote:

> I have created an issue on Jira:
> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-26334
>
> Issue
>
> Hello!
>
> While studying the Flink source code, we found a problem with the
> algorithm that calculates the window start time. When
> *timestamp - offset + windowSize < 0*, the element is incorrectly
> assigned to a window whose start time is later than the element's own
> timestamp.
>
> The problem is in
> *org.apache.flink.streaming.api.windowing.windows.TimeWindow*
>
>     public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>         return timestamp - (timestamp - offset + windowSize) % windowSize;
>     }
>
> We believe that this violates the constraint between an element's
> timestamp and its window. *That is, an element should fall within a
> window whose start time is less than or equal to its own timestamp and
> whose end time is greater than its own timestamp.* However, when
> *timestamp - offset + windowSize < 0*, *the element falls into a future
> time window.*
>
> *You can reproduce the bug with the code at the end of the post.*
>
> Solution
>
> In fact, the original algorithm works correctly in Python. The key to
> this problem is how the programming language handles the remainder
> operation: in Java, the result of % takes the sign of the dividend, so
> it can be negative.
>
> We propose changing it to the following algorithm.
>
>     public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>         return timestamp
>                 - (timestamp - offset) % windowSize
>                 - (windowSize & (timestamp - offset) >> 63);
>     }
>
> The term *windowSize & (timestamp - offset) >> 63* subtracts windowSize
> from the overall result when *timestamp - offset < 0* (the arithmetic
> shift yields -1, i.e. all bits set, for a negative value) and otherwise
> does nothing. This way we can handle both positive and negative
> timestamps.
>
> Finally, the element can be assigned to the correct window.
>
> This code passes the current unit tests.
>
> getWindowStartWithOffset methods in other packages
>
> We expected *getWindowStartWithOffset* to appear in several places. We
> searched for this method in the project and found that the problem of
> negative timestamps is already handled in *flink.table*.
>
> Below is their source code.
>
> *org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping*
>
>     private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>         long remainder = (timestamp - offset) % windowSize;
>         // handle both positive and negative cases
>         if (remainder < 0) {
>             return timestamp - (remainder + windowSize);
>         } else {
>             return timestamp - remainder;
>         }
>     }
>
> Can we make a pull request?
>
> If the community deems it necessary to revise this, we hope the task
> can be handed over to us. Our members are all students who have just
> graduated from school, and it would be a great encouragement for us to
> contribute code to Flink.
>
> Thank you so much!
>
> From Deng Ziqi & Lin Wanni & Guo Yuanfang
>
>
> ===========================================
> reproduce
>
> /* output
> WindowStart: -15000 ExactSize:1 (a,-17000)
> WindowStart: -10000 ExactSize:1 (b,-12000)
> WindowStart: -5000 ExactSize:2 (c,-7000)
> WindowStart: -5000 ExactSize:2 (d,-2000)
> WindowStart: 0 ExactSize:1 (e,3000)
> WindowStart: 5000 ExactSize:1 (f,8000)
> WindowStart: 10000 ExactSize:1 (g,13000)
> WindowStart: 15000 ExactSize:1 (h,18000)
> */
> import java.util.TimeZone;
>
> import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class Example {
>     public static void main(String[] args) throws Exception {
>
>         final TimeZone timeZone = TimeZone.getTimeZone("GMT+0");
>         TimeZone.setDefault(timeZone);
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env
>                 .setParallelism(1)
>                 // elements a-d carry negative event timestamps (in milliseconds)
>                 .fromElements(
>                         Tuple2.of("a", -17 * 1000L),
>                         Tuple2.of("b", -12 * 1000L),
>                         Tuple2.of("c", -7 * 1000L),
>                         Tuple2.of("d", -2 * 1000L),
>                         Tuple2.of("e", 3 * 1000L),
>                         Tuple2.of("f", 8 * 1000L),
>                         Tuple2.of("g", 13 * 1000L),
>                         Tuple2.of("h", 18 * 1000L)
>                 )
>                 .assignTimestampsAndWatermarks(
>                         WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
>                                 .withTimestampAssigner(
>                                         new SerializableTimestampAssigner<Tuple2<String, Long>>() {
>                                             @Override
>                                             public long extractTimestamp(Tuple2<String, Long> element, long l) {
>                                                 return element.f1;
>                                             }
>                                         }
>                                 )
>                 )
>                 .keyBy(r -> 1)
>                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>                 .process(
>                         new ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>() {
>                             @Override
>                             public void process(Integer integer,
>                                     ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>.Context context,
>                                     Iterable<Tuple2<String, Long>> elements,
>                                     Collector<String> out) throws Exception {
>                                 // print the assigned window start next to each element
>                                 for (Tuple2<String, Long> element : elements) {
>                                     out.collect("WindowStart: " + context.window().getStart()
>                                             + "\tExactSize:" + elements.spliterator().getExactSizeIfKnown() + "\t"
>                                             + element
>                                     );
>                                 }
>                             }
>                         }
>                 )
>                 .print();
>         env.execute();
>     }
> }
>
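
For anyone who wants to check the arithmetic without starting a Flink job, the three formulas quoted in this thread can be compared in plain Java. This is only a minimal sketch: the class name WindowStartCheck and the method names current, bitwise, branching and contains are hypothetical and not part of Flink; only the three return expressions are taken from the message above. It assumes a tumbling window covers the half-open range [start, start + windowSize).

```java
// Standalone comparison of the three window-start formulas quoted in this thread.
// Class and method names are hypothetical; only the arithmetic comes from the email.
final class WindowStartCheck {

    // Formula quoted from TimeWindow in the email.
    static long current(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Bitwise formula proposed in the email.
    static long bitwise(long timestamp, long offset, long windowSize) {
        return timestamp
                - (timestamp - offset) % windowSize
                - (windowSize & (timestamp - offset) >> 63);
    }

    // Branching formula quoted from WindowsGrouping in the email.
    static long branching(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // Java's % keeps the sign of the dividend, so shift negative remainders up.
        return remainder < 0 ? timestamp - (remainder + windowSize) : timestamp - remainder;
    }

    // Assumption: a window covers [start, start + windowSize).
    static boolean contains(long start, long windowSize, long timestamp) {
        return start <= timestamp && timestamp < start + windowSize;
    }

    public static void main(String[] args) {
        long size = 5000L;
        // -5000 is an exact negative multiple of the window size.
        long[] timestamps = {-17000L, -12000L, -7000L, -5000L, -2000L, 3000L};
        for (long ts : timestamps) {
            long c = current(ts, 0L, size);
            long b = bitwise(ts, 0L, size);
            long r = branching(ts, 0L, size);
            System.out.printf(
                    "ts=%d current=%d(%b) bitwise=%d(%b) branching=%d(%b)%n",
                    ts, c, contains(c, size, ts), b, contains(b, size, ts), r, contains(r, size, ts));
        }
    }
}
```

On Java 11 or later this can be run directly with `java WindowStartCheck.java`. Under the half-open window assumption, the bitwise and branching variants agree on the timestamps from the reproducer, but they appear to differ when timestamp - offset is a negative exact multiple of windowSize (for example timestamp -5000 with size 5000), so that case may be worth covering with a unit test before a pull request.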