aljoscha commented on a change in pull request #12297:
URL: https://github.com/apache/flink/pull/12297#discussion_r438698136
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
##########
@@ -107,7 +123,22 @@ public static TumblingProcessingTimeWindows of(Time size) {
* @return The time policy.
*/
public static TumblingProcessingTimeWindows of(Time size, Time offset) {
- return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
+ return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED);
+ }
+
+ /**
+ * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
+ * elements to time windows based on the element timestamp, offset and a staggering offset sampled
+ * from uniform distribution(0, window size) for each pane.
Review comment:
Is this correct? The staggering offset depends on the passed-in
`WindowStagger`, right?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
##########
@@ -46,28 +46,44 @@
private final long size;
- private final long offset;
+ private final long globalOffset;
- private TumblingProcessingTimeWindows(long size, long offset) {
+ private Long staggerOffset = null;
+
+ private final WindowStagger windowStagger;
+
+ private TumblingProcessingTimeWindows(long size, long offset, WindowStagger windowStagger) {
if (Math.abs(offset) >= size) {
throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");
}
this.size = size;
- this.offset = offset;
+ this.globalOffset = offset;
+ this.windowStagger = windowStagger;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
final long now = context.getCurrentProcessingTime();
- long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
+ if (staggerOffset == null) {
+ staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
+ }
+ long start = TimeWindow.getWindowStartWithOffset(now, (globalOffset + staggerOffset) % size, size);
return Collections.singletonList(new TimeWindow(start, start + size));
}
public long getSize() {
return size;
}
+ public long getGlobalOffset() {
Review comment:
I don't think we need these additional public getters.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowStagger.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A {@code WindowStagger} staggers offset in runtime for each window assignment.
+ */
+public enum WindowStagger {
+ /**
+ * Default mode, all panes fire at the same time across all partitions.
+ */
+ ALIGNED {
+ @Override
+ public long getStaggerOffset(
+ final long currentProcessingTime,
+ final long size) {
+ return 0L;
+ }
+ },
+
+ /**
+ * Stagger offset is sampled from uniform distribution U(0, WindowSize) when first event ingested in the partitioned operator.
+ */
+ RANDOM {
+ @Override
+ public long getStaggerOffset(
+ final long currentProcessingTime,
+ final long size) {
+ return (long) (ThreadLocalRandom.current().nextDouble() * size);
+ }
+ },
+
+ /**
+ * Stagger offset is the ingestion delay in processing time, which is the difference between first event ingestion time and its corresponding processing window start time
+ * in the partitioned operator. In other words, each partitioned window starts when its first pane created.
Review comment:
I think this is incorrect and should be more like the description of
`RANDOM`. We determine the stagger offset when "the first event is ingested in
the partitioned operator". It's not determined for each pane.
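To illustrate the point of this comment, here is a minimal sketch of "sample once, reuse for every window". The class `StaggeredTumblingSketch` and its methods are hypothetical stand-ins, not the Flink classes, and `Math.floorMod` is only an assumed substitute for `TimeWindow.getWindowStartWithOffset`.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in for the assigner under review; not the Flink class.
class StaggeredTumblingSketch {
    private final long size;
    private final long globalOffset;
    // Sampled lazily on the first element, then reused for every later window.
    private Long staggerOffset = null;

    StaggeredTumblingSketch(long size, long globalOffset) {
        if (Math.abs(globalOffset) >= size) {
            throw new IllegalArgumentException("parameters must satisfy abs(offset) < size");
        }
        this.size = size;
        this.globalOffset = globalOffset;
    }

    // Returns the start of the window that contains the processing time `now`.
    long windowStart(long now) {
        if (staggerOffset == null) {
            // RANDOM-style stagger: one draw from U(0, size) per assigner instance.
            staggerOffset = (long) (ThreadLocalRandom.current().nextDouble() * size);
        }
        long offset = (globalOffset + staggerOffset) % size;
        // Assumption: floorMod stands in for TimeWindow.getWindowStartWithOffset.
        return now - Math.floorMod(now - offset, size);
    }

    // Exposed only so the sketch can be checked; null until the first element.
    Long currentStaggerOffset() {
        return staggerOffset;
    }
}
```

Because the offset is drawn only on the first call, two timestamps exactly one window size apart land in consecutive windows. Each window is shifted by the same amount rather than by a fresh sample per pane.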
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
##########
@@ -107,7 +123,22 @@ public static TumblingProcessingTimeWindows of(Time size) {
* @return The time policy.
*/
public static TumblingProcessingTimeWindows of(Time size, Time offset) {
- return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
+ return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED);
+ }
+
+ /**
+ * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
+ * elements to time windows based on the element timestamp, offset and a staggering offset sampled
+ * from uniform distribution(0, window size) for each pane.
+ *
+ * @param size The size of the generated windows.
+ * @param offset The offset which window start would be shifted by.
+ * @param windowStagger The utility that produces staggering offset in runtime.
+ *
+ * @return The time policy.
+ */
+ public static TumblingProcessingTimeWindows of(Time size, Time offset, WindowStagger windowStagger) throws Exception {
Review comment:
We don't need the `throws` here.
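A minimal sketch of why the clause can go, assuming the only exception on this path is the `IllegalArgumentException` thrown by the constructor shown in the diff. The class `TumblingFactorySketch` and its fields are hypothetical and use plain `long` milliseconds instead of Flink's `Time`.

```java
// Hypothetical stand-in for the factory under review; not the Flink class.
class TumblingFactorySketch {
    final long size;
    final long offset;

    private TumblingFactorySketch(long size, long offset) {
        // Same validation as the constructor in the diff.
        if (Math.abs(offset) >= size) {
            throw new IllegalArgumentException("parameters must satisfy abs(offset) < size");
        }
        this.size = size;
        this.offset = offset;
    }

    // No `throws` clause: IllegalArgumentException is unchecked, so callers
    // are not forced to catch or redeclare anything.
    static TumblingFactorySketch of(long sizeMillis, long offsetMillis) {
        return new TumblingFactorySketch(sizeMillis, offsetMillis);
    }
}
```

Declaring `throws Exception` on a public factory would force every caller to handle a checked exception that, under the assumption above, is never thrown.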