[ https://issues.apache.org/jira/browse/FLINK-26334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

realdengziqi updated FLINK-26334:
---------------------------------
     Attachment: image-2022-03-04-11-28-26-616.png
    Description: 
h2. Issue

Hello!

While studying the Flink source code, we found a problem with the algorithm that calculates the window start time. When timestamp - offset + windowSize < 0 (and is not an exact multiple of windowSize), the element is assigned to the wrong window: the computed window start is one windowSize too late, so it is greater than the element's own timestamp.

The problem is in _org.apache.flink.streaming.api.windowing.windows.TimeWindow_:

 
{code:java}
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
} {code}
_!image-2022-03-04-11-28-26-616.png|width=738,height=261!_

 

  was:
 

Source code:
{code:java}
/**
 * Method to get the window start for a timestamp.
 *
 * @param timestamp epoch millisecond to get the window start.
 * @param offset The offset which window start would be shifted by.
 * @param windowSize The size of the generated windows.
 * @return window start
 */
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
} {code}
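If windowSize is 5 seconds (with offset 0), an element with timestamp -7000L should be assigned to the window starting at -10000L (-10 * 1000L). This code instead assigns it to the window starting at -5000L, which begins after the element's own timestamp: in Java, (-7000 + 5000) % 5000 evaluates to -2000, so the result is -7000 - (-2000) = -5000.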
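A minimal sketch that reproduces this arithmetic in plain Java. The expression in {{currentStart}} is copied from {{getWindowStartWithOffset}}; the class name and the {{expectedStart}} reference (floor division via {{Math.floorDiv}}) are hypothetical helpers for illustration only.

{code:java}
public class WindowStartRepro {

    // Same expression as TimeWindow.getWindowStartWithOffset (copied from the issue).
    static long currentStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Reference start: the largest (offset + k * windowSize) that is <= timestamp,
    // computed with floor division (rounding toward -infinity).
    static long expectedStart(long timestamp, long offset, long windowSize) {
        return offset + Math.floorDiv(timestamp - offset, windowSize) * windowSize;
    }

    public static void main(String[] args) {
        long ts = -7000L, offset = 0L, size = 5000L;
        // Java's % keeps the sign of the dividend.
        System.out.println((ts - offset + size) % size);      // -2000
        System.out.println(currentStart(ts, offset, size));   // -5000 (later than ts)
        System.out.println(expectedStart(ts, offset, size));  // -10000
    }
}
{code}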

With the current calculation, whenever (timestamp - offset + windowSize) is less than 0 and not an exact multiple of windowSize, the computed window start is shifted by exactly one windowSize toward +∞ (that is, toward 0 for negative timestamps with offset 0).
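The condition above can be checked by brute force. This is a sketch under stated assumptions: it scans a hypothetical grid of timestamps and offsets (step 250 ms, |offset| < windowSize, windowSize a multiple of 250), and compares the copied formula against a floor-division reference. It throws if the formula is wrong anywhere else, or off by anything other than +windowSize where the condition holds.

{code:java}
public class WindowStartProperty {

    // Same expression as TimeWindow.getWindowStartWithOffset (copied from the issue).
    static long currentStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Reference start: largest offset + k * windowSize that is <= timestamp.
    static long expectedStart(long timestamp, long offset, long windowSize) {
        return offset + Math.floorDiv(timestamp - offset, windowSize) * windowSize;
    }

    // Returns the number of (timestamp, offset) pairs checked.
    static int check(long windowSize) {
        int checked = 0;
        for (long offset = -windowSize + 250; offset < windowSize; offset += 250) {
            for (long ts = -5 * windowSize; ts <= 5 * windowSize; ts += 250) {
                long x = ts - offset + windowSize;
                // Predicted: wrong exactly when x < 0 and x is not a multiple of windowSize.
                boolean predictedWrong = x < 0 && x % windowSize != 0;
                long expectedDiff = predictedWrong ? windowSize : 0L;
                long diff = currentStart(ts, offset, windowSize) - expectedStart(ts, offset, windowSize);
                if (diff != expectedDiff) {
                    throw new AssertionError("ts=" + ts + ", offset=" + offset + ", diff=" + diff);
                }
                checked++;
            }
        }
        return checked;
    }

    public static void main(String[] args) {
        System.out.println(check(5000L) + " cases match the predicted condition");
    }
}
{code}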

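After discussing it with a friend, I think the cause is that Java's % operator truncates toward 0 (the remainder takes the sign of the dividend), so the current logic effectively rounds toward 0. The window start should instead be obtained by rounding toward -∞.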
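A minimal sketch of rounding toward -∞, assuming {{java.lang.Math.floorMod}} (Java 8+) is an acceptable replacement for %. The class and method names are hypothetical, and this is not necessarily how the issue should or will be fixed upstream.

{code:java}
public class WindowStartFix {

    // Current formula from TimeWindow: % truncates toward 0 for negative dividends.
    static long currentStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Hypothetical fix: for a positive windowSize, Math.floorMod returns a value in
    // [0, windowSize), which corresponds to rounding the quotient toward -infinity.
    static long fixedStart(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    public static void main(String[] args) {
        long size = 6000L; // Time.seconds(6), as in the sample program
        long[] timestamps = {-7000L, -1000L, 1000L, 7000L};
        for (long ts : timestamps) {
            System.out.println(ts + " -> current " + currentStart(ts, 0L, size)
                    + ", fixed " + fixedStart(ts, 0L, size));
        }
    }
}
{code}

For the sample program's timestamps, only -7000 differs: the current formula yields -6000, while the floorMod version yields -12000. The "+ windowSize" term in the original only protects the case where timestamp - offset lies in (-windowSize, 0); floorMod handles a dividend of any sign.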

Do you think this is a bug? If so, we would like to submit a pull request on GitHub to fix it.

Below is a sample program using a tumbling event-time window (6-second windows, offset 0).
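{code:java}
import java.sql.Timestamp;

// Assumed: fastjson's JSONObject; the issue does not say which JSON library was used.
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class Test01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env
                .fromElements(
                        Tuple2.of("a", -7L * 1000L),    // start time should be -12s
                        Tuple2.of("b", -1L * 1000L),    // start time should be -6s
                        Tuple2.of("c", 1L * 1000L),     // start time should be 0s
                        Tuple2.of("d", 7L * 1000L)      // start time should be 6s
                )
                // Use the second tuple field (epoch milliseconds) as the event time.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                                return element.f1;
                                            }
                                        }
                                )
                )
                // Put all elements under a single key.
                .keyBy(r -> 1)
                // 6-second tumbling event-time windows (offset 0).
                .window(TumblingEventTimeWindows.of(Time.seconds(6)))
                .process(
                        new ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>() {
                            @Override
                            public void process(Integer key, Context context,
                                    Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                                // Emit each element together with the bounds of the window it was assigned to.
                                for (Tuple2<String, Long> element : elements) {
                                    JSONObject item = new JSONObject();
                                    item.put("data", element.toString());
                                    item.put("windowStartTime", new Timestamp(context.window().getStart()).toString());
                                    item.put("windowEndTime", new Timestamp(context.window().getEnd()).toString());
                                    out.collect(item.toJSONString());
                                }
                            }
                        }
                )
                .print();
        env.execute();
    }
} {code}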
 

        Summary: When timestamp - offset + windowSize < 0, elements cannot be assigned to the correct window (was: when (timestamp - offset + windowSize) is less than 0, the calculation result of TimeWindow.getWindowStartWithOffset is incorrect)

> When timestamp - offset + windowSize < 0, elements cannot be assigned to the correct window
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26334
>                 URL: https://issues.apache.org/jira/browse/FLINK-26334
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.15.0, 1.14.3
>         Environment: flink version 1.14.3
>            Reporter: realdengziqi
>            Priority: Major
>         Attachments: image-2022-03-04-11-28-26-616.png
>
>   Original Estimate: 16h
>  Remaining Estimate: 16h



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
