[ https://issues.apache.org/jira/browse/FLINK-26334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17501490#comment-17501490 ]
realdengziqi commented on FLINK-26334:
--------------------------------------

[~Terry1897] Thank you, we will continue to work hard.

> When timestamp - offset + windowSize < 0, elements cannot be assigned to the correct window
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26334
>                 URL: https://issues.apache.org/jira/browse/FLINK-26334
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.15.0, 1.14.3
>         Environment: flink version 1.14.3
>            Reporter: realdengziqi
>            Assignee: realdengziqi
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: image-2022-03-04-11-28-26-616.png, image-2022-03-04-11-37-10-035.png
>
>   Original Estimate: 3h
>  Remaining Estimate: 3h
>
> h2. issue
> Hello!
> While studying the Flink source code, we found a problem in the algorithm
> that calculates the window start time. When
> _timestamp - offset + windowSize < 0_, the element is incorrectly assigned
> to a window whose start time is larger than its own timestamp.
> The problem is in
> _org.apache.flink.streaming.api.windowing.windows.TimeWindow_
> {code:java}
> public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     return timestamp - (timestamp - offset + windowSize) % windowSize;
> }
> {code}
> _!image-2022-03-04-11-28-26-616.png|width=710,height=251!_
> We believe that this violates the constraint between time and window.
> *That is, an element should fall within a window whose start time is no
> later than its own timestamp and whose end time is greater than its own
> timestamp.* However, when {_}timestamp - offset + windowSize < 0{_},
> *the element falls into a future time window.*
> *You can reproduce the bug with the code at the end of the post.*
> h2. Solution
> In fact, the original algorithm works correctly in Python; the root cause
> is how each programming language defines the remainder operation for
> negative operands.
> We propose changing it to the following algorithm.
> {code:java}
> public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     return timestamp
>             - (timestamp - offset) % windowSize
>             - (windowSize & (timestamp - offset) >> 63);
> }
> {code}
> The term _windowSize & (timestamp - offset) >> 63_ subtracts windowSize
> from the overall result when {_}timestamp - offset < 0{_}, and otherwise
> does nothing. This way we can handle both positive and negative timestamps.
> Finally, the element can be assigned to the correct window.
> !image-2022-03-04-11-37-10-035.png|width=712,height=284!
> This code passes the current unit tests.
> h2. getWindowStartWithOffset methods in other packages
> We expected {_}getWindowStartWithOffset{_} to be implemented in several
> places. Searching the project for this method, we found that
> _flink.table_ already handles negative timestamps. Below is its source code.
>
> _{{org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping}}_
> {code:java}
> private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     long remainder = (timestamp - offset) % windowSize;
>     // handle both positive and negative cases
>     if (remainder < 0) {
>         return timestamp - (remainder + windowSize);
>     } else {
>         return timestamp - remainder;
>     }
> }
> {code}
> h2. Can we make a pull request?
> If the community deems a fix necessary, we hope this task can be handed
> over to us. Our members are all students who have just graduated from
> school, and contributing code to Flink would be a great encouragement
> for us.
> Thank you so much!
> From Deng Ziqi & Lin Wanni & Guo Yuanfang
>
> ----
> ----
> h2. reproduce
> {code:java}
> import java.util.TimeZone;
>
> import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> /* Output with the current implementation:
> WindowStart: -15000 ExactSize:1 (a,-17000)
> WindowStart: -10000 ExactSize:1 (b,-12000)
> WindowStart: -5000 ExactSize:2 (c,-7000)
> WindowStart: -5000 ExactSize:2 (d,-2000)
> WindowStart: 0 ExactSize:1 (e,3000)
> WindowStart: 5000 ExactSize:1 (f,8000)
> WindowStart: 10000 ExactSize:1 (g,13000)
> WindowStart: 15000 ExactSize:1 (h,18000)
> */
> public class Example {
>     public static void main(String[] args) throws Exception {
>         final TimeZone timeZone = TimeZone.getTimeZone("GMT+0");
>         TimeZone.setDefault(timeZone);
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env
>             .setParallelism(1)
>             // Elements carry their event timestamp (in ms) in field f1.
>             .fromElements(
>                 Tuple2.of("a", -17 * 1000L),
>                 Tuple2.of("b", -12 * 1000L),
>                 Tuple2.of("c", -7 * 1000L),
>                 Tuple2.of("d", -2 * 1000L),
>                 Tuple2.of("e", 3 * 1000L),
>                 Tuple2.of("f", 8 * 1000L),
>                 Tuple2.of("g", 13 * 1000L),
>                 Tuple2.of("h", 18 * 1000L)
>             )
>             .assignTimestampsAndWatermarks(
>                 WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
>                     .withTimestampAssigner(
>                         new SerializableTimestampAssigner<Tuple2<String, Long>>() {
>                             @Override
>                             public long extractTimestamp(Tuple2<String, Long> element, long l) {
>                                 return element.f1;
>                             }
>                         }
>                     )
>             )
>             // Route every element to the same key so that all share one set of windows.
>             .keyBy(r -> 1)
>             .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>             .process(
>                 new ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>() {
>                     @Override
>                     public void process(Integer integer, Context context,
>                             Iterable<Tuple2<String, Long>> elements,
>                             Collector<String> out) throws Exception {
>                         // Print each element together with the start of its assigned window.
>                         for (Tuple2<String, Long> element : elements) {
>                             out.collect("WindowStart: " + context.window().getStart()
>                                 + "\tExactSize:" + elements.spliterator().getExactSizeIfKnown() + "\t"
>                                 + element
>                             );
>                         }
>                     }
>                 }
>             )
>             .print();
>         env.execute();
>     }
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
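
The formulas discussed in this issue can be compared side by side without starting a Flink job. The following is a minimal sketch, not Flink code: the class name WindowStartSketch and its method names are hypothetical, and the floorMod variant is an alternative that assumes java.lang.Math.floorMod is acceptable here; it is not something the issue itself proposes. It may also be worth checking timestamps that are exact negative multiples of the window size (for example -5000 with a 5000 ms window), because there the remainder is 0 while the sign bit of timestamp - offset is still set.

```java
// Sketch only: the class and method names are hypothetical, not Flink API.
final class WindowStartSketch {

    // Formula quoted in the issue as the current TimeWindow implementation.
    static long original(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Bit-trick variant proposed in the issue: (x >> 63) is -1 for negative x, else 0.
    static long bitTrick(long timestamp, long offset, long windowSize) {
        return timestamp
                - (timestamp - offset) % windowSize
                - (windowSize & (timestamp - offset) >> 63);
    }

    // Branching variant quoted in the issue from WindowsGrouping.
    static long branching(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        return remainder < 0 ? timestamp - (remainder + windowSize) : timestamp - remainder;
    }

    // Assumed alternative: Math.floorMod returns a result with the sign of the divisor.
    static long floorMod(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    // A tumbling window [start, start + windowSize) should contain the timestamp.
    static boolean contains(long start, long windowSize, long timestamp) {
        return start <= timestamp && timestamp < start + windowSize;
    }

    public static void main(String[] args) {
        long size = 5000L;
        long[] timestamps = {-17000L, -7000L, -5000L, -2000L, 0L, 3000L};
        for (long ts : timestamps) {
            System.out.printf(
                    "ts=%d original=%d bitTrick=%d branching=%d floorMod=%d%n",
                    ts,
                    original(ts, 0L, size),
                    bitTrick(ts, 0L, size),
                    branching(ts, 0L, size),
                    floorMod(ts, 0L, size));
        }
    }
}
```

Running main prints one line per timestamp; passing each computed start to contains shows which variants keep the element inside its window.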