[https://issues.apache.org/jira/browse/FLINK-26334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500621#comment-17500621]
Martijn Visser commented on FLINK-26334:
----------------------------------------
[~realdengziqi] Thanks for reporting this. I have reduced the priority because
I don't think this should block a Flink release. I'm hoping that some of the
maintainers can give more insight into this, but since we're getting close to
the new Flink release, most of them are busy with testing, so it might take a
bit longer.
> when (timestamp - offset + windowSize) is less than 0, the calculation
> result of TimeWindow.getWindowStartWithOffset is incorrect
> ------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-26334
> URL: https://issues.apache.org/jira/browse/FLINK-26334
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.15.0, 1.14.3
> Environment: Flink version 1.14.3
> Reporter: realdengziqi
> Priority: Major
> Original Estimate: 16h
> Remaining Estimate: 16h
>
>
> Source code of TimeWindow.getWindowStartWithOffset:
> {code:java}
> /**
>  * Method to get the window start for a timestamp.
>  *
>  * @param timestamp epoch millisecond to get the window start.
>  * @param offset The offset which window start would be shifted by.
>  * @param windowSize The size of the generated windows.
>  * @return window start
>  */
> public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     return timestamp - (timestamp - offset + windowSize) % windowSize;
> }
> {code}
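> If windowSize is 5 seconds and offset is 0, an element with a timestamp of
> -7000L should be assigned to the window starting at -10000L (-10 * 1000L). This
> code instead assigns it to the window starting at -5000L: in Java,
> (-7000 + 5000) % 5000 evaluates to -2000, so the result is -7000 - (-2000) = -5000.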
> More generally, with the current calculation, whenever (timestamp - offset +
> windowSize) is less than 0 and timestamp - offset is not an exact multiple of
> windowSize, the calculated window start is shifted by one windowSize toward 0.
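The arithmetic in the example can be reproduced outside Flink with a small standalone class. It copies the quoted formula into a local method (the class name CurrentWindowStart is hypothetical and nothing is imported from Flink), so it only illustrates the integer arithmetic, not Flink's window assignment itself.

```java
// Reproduces the arithmetic of the quoted TimeWindow.getWindowStartWithOffset
// formula. The class name is hypothetical; the formula is copied, not imported.
final class CurrentWindowStart {

    // Same expression as the implementation quoted in the issue.
    static long currentStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        // Java's % keeps the sign of the dividend, i.e. it truncates toward zero.
        System.out.println(-2000L % 5000L);                   // -2000
        // The issue's example: 5 s windows, timestamp -7000 ms, offset 0.
        System.out.println(currentStart(-7000L, 0L, 5000L));  // -5000 (expected -10000)
        // An exact multiple of windowSize is unaffected because the remainder is 0.
        System.out.println(currentStart(-10000L, 0L, 5000L)); // -10000
        // When timestamp - offset + windowSize >= 0 the result is still correct.
        System.out.println(currentStart(-2000L, 0L, 5000L));  // -5000
    }
}
```

Only the second line of output is wrong; the other cases show why the problem appears only when (timestamp - offset + windowSize) is negative and not aligned.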
> After discussing this with a friend, we think the cause is that the current
> calculation rounds toward 0 (Java's % operator truncates toward zero), whereas
> it should round toward -∞.
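One way to round toward -∞ is to replace % with Math.floorMod, which for a positive divisor always returns a value in [0, windowSize). The sketch below assumes the intended result is the largest window start that is <= timestamp and congruent to offset modulo windowSize, with windowSize > 0 and no long overflow. It is a hypothetical illustration, not necessarily how Flink resolves this issue; the class and method names are made up, and referenceStart is a brute-force check used only for comparison.

```java
// Hypothetical round-toward-negative-infinity variant of the window start
// calculation, checked against a brute-force definition.
final class FloorWindowStart {

    // Math.floorMod(x, y) has the sign of y, so for windowSize > 0 the
    // remainder is in [0, windowSize) even when timestamp - offset is negative.
    static long floorStart(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    // Brute-force reference: walk from offset in windowSize steps until
    // start <= timestamp < start + windowSize.
    static long referenceStart(long timestamp, long offset, long windowSize) {
        long start = offset;
        while (start > timestamp) {
            start -= windowSize;
        }
        while (start + windowSize <= timestamp) {
            start += windowSize;
        }
        return start;
    }

    public static void main(String[] args) {
        // The issue's example: 5 s windows, timestamp -7000 ms, offset 0.
        System.out.println(floorStart(-7000L, 0L, 5000L)); // -10000
        // Compare with the reference for several offsets and signed timestamps.
        for (long offset : new long[] {0L, 1000L, -2000L}) {
            for (long ts = -20000L; ts <= 20000L; ts += 250L) {
                if (floorStart(ts, offset, 5000L) != referenceStart(ts, offset, 5000L)) {
                    throw new IllegalStateException("mismatch at ts=" + ts + ", offset=" + offset);
                }
            }
        }
        System.out.println("floorStart matches the reference for all sampled inputs");
    }
}
```

For non-negative timestamp - offset, Math.floorMod and % agree, so this variant only changes results for the negative cases described above.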
> Do you think this is a bug? If so, we would like to submit a pull request on
> GitHub to fix it.
> Below is a sample program that uses a tumbling window.
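> {code:java}
> import com.alibaba.fastjson.JSONObject; // assumption: fastjson; the original imports are not shown
> import java.sql.Timestamp;
> import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class Test01 {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         env
>             .fromElements(
>                 Tuple2.of("a", -7L * 1000L), // start time should be -12s (current code gives -6s)
>                 Tuple2.of("b", -1L * 1000L),
>                 Tuple2.of("c", 1L * 1000L),
>                 Tuple2.of("d", 7L * 1000L)
>             )
>             // Use the second tuple field (milliseconds) as the event timestamp.
>             .assignTimestampsAndWatermarks(
>                 WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
>                     .withTimestampAssigner(
>                         new SerializableTimestampAssigner<Tuple2<String, Long>>() {
>                             @Override
>                             public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
>                                 return element.f1;
>                             }
>                         }
>                     )
>             )
>             // Put every element under the same key so all windows appear in one stream.
>             .keyBy(r -> 1)
>             .window(TumblingEventTimeWindows.of(Time.seconds(6)))
>             .process(
>                 new ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>() {
>                     @Override
>                     public void process(Integer key, Context context,
>                                         Iterable<Tuple2<String, Long>> elements,
>                                         Collector<String> out) throws Exception {
>                         // Emit each element together with the bounds of its assigned window.
>                         for (Tuple2<String, Long> element : elements) {
>                             JSONObject item = new JSONObject();
>                             item.put("data", element.toString());
>                             item.put("windowStartTime", new Timestamp(context.window().getStart()).toString());
>                             item.put("windowEndTime", new Timestamp(context.window().getEnd()).toString());
>                             out.collect(item.toJSONString());
>                         }
>                     }
>                 }
>             )
>             .print();
>         env.execute();
>     }
> }
> {code}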
>
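To see which windows the sample program's four elements should land in without running Flink, the window starts can be computed directly. This sketch assumes offset 0, which is what TumblingEventTimeWindows.of(Time.seconds(6)) uses, and copies both the current formula and the hypothetical Math.floorMod-based variant into local methods; the class and method names are hypothetical.

```java
// Computes the 6-second tumbling window start for each element of the sample
// program, using the current formula and a hypothetical floor-based variant.
final class SampleProgramWindows {

    // Formula quoted from TimeWindow.getWindowStartWithOffset.
    static long currentStart(long ts, long offset, long size) {
        return ts - (ts - offset + size) % size;
    }

    // Hypothetical variant that rounds toward negative infinity.
    static long floorStart(long ts, long offset, long size) {
        return ts - Math.floorMod(ts - offset, size);
    }

    public static void main(String[] args) {
        String[] names = {"a", "b", "c", "d"};
        long[] timestamps = {-7000L, -1000L, 1000L, 7000L};
        long size = 6000L; // Time.seconds(6) in milliseconds
        for (int i = 0; i < names.length; i++) {
            long ts = timestamps[i];
            System.out.printf("%s ts=%d current=%d floor=%d%n",
                    names[i], ts, currentStart(ts, 0L, size), floorStart(ts, 0L, size));
        }
    }
}
```

Running it prints:

```
a ts=-7000 current=-6000 floor=-12000
b ts=-1000 current=-6000 floor=-6000
c ts=1000 current=0 floor=0
d ts=7000 current=6000 floor=6000
```

With the current formula, a and b both fall into the window [-6000, 0); with the floor-based variant, a moves to [-12000, -6000), matching the comment in the sample program.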
--
This message was sent by Atlassian Jira
(v8.20.1#820001)