Hi Deng Ziqi & Lin Wanni & Guo Yuanfang,

First of all, I wanted to let you know that I think the ticket that you've
created is one of the most extensive and complete tickets I've seen. Thank
you very much for the effort on this!

Based on your input I think it indeed looks like this should be addressed.
Perhaps there are other maintainers who are more familiar with this code
who can give a more in-depth answer.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82


On Fri, 4 Mar 2022 at 08:00, 邓子琦 <dzq3210...@gmail.com> wrote:

> I have created an issue on Jira:
> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-26334
>
> Issue
>
>         Hello!
>
>         When we were studying the Flink source code, we found a problem in
> the algorithm that calculates the window start time. When *timestamp -
> offset + windowSize < 0*, the element is incorrectly assigned to a window
> whose start time is later than the element's own timestamp.
>
>         The problem is in
> *org.apache.flink.streaming.api.windowing.windows.TimeWindow*
>
> public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     return timestamp - (timestamp - offset + windowSize) % windowSize;
> }
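
To make the failure concrete, here is a minimal standalone sketch that copies the formula above into a hypothetical class, `LegacyWindowStart`, and checks whether the computed window actually covers the timestamp. The class name and the `contains` helper are illustrative and not part of Flink; the sketch assumes a 5-second window with zero offset, as in the reproducer.

```java
final class LegacyWindowStart {
    // Same arithmetic as the TimeWindow method quoted above.
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Hypothetical helper: true if start <= timestamp < start + windowSize.
    static boolean contains(long start, long windowSize, long timestamp) {
        return start <= timestamp && timestamp < start + windowSize;
    }

    public static void main(String[] args) {
        long size = 5000L;
        for (long ts : new long[] {-17000L, -7000L, -2000L, 3000L}) {
            long start = getWindowStartWithOffset(ts, 0L, size);
            System.out.println(
                    ts + " -> start " + start + ", contained: " + contains(start, size, ts));
        }
    }
}
```

With these inputs, -17000 and -7000 are mapped to windows starting at -15000 and -5000, neither of which contains the timestamp, which matches the reproducer output in the quoted message.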
>
>         We believe that this violates the constraint between time and
> window. *That is, an element should fall within a window whose start time
> is no later than its own timestamp and whose end time is later than its
> own timestamp.* However, currently, when *timestamp - offset + windowSize
> < 0*, *the element falls into a future time window.*
>
>        *You can reproduce the bug with the code at the end of the post.*
>
> Solution
>
>         In fact, the original algorithm works correctly in Python. The key
> to this problem is how the programming language handles the remainder
> operation: Java's % takes the sign of the dividend, whereas Python's takes
> the sign of the divisor.
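
The difference in remainder semantics can be seen directly in Java. The sketch below contrasts `%`, which truncates toward zero, with `Math.floorMod`, whose result takes the sign of the divisor as Python's `%` does. Using `floorMod` to compute the window start is only an illustration of the point and not the fix proposed in this thread; `RemainderDemo` and `floorWindowStart` are made-up names.

```java
final class RemainderDemo {
    // Illustrative alternative: floor-style remainder is never negative
    // for a positive windowSize, so no sign correction is needed.
    static long floorWindowStart(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    public static void main(String[] args) {
        // Truncating remainder keeps the sign of the dividend.
        System.out.println(-12000L % 5000L);
        // Floor remainder keeps the sign of the divisor.
        System.out.println(Math.floorMod(-12000L, 5000L));
        // Window start for a negative timestamp, 5-second window, zero offset.
        System.out.println(floorWindowStart(-17000L, 0L, 5000L));
    }
}
```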
>
>         We think it should be changed to the following algorithm.
>
> public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     return timestamp
>             - (timestamp - offset) % windowSize
>             - (windowSize & (timestamp - offset) >> 63);
> }
>
>         The expression *windowSize & (timestamp - offset) >> 63* subtracts
> windowSize from the overall result when *timestamp - offset < 0* and does
> nothing otherwise. This way we can handle both positive and negative
> timestamps.
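
A small sketch may help in reading the bit trick. In Java, `>>` binds tighter than `&`, so the expression parses as `windowSize & ((timestamp - offset) >> 63)`. The arithmetic shift yields -1 (all bits set) for a negative difference and 0 otherwise, so the mask selects either `windowSize` or 0. The class name `MaskedWindowStart` and the `signMask` helper are illustrative. One case that seems worth checking before adopting this form is a negative `timestamp - offset` that is an exact multiple of `windowSize`: the remainder is 0, but the mask still subtracts `windowSize`.

```java
final class MaskedWindowStart {
    // -1 (all bits set) when diff is negative, 0 otherwise.
    static long signMask(long diff) {
        return diff >> 63;
    }

    // Same arithmetic as the proposed method quoted above.
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp
                - (timestamp - offset) % windowSize
                - (windowSize & (timestamp - offset) >> 63);
    }

    public static void main(String[] args) {
        long size = 5000L;
        for (long ts : new long[] {-17000L, -5000L, 3000L}) {
            long start = getWindowStartWithOffset(ts, 0L, size);
            boolean contained = start <= ts && ts < start + size;
            System.out.println(ts + " -> start " + start + ", contained: " + contained);
        }
    }
}
```

Under these assumptions, -17000 maps to -20000 as intended, while -5000 maps to -10000, a window that ends exactly at the timestamp and therefore does not contain it. A branch on `remainder < 0`, as in the table runtime code quoted later in this message, does not have this edge case.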
>
>         Finally, the element can be assigned to the correct window.
>
>         This code passes the current unit tests.
>
> getWindowStartWithOffset methods in other packages
>
>         We expected *getWindowStartWithOffset* to be implemented in several
> places, so we searched the project for this method and found that negative
> timestamps are already handled in *flink.table*.
>
>         Below is their source code.
>
>
> *org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping*
>
> private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     long remainder = (timestamp - offset) % windowSize;
>     // handle both positive and negative cases
>     if (remainder < 0) {
>         return timestamp - (remainder + windowSize);
>     } else {
>         return timestamp - remainder;
>     }
> }
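
As a quick sanity check, the branch-based version above can be exercised against the eight timestamps from the reproducer plus one negative exact multiple of the window size. This is a standalone sketch: the class name `BranchWindowStart` is made up, the method is copied as a static for convenience, and a 5-second window with zero offset is assumed.

```java
final class BranchWindowStart {
    // Same logic as the WindowsGrouping method quoted above.
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // A negative remainder means the truncated result points one window too late.
        if (remainder < 0) {
            return timestamp - (remainder + windowSize);
        } else {
            return timestamp - remainder;
        }
    }

    public static void main(String[] args) {
        long size = 5000L;
        long[] timestamps = {
            -17000L, -12000L, -7000L, -5000L, -2000L, 3000L, 8000L, 13000L, 18000L
        };
        for (long ts : timestamps) {
            long start = getWindowStartWithOffset(ts, 0L, size);
            // Invariant from the issue: start <= timestamp < start + windowSize.
            if (!(start <= ts && ts < start + size)) {
                throw new IllegalStateException("timestamp " + ts + " not in window " + start);
            }
            System.out.println(ts + " -> " + start);
        }
    }
}
```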
>
> Can we make a pull request?
>
>         If the community considers the change necessary, we hope this task
> can be assigned to us. We are all recent graduates, and contributing code
> to Flink would be a great encouragement for us.
>
>         Thank you so much!
>
>         From Deng Ziqi & Lin Wanni & Guo Yuanfang
>
>
>  ===========================================
> reproduce
>
> /* output
> WindowStart: -15000    ExactSize:1    (a,-17000)
> WindowStart: -10000    ExactSize:1    (b,-12000)
> WindowStart: -5000 ExactSize:2    (c,-7000)
> WindowStart: -5000 ExactSize:2    (d,-2000)
> WindowStart: 0 ExactSize:1    (e,3000)
> WindowStart: 5000  ExactSize:1    (f,8000)
> WindowStart: 10000 ExactSize:1    (g,13000)
> WindowStart: 15000 ExactSize:1    (h,18000)
>  */
> import java.util.TimeZone;
>
> import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class Example {
>     public static void main(String[] args) throws Exception {
>
>         final TimeZone timeZone = TimeZone.getTimeZone("GMT+0");
>         TimeZone.setDefault(timeZone);
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env
>                 .setParallelism(1)
>                 // Elements are (name, event timestamp in ms); four are negative.
>                 .fromElements(
>                         Tuple2.of("a", -17 * 1000L),
>                         Tuple2.of("b", -12 * 1000L),
>                         Tuple2.of("c", -7 * 1000L),
>                         Tuple2.of("d", -2 * 1000L),
>                         Tuple2.of("e", 3 * 1000L),
>                         Tuple2.of("f", 8 * 1000L),
>                         Tuple2.of("g", 13 * 1000L),
>                         Tuple2.of("h", 18 * 1000L)
>                 )
>                 .assignTimestampsAndWatermarks(
>                         WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
>                                 .withTimestampAssigner(
>                                         new SerializableTimestampAssigner<Tuple2<String, Long>>() {
>                                             @Override
>                                             public long extractTimestamp(
>                                                     Tuple2<String, Long> element, long l) {
>                                                 return element.f1;
>                                             }
>                                         }
>                                 )
>                 )
>                 .keyBy(r -> 1)
>                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>                 .process(
>                         new ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>() {
>                             @Override
>                             public void process(
>                                     Integer integer,
>                                     Context context,
>                                     Iterable<Tuple2<String, Long>> elements,
>                                     Collector<String> out) throws Exception {
>                                 // Emit one line per element with its assigned window start.
>                                 for (Tuple2<String, Long> element : elements) {
>                                     out.collect("WindowStart: " + context.window().getStart()
>                                             + "\tExactSize:"
>                                             + elements.spliterator().getExactSizeIfKnown() + "\t"
>                                             + element
>                                     );
>                                 }
>                             }
>                         }
>                 )
>                 .print();
>         env.execute();
>     }
> }
>
