[ https://issues.apache.org/jira/browse/FLINK-26334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
realdengziqi updated FLINK-26334:
---------------------------------
Description:
Source code of TimeWindow.getWindowStartWithOffset:
{code:java}
/**
 * Method to get the window start for a timestamp.
 *
 * @param timestamp epoch millisecond to get the window start.
 * @param offset The offset which window start would be shifted by.
 * @param windowSize The size of the generated windows.
 * @return window start
 */
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}
{code}
If windowSize is 6 seconds (and offset is 0), an element with a timestamp of
-7000L should be assigned to the window starting at -12000L, but this code
assigns it to the window starting at -6000L.
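The arithmetic can be reproduced without a Flink job. The sketch below copies the expression from getWindowStartWithOffset into a standalone class (WindowStartArithmetic and currentStart are hypothetical names used only for illustration) and prints the intermediate values for timestamp = -7000L, offset = 0 and windowSize = 6000L. Java's % operator gives its result the sign of the dividend, so the remainder comes out as -1000 instead of 5000.

{code:java}
public class WindowStartArithmetic {

    // Same expression as TimeWindow.getWindowStartWithOffset (copied for illustration).
    static long currentStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long timestamp = -7000L;
        long offset = 0L;
        long windowSize = 6000L;

        long shifted = timestamp - offset + windowSize; // -1000
        long remainder = shifted % windowSize;          // -1000: % keeps the sign of the dividend
        System.out.println("shifted   = " + shifted);
        System.out.println("remainder = " + remainder);
        // timestamp - remainder = -7000 - (-1000) = -6000, not -12000
        System.out.println("start     = " + currentStart(timestamp, offset, windowSize));
    }
}
{code}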
With the current calculation, whenever (timestamp - offset + windowSize) is
less than 0 (and not an exact multiple of windowSize), the calculated window
start is shifted by one windowSize unit toward 0.
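This condition can be checked by brute force. The sketch below is only an illustration under assumed parameters (windowSize = 6000, offset = 1000, timestamps from -30000 to 30000 ms; all class and method names are hypothetical). It compares the formula's result with the start of the window that actually contains each timestamp, computed as offset + Math.floorDiv(timestamp - offset, windowSize) * windowSize, and counts the timestamps where "the result is wrong" disagrees with the prediction "(timestamp - offset + windowSize) is negative and not an exact multiple of windowSize".

{code:java}
public class WindowStartSweep {

    // Expression copied from TimeWindow.getWindowStartWithOffset.
    static long currentStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Start of the window that contains the timestamp: offset + k * windowSize,
    // with k = (timestamp - offset) / windowSize rounded toward -infinity.
    static long containingStart(long timestamp, long offset, long windowSize) {
        return offset + Math.floorDiv(timestamp - offset, windowSize) * windowSize;
    }

    // Predicted failure condition: the shifted value is negative and not a multiple of windowSize.
    static boolean predictedWrong(long timestamp, long offset, long windowSize) {
        long shifted = timestamp - offset + windowSize;
        return shifted < 0 && shifted % windowSize != 0;
    }

    // Number of timestamps in [from, to] where the prediction does not match the observed result.
    static int countMismatchesWithPrediction(long from, long to, long offset, long windowSize) {
        int disagreements = 0;
        for (long t = from; t <= to; t++) {
            boolean wrong = currentStart(t, offset, windowSize) != containingStart(t, offset, windowSize);
            if (wrong != predictedWrong(t, offset, windowSize)) {
                disagreements++;
            }
        }
        return disagreements;
    }

    public static void main(String[] args) {
        System.out.println(countMismatchesWithPrediction(-30000L, 30000L, 1000L, 6000L));
    }
}
{code}

A count of 0 means the predicted condition matches the observed failures exactly. When (timestamp - offset + windowSize) is a negative exact multiple of windowSize, the remainder is 0 and the result happens to be correct.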
After discussing this with a friend, I believe the cause is that the current
calculation rounds toward 0 (Java's % operator takes the sign of the dividend),
whereas it should round toward -∞.
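One way to round toward -∞ is sketched below, assuming the fix only needs to change how the remainder is taken. Math.floorMod (available since Java 8) always returns a value in [0, windowSize) for a positive windowSize, even when timestamp - offset is negative. The name floorWindowStart is hypothetical, this is not the method Flink ships, and it assumes windowSize > 0.

{code:java}
public class FloorWindowStart {

    // Hypothetical alternative to getWindowStartWithOffset that rounds toward -infinity.
    // Assumes windowSize > 0.
    static long floorWindowStart(long timestamp, long offset, long windowSize) {
        // floorMod returns a value in [0, windowSize), even for negative (timestamp - offset).
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    // Current formula, kept for comparison.
    static long currentStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long windowSize = 6000L;
        // Same timestamps as the sample program below.
        long[] timestamps = {-7000L, -1000L, 1000L, 7000L};
        for (long t : timestamps) {
            System.out.println(t + " -> current " + currentStart(t, 0L, windowSize)
                    + ", floor " + floorWindowStart(t, 0L, windowSize));
        }
    }
}
{code}

For these four timestamps only -7000 changes: the current formula gives -6000 and the floor-based version gives -12000. The other three (-1000 -> -6000, 1000 -> 0, 7000 -> 6000) are the same in both.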
Do you think this is a bug? We would like to submit a pull request on GitHub to
fix it.
Below is a sample program using a tumbling window.
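{code:java}
import java.sql.Timestamp;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class Test01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env
            .fromElements(
                Tuple2.of("a", -7L * 1000L), // start time should be -12s
                Tuple2.of("b", -1L * 1000L),
                Tuple2.of("c", 1L * 1000L),
                Tuple2.of("d", 7L * 1000L)
            )
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                    .withTimestampAssigner(
                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                // The second field is the event-time timestamp in epoch milliseconds.
                                return element.f1;
                            }
                        }
                    )
            )
            .keyBy(r -> 1) // single key, so every element goes through the same window assigner
            .window(TumblingEventTimeWindows.of(Time.seconds(6)))
            .process(
                new ProcessWindowFunction<Tuple2<String, Long>, String, Integer, TimeWindow>() {
                    @Override
                    public void process(
                            Integer key,
                            Context context,
                            Iterable<Tuple2<String, Long>> elements,
                            Collector<String> out) throws Exception {
                        for (Tuple2<String, Long> element : elements) {
                            // Emit each element together with the bounds of the window it was assigned to.
                            out.collect(String.format(
                                    "{\"data\":\"%s\",\"windowStartTime\":\"%s\",\"windowEndTime\":\"%s\"}",
                                    element,
                                    new Timestamp(context.window().getStart()),
                                    new Timestamp(context.window().getEnd())));
                        }
                    }
                }
            )
            .print();
        env.execute();
    }
}
{code}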
was:
source code
{code:java}
/**
 * Method to get the window start for a timestamp.
 *
 * @param timestamp epoch millisecond to get the window start.
 * @param offset The offset which window start would be shifted by.
 * @param windowSize The size of the generated windows.
 * @return window start
 */
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}
{code}
If windowSize is 6 seconds, an element with a timestamp of -7000L should be
assigned to the window starting at -12000L, but this code assigns it to the
window starting at -6000L.
With the current calculation, when the timestamp is less than (0 - offset),
the calculated window start is shifted by one windowSize unit toward 0.
After discussing this with a friend, I think the cause is that the current
calculation rounds toward 0. It should round toward -∞.
Do you think this is a bug? We would like to submit a pull request on GitHub to
fix it.
Summary: When (timestamp - offset + windowSize) is less than 0, the result of
TimeWindow.getWindowStartWithOffset is incorrect (was: When the watermark is
less than (0 - offset), the result of TimeWindow.getWindowStartWithOffset is
incorrect)
> When (timestamp - offset + windowSize) is less than 0, the result of
> TimeWindow.getWindowStartWithOffset is incorrect
> ------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-26334
> URL: https://issues.apache.org/jira/browse/FLINK-26334
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.14.3
> Environment: Flink version 1.14.3
> Reporter: realdengziqi
> Priority: Major
> Original Estimate: 16h
> Remaining Estimate: 16h
--
This message was sent by Atlassian Jira
(v8.20.1#820001)