[
https://issues.apache.org/jira/browse/FLINK-26334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17501258#comment-17501258
]
Terry Wang commented on FLINK-26334:
------------------------------------
Good insight in the problem description and the cause analysis. (y)
> When timestamp - offset + windowSize < 0, elements cannot be assigned to the
> correct window
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-26334
> URL: https://issues.apache.org/jira/browse/FLINK-26334
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.15.0, 1.14.3
> Environment: flink version 1.14.3
> Reporter: realdengziqi
> Assignee: realdengziqi
> Priority: Critical
> Attachments: image-2022-03-04-11-28-26-616.png,
> image-2022-03-04-11-37-10-035.png
>
> Original Estimate: 3h
> Remaining Estimate: 3h
>
> h2. Issue
> Hello!
> While studying the Flink source code, we found a problem in the
> algorithm that calculates the window start time. When
> _timestamp - offset + windowSize < 0_, the element is incorrectly
> assigned to a window whose start time is later than its own timestamp.
> The problem is in
> _org.apache.flink.streaming.api.windowing.windows.TimeWindow_
> {code:java}
> public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     return timestamp - (timestamp - offset + windowSize) % windowSize;
> }
> {code}
> _!image-2022-03-04-11-28-26-616.png|width=710,height=251!_
> We believe this violates the relationship between an element's
> timestamp and its window. *An element should fall within a window whose
> start time is no later than its own timestamp and whose end time is later
> than its own timestamp.* However, when {_}timestamp - offset +
> windowSize < 0{_}, *the element falls into a future time window.*
> *You can reproduce the bug with the code at the end of the post.*
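The failure can be checked by hand with a few of the timestamps from the reproduce program. The following is a minimal standalone sketch, assuming offset 0 and a 5-second window. The class name CurrentFormulaDemo and the containment check are made up for illustration; only the formula itself is copied from the issue.

```java
// Hypothetical demo class; only the formula is taken from the issue text.
class CurrentFormulaDemo {
    // The window-start formula quoted in the issue.
    static long currentWindowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long windowSize = 5000L;
        long[] timestamps = {-17000L, -12000L, -7000L, -2000L, 3000L};
        for (long ts : timestamps) {
            long start = currentWindowStart(ts, 0L, windowSize);
            // A window [start, start + windowSize) should contain ts.
            boolean contains = start <= ts && ts < start + windowSize;
            System.out.println(ts + " -> start " + start + ", contains element: " + contains);
        }
    }
}
```

For -17000, -12000 and -7000 the computed start is later than the timestamp, which matches the window starts listed in the output of the reproduce program.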
> h2. Solution
> The original formula is correct in Python; the key to this problem is
> how each programming language defines the remainder operation. Java's _%_
> returns a result with the sign of the dividend, whereas Python's takes the
> sign of the divisor.
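To make the language difference concrete: Java's % truncates toward zero, while Math.floorMod rounds toward negative infinity, which is how Python's % behaves for a positive divisor. The sketch below only illustrates that difference. The class RemainderDemo and the helper floorModWindowStart are hypothetical and are not the fix proposed in this issue.

```java
// Hypothetical demo class illustrating truncated vs. floored remainder.
class RemainderDemo {
    // Assumed alternative for illustration only: floored remainder,
    // i.e. what the original formula computes under Python semantics.
    static long floorModWindowStart(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    public static void main(String[] args) {
        long ts = -17000L;
        long windowSize = 5000L;
        // Truncated remainder: the result takes the sign of the dividend.
        System.out.println("ts % windowSize         = " + (ts % windowSize));
        // Floored remainder: the result takes the sign of the divisor.
        System.out.println("floorMod(ts, windowSize) = " + Math.floorMod(ts, windowSize));
        System.out.println("floorMod window start    = " + floorModWindowStart(ts, 0L, windowSize));
    }
}
```

With the floored remainder, -17000 maps to the window starting at -20000, which does contain the element.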
> We propose changing it to the following algorithm.
> {code:java}
> public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     return timestamp
>             - (timestamp - offset) % windowSize
>             - (windowSize & (timestamp - offset) >> 63);
> }
> {code}
> The term _windowSize & (timestamp - offset) >> 63_ evaluates to
> windowSize when {_}timestamp - offset < 0{_} and to 0 otherwise (in Java,
> >> binds more tightly than &), so windowSize is subtracted from the result
> only in the negative case. This way we can handle both positive and
> negative timestamps.
> Finally, the element can be assigned to the correct window.
> !image-2022-03-04-11-37-10-035.png|width=712,height=284!
> This code passes the current unit tests.
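One case that may be worth checking before a pull request is a negative timestamp - offset that is an exact multiple of windowSize. There the remainder is 0 but the sign mask is still set, so the bit-mask formula and a version that branches on the sign of the remainder can disagree. The sketch below compares them under the assumption of offset 0 and a 5-second window. The class and method names are hypothetical, and maskOnRemainderWindowStart is an assumed variant for illustration, not code from Flink.

```java
// Hypothetical comparison class; names are made up for illustration.
class WindowStartEdgeCase {
    // Formula proposed in this issue: mask taken from the sign of (timestamp - offset).
    static long bitMaskWindowStart(long timestamp, long offset, long windowSize) {
        return timestamp
                - (timestamp - offset) % windowSize
                - (windowSize & (timestamp - offset) >> 63);
    }

    // Version that branches on the sign of the remainder.
    static long branchWindowStart(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        return remainder < 0 ? timestamp - (remainder + windowSize) : timestamp - remainder;
    }

    // Assumed variant: same bit trick, but the mask comes from the sign of the remainder.
    static long maskOnRemainderWindowStart(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        return timestamp - remainder - (windowSize & remainder >> 63);
    }

    public static void main(String[] args) {
        long windowSize = 5000L;
        long[] timestamps = {-17000L, -10000L, -5000L, -2000L, 0L, 3000L};
        for (long ts : timestamps) {
            System.out.println(ts
                    + " bitMask=" + bitMaskWindowStart(ts, 0L, windowSize)
                    + " branch=" + branchWindowStart(ts, 0L, windowSize)
                    + " maskOnRemainder=" + maskOnRemainderWindowStart(ts, 0L, windowSize));
        }
    }
}
```

For timestamp -5000 the bit-mask formula returns -10000, while the other two return -5000, so the window [-10000, -5000) chosen by the former does not contain the element. For timestamps that are not exact negative multiples of the window size, all three agree.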
> h2. getWindowStartWithOffset methods in other packages
> We expected {_}getWindowStartWithOffset{_} to be implemented in several
> places. We searched the project for this method and found that negative
> timestamps are already handled in _flink.table_.
> Below is the source code.
>
> _{{org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping}}_
> {code:java}
> private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     long remainder = (timestamp - offset) % windowSize;
>     // handle both positive and negative cases
>     if (remainder < 0) {
>         return timestamp - (remainder + windowSize);
>     } else {
>         return timestamp - remainder;
>     }
> }
> {code}
> h2. Can we make a pull request?
> If the community considers the change necessary, we hope this task can
> be assigned to us. Our members are all recent graduates, and contributing
> code to Flink would be a great encouragement for us.
> Thank you so much!
> From Deng Ziqi & Lin Wanni & Guo Yuanfang
>
> ----
> h2. Reproduce
> {code:java}
> /* Output with the current implementation (a, b and c are assigned to
>    windows that start after their own timestamps):
> WindowStart: -15000 ExactSize:1 (a,-17000)
> WindowStart: -10000 ExactSize:1 (b,-12000)
> WindowStart: -5000 ExactSize:2 (c,-7000)
> WindowStart: -5000 ExactSize:2 (d,-2000)
> WindowStart: 0 ExactSize:1 (e,3000)
> WindowStart: 5000 ExactSize:1 (f,8000)
> WindowStart: 10000 ExactSize:1 (g,13000)
> WindowStart: 15000 ExactSize:1 (h,18000)
> */
> import java.util.TimeZone;
>
> import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class Example {
>     public static void main(String[] args) throws Exception {
>         final TimeZone timeZone = TimeZone.getTimeZone("GMT+0");
>         TimeZone.setDefault(timeZone);
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env
>                 .setParallelism(1)
>                 // Elements a..d carry negative event timestamps.
>                 .fromElements(
>                         Tuple2.of("a", -17 * 1000L),
>                         Tuple2.of("b", -12 * 1000L),
>                         Tuple2.of("c", -7 * 1000L),
>                         Tuple2.of("d", -2 * 1000L),
>                         Tuple2.of("e", 3 * 1000L),
>                         Tuple2.of("f", 8 * 1000L),
>                         Tuple2.of("g", 13 * 1000L),
>                         Tuple2.of("h", 18 * 1000L))
>                 .assignTimestampsAndWatermarks(
>                         WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
>                                 .withTimestampAssigner(
>                                         new SerializableTimestampAssigner<Tuple2<String, Long>>() {
>                                             @Override
>                                             public long extractTimestamp(
>                                                     Tuple2<String, Long> element, long l) {
>                                                 return element.f1;
>                                             }
>                                         }))
>                 .keyBy(r -> 1)
>                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>                 .process(
>                         new ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>() {
>                             @Override
>                             public void process(
>                                     Integer key,
>                                     Context context,
>                                     Iterable<Tuple2<String, Long>> elements,
>                                     Collector<String> out) throws Exception {
>                                 // Emit one line per element with its window start.
>                                 for (Tuple2<String, Long> element : elements) {
>                                     out.collect("WindowStart: " + context.window().getStart()
>                                             + "\tExactSize:"
>                                             + elements.spliterator().getExactSizeIfKnown()
>                                             + "\t" + element);
>                                 }
>                             }
>                         })
>                 .print();
>         env.execute();
>     }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)