wuchong commented on a change in pull request #15485:
URL: https://github.com/apache/flink/pull/15485#discussion_r609455324
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java
##########
@@ -133,10 +134,9 @@ public boolean processElement(RowData key, RowData element) throws Exception {
long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService);
if (!isEventTime) {
// always register processing time for every element when processing time mode
- timerService.registerProcessingTimeTimer(
sliceEnd, TimeWindowUtil.toEpochMillsForTimer(sliceEnd - 1, shiftTimeZone));
+ windowTimerService.registerProcessingTimeWindowTimer(sliceEnd);
}
- if (isEventTime && sliceEnd - 1 <= currentProgress) {
+ if (isEventTime && toEpochMillsForTimer(sliceEnd - 1, shiftTimeZone) <= currentProgress) {
Review comment:
I noticed that this comparison is repeated in many places. I think we can
extract it into a util method in `TimeWindowUtil`, e.g.
```java
// TODO: add javadocs
public static boolean isWindowFired(
long windowEnd, long currentProgress, ZoneId shiftTimeZone) {
long windowTriggerTime = toEpochMillsForTimer(windowEnd - 1, shiftTimeZone);
return currentProgress >= windowTriggerTime;
}
```
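To make the suggestion concrete, here is a minimal self-contained sketch of how such a helper behaves. The `toEpochMillsForTimer` below is a simplified stand-in written for this example (an assumption: it ignores any special DST handling the real `TimeWindowUtil` method may have), and the class name `WindowFiredSketch` is hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

final class WindowFiredSketch {

    /**
     * Simplified stand-in (assumption, not the Flink implementation): reads utcMillis as a
     * wall-clock time and returns the epoch millis of that wall-clock time in shiftTimeZone.
     */
    static long toEpochMillsForTimer(long utcMillis, ZoneId shiftTimeZone) {
        LocalDateTime wallClock =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(utcMillis), ZoneOffset.UTC);
        return wallClock.atZone(shiftTimeZone).toInstant().toEpochMilli();
    }

    /** Returns true once the progress has reached the trigger time of the window. */
    static boolean isWindowFired(long windowEnd, long currentProgress, ZoneId shiftTimeZone) {
        long windowTriggerTime = toEpochMillsForTimer(windowEnd - 1, shiftTimeZone);
        return currentProgress >= windowTriggerTime;
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneOffset.ofHours(8); // fixed UTC+8 offset, so no DST is involved
        long windowEnd = 86_400_000L; // wall-clock 1970-01-02T00:00
        // The trigger time is wall-clock 23:59:59.999 in UTC+8, i.e. 8 hours earlier as epoch millis.
        long triggerTime = toEpochMillsForTimer(windowEnd - 1, zone);
        System.out.println(triggerTime); // 57599999
        System.out.println(isWindowFired(windowEnd, triggerTime - 1, zone)); // false
        System.out.println(isWindowFired(windowEnd, triggerTime, zone)); // true
    }
}
```

Keeping the `- 1` and the time zone shift inside one method means callers only pass the raw window end, which avoids the off-by-one mistakes pointed out in the other comments.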
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
##########
@@ -81,7 +86,7 @@ public void addElement(RowData key, long sliceEnd, RowData element) throws Excep
@Override
public void advanceProgress(long progress) throws Exception {
- if (progress >= minTriggerTime) {
+ if (progress >= TimeWindowUtil.toEpochMillsForTimer(minSliceEnd, shiftTimeZone)) {
Review comment:
This should be `toEpochMillsForTimer(minSliceEnd - 1, shiftTimeZone)`.
It would also be better to assign the result to a local variable, e.g.
`minTriggerTime`, which makes the meaning easier to understand.
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
##########
@@ -177,7 +192,7 @@ private long computeMemorySize() {
// ------------------------------------------------------------------------
/** Method to get the next watermark to trigger window. */
private static long getNextTriggerWatermark(long currentWatermark, long interval) {
- long start = TimeWindow.getWindowStartWithOffset(currentWatermark, 0L, interval);
+ long start = getWindowStartWithOffset(currentWatermark, 0L, interval);
Review comment:
Do we need to shift `currentWatermark` to UTC millis before computing the
window start?
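To illustrate why the question matters, here is a small sketch comparing the window start computed from the raw watermark with the one computed from a shifted watermark. Both helpers are stand-ins written for this example (assumptions): `windowStart` mimics an aligned window-start computation, and `toLocalMillis` is a hypothetical conversion from epoch millis to the wall-clock millis of the shift time zone. Whether the operator actually needs this shift is exactly the open question.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

final class ShiftedWindowStartSketch {

    /** Stand-in: start of the aligned window of size windowSize that contains timestamp. */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    /** Hypothetical helper: epoch millis -> millis of the wall-clock time in zone, read as UTC. */
    static long toLocalMillis(long epochMillis, ZoneId zone) {
        LocalDateTime wallClock = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
        return wallClock.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneOffset.ofHours(8); // fixed UTC+8 offset
        long interval = 86_400_000L; // one day
        long currentWatermark = 60_000_000L; // 1970-01-01T16:40 UTC = 1970-01-02T00:40 in UTC+8

        // Without shifting, the watermark falls into the UTC day starting at 0.
        System.out.println(windowStart(currentWatermark, 0L, interval)); // 0
        // After shifting, it falls into the local day starting at 1970-01-02T00:00.
        long shifted = toLocalMillis(currentWatermark, zone);
        System.out.println(windowStart(shifted, 0L, interval)); // 86400000
    }
}
```

With a one-day interval the two results point at different days, so the next trigger watermark derived from them would differ as well.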
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java
##########
@@ -42,30 +44,33 @@
private final WindowBytesMultiMap recordsBuffer;
private final WindowKey reuseWindowKey;
private final AbstractRowDataSerializer<RowData> recordSerializer;
+ private final ZoneId shiftTimeZone;
- private long minTriggerTime = Long.MAX_VALUE;
+ private long minSliceEnd = Long.MAX_VALUE;
public RecordsWindowBuffer(
Object operatorOwner,
MemoryManager memoryManager,
long memorySize,
WindowCombineFunction combineFunction,
PagedTypeSerializer<RowData> keySer,
- AbstractRowDataSerializer<RowData> inputSer) {
+ AbstractRowDataSerializer<RowData> inputSer,
+ ZoneId shiftTimeZone) {
this.combineFunction = combineFunction;
this.recordsBuffer =
new WindowBytesMultiMap(
operatorOwner, memoryManager, memorySize, keySer,
inputSer.getArity());
this.recordSerializer = inputSer;
this.reuseWindowKey = new WindowKeySerializer(keySer).createInstance();
+ this.shiftTimeZone = shiftTimeZone;
}
@Override
public void addElement(RowData key, long sliceEnd, RowData element) throws Exception {
// track the lowest trigger time, if watermark exceeds the trigger time,
// it means there are some elements in the buffer belong to a window going to be fired,
// and we need to flush the buffer into state for firing.
- minTriggerTime = Math.min(sliceEnd - 1, minTriggerTime);
+ minSliceEnd = Math.min(sliceEnd - 1, minSliceEnd);
Review comment:
This should be `sliceEnd` instead of `sliceEnd - 1`. Otherwise, the field
should not be called `minSliceEnd`.
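Taken together with the comment on `advanceProgress`, the intended shape is roughly the following sketch: the field keeps the raw slice end, and the `- 1` plus the time zone shift happen in one place, in a local `minTriggerTime`. This is a toy class, not the real `RecordsWindowBuffer`; the class name, the `flushCount` counter, the reset on flush, and the simplified `toEpochMillsForTimer` stand-in are all assumptions made for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

final class MinSliceEndSketch {
    private final ZoneId shiftTimeZone;
    // Stores the raw slice end, so the field name matches its value.
    private long minSliceEnd = Long.MAX_VALUE;
    private int flushCount = 0; // stands in for flushing the buffer into state

    MinSliceEndSketch(ZoneId shiftTimeZone) {
        this.shiftTimeZone = shiftTimeZone;
    }

    /** Simplified stand-in (assumption): wall-clock millis -> epoch millis in the given zone. */
    static long toEpochMillsForTimer(long utcMillis, ZoneId zone) {
        LocalDateTime wallClock =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(utcMillis), ZoneOffset.UTC);
        return wallClock.atZone(zone).toInstant().toEpochMilli();
    }

    void addElement(long sliceEnd) {
        // Track the smallest slice end seen so far, without subtracting 1 here.
        minSliceEnd = Math.min(sliceEnd, minSliceEnd);
    }

    void advanceProgress(long progress) {
        if (minSliceEnd == Long.MAX_VALUE) {
            return; // nothing buffered
        }
        // The local variable names what is being compared against the progress.
        long minTriggerTime = toEpochMillsForTimer(minSliceEnd - 1, shiftTimeZone);
        if (progress >= minTriggerTime) {
            flushCount++;
            minSliceEnd = Long.MAX_VALUE; // assumption: tracking restarts after a flush
        }
    }

    int flushCount() {
        return flushCount;
    }
}
```

For example, with a UTC+8 shift zone and a buffered slice end of `86_400_000`, the sketch flushes once the progress reaches `57_599_999`, and not one millisecond earlier.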
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/window/processors/WindowRankProcessor.java
##########
@@ -60,13 +66,17 @@
private final long rankEnd;
private final boolean outputRankNumber;
private final int windowEndIndex;
+ private final ZoneId shiftTimeZone;
// ----------------------------------------------------------------------------------------
private transient long currentProgress;
private transient Context<Long> ctx;
+ private transient InternalTimerService<Long> timerService;
Review comment:
This member field is not needed anymore.
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java
##########
@@ -67,6 +64,8 @@
protected transient InternalTimerService<Long> timerService;
Review comment:
We don't need this member field anymore.
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java
##########
@@ -94,28 +95,33 @@ public void open(TriggerContext ctx) throws Exception {
@Override
public boolean onElement(Object element, long timestamp, W window) throws Exception {
- if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
+ if (triggerTime(window) <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return true;
} else {
- ctx.registerEventTimeTimer(window.maxTimestamp());
+ ctx.registerEventTimeTimer(triggerTime(window));
return false;
}
}
+ private long triggerTime(W window) {
+ return toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone());
+ }
+
@Override
public boolean onProcessingTime(long time, W window) throws Exception {
return false;
}
@Override
public boolean onEventTime(long time, W window) throws Exception {
- return time == window.maxTimestamp();
+ return time == toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone());
Review comment:
Please replace every occurrence of `toEpochMillsForTimer(window.maxTimestamp(),
ctx.getShiftTimeZone())` with `triggerTime(window)`. We could introduce an
abstract base class for the triggers with a protected `triggerTime` method, so
that each trigger does not have to implement it again.
The same applies to `ProcessingTimeTriggers`.
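A rough sketch of that refactoring, using simplified hypothetical types rather than the real Flink trigger classes: `SimpleWindow`, `Ctx`, `WindowTriggerBase`, and `AfterEndOfWindowSketch` are names invented for this example, and `toEpochMillsForTimer` is again a simplified stand-in (assumption).

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

final class TriggerTimeSketch {

    /** Hypothetical minimal window: maxTimestamp() is the last millisecond inside the window. */
    static final class SimpleWindow {
        final long end;
        SimpleWindow(long end) { this.end = end; }
        long maxTimestamp() { return end - 1; }
    }

    /** Hypothetical slice of the trigger context used by the triggers. */
    interface Ctx {
        ZoneId getShiftTimeZone();
        long getCurrentWatermark();
    }

    /** Simplified stand-in (assumption): wall-clock millis -> epoch millis in the given zone. */
    static long toEpochMillsForTimer(long utcMillis, ZoneId zone) {
        LocalDateTime wallClock =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(utcMillis), ZoneOffset.UTC);
        return wallClock.atZone(zone).toInstant().toEpochMilli();
    }

    /** Shared base class: every trigger derives the timer time in the same way. */
    abstract static class WindowTriggerBase {
        protected Ctx ctx;

        void open(Ctx ctx) { this.ctx = ctx; }

        protected long triggerTime(SimpleWindow window) {
            return toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone());
        }
    }

    /** Event-time trigger sketch that only ever goes through triggerTime(window). */
    static final class AfterEndOfWindowSketch extends WindowTriggerBase {
        boolean onElement(SimpleWindow window) {
            // Fire immediately if the watermark is already past the trigger time.
            return triggerTime(window) <= ctx.getCurrentWatermark();
        }

        boolean onEventTime(long time, SimpleWindow window) {
            return time == triggerTime(window);
        }
    }
}
```

A processing-time trigger could extend the same base class and reuse `triggerTime` unchanged, which is the duplication this comment asks to remove.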