This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 65f022853151c79a67c387147633fecfe8e838e4 Author: xuyang <xyzhong...@163.com> AuthorDate: Fri Jan 19 17:09:29 2024 +0800 [FLINK-34100][table] Add function getDescription for internal interface WindowAssigner This closes #24162 --- .../window/tvf/common/WindowAssigner.java | 5 ++++ .../window/tvf/slicing/SliceAssigners.java | 35 ++++++++++++++++++++++ .../window/tvf/unslicing/UnsliceAssigners.java | 7 +++++ 3 files changed, 47 insertions(+) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAssigner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAssigner.java index 567a19a3b2a..e93d894384b 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAssigner.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAssigner.java @@ -18,6 +18,7 @@ package org.apache.flink.table.runtime.operators.window.tvf.common; +import org.apache.flink.annotation.Internal; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner; import java.io.Serializable; @@ -31,6 +32,7 @@ import java.io.Serializable; * * <p>See more details in {@link WindowOperatorBase}. */ +@Internal public interface WindowAssigner extends Serializable { /** @@ -38,4 +40,7 @@ public interface WindowAssigner extends Serializable { * based on processing time. */ boolean isEventTime(); + + /** Returns a description of this window assigner. */ + String getDescription(); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigners.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigners.java index b249b433c31..14af977c5d3 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigners.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigners.java @@ -188,6 +188,11 @@ public final class SliceAssigners { public long getSliceEndInterval() { return size; } + + @Override + public String getDescription() { + return String.format("TumblingWindow(size=%dms, offset=%dms)", size, offset); + } } /** The {@link SliceAssigner} for hopping windows. */ @@ -278,6 +283,12 @@ public final class SliceAssigners { return Optional.of(windowEnd + sliceSize); } } + + @Override + public String getDescription() { + return String.format( + "HoppingWindow(size=%dms, slide=%dms, offset=%dms)", size, slide, offset); + } } /** The {@link SliceAssigner} for cumulative windows. */ @@ -384,6 +395,13 @@ public final class SliceAssigners { return Optional.of(nextWindowEnd); } } + + @Override + public String getDescription() { + return String.format( + "CumulativeWindow(maxSize=%dms, step=%dms, offset=%dms)", + maxSize, step, offset); + } } /** @@ -438,6 +456,13 @@ public final class SliceAssigners { // it always works in event-time mode if input row has been attached windows return true; } + + @Override + public String getDescription() { + return String.format( + "WindowedSliceWindow(innerWindow=%s, windowEndIndex=%d)", + innerAssigner, windowEndIndex); + } } /** @@ -469,6 +494,11 @@ public final class SliceAssigners { public long getLastWindowEnd(long sliceEnd) { return innerAssigner.getLastWindowEnd(sliceEnd); } + + @Override + public String getDescription() { + return String.format("SlicedSharedSliceWindow(innerWindow=%s)", innerAssigner); + } } /** @@ -491,6 +521,11 @@ public final class SliceAssigners { // can't be shared with other windows and the last window should be itself. return sliceEnd; } + + @Override + public String getDescription() { + return String.format("SlicedUnSharedSliceWindow(innerWindow=%s)", innerAssigner); + } } /** diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigners.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigners.java index 93b9455d2ed..ba34c4e3b67 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigners.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigners.java @@ -61,6 +61,7 @@ public class UnsliceAssigners { private static final long serialVersionUID = 1L; private final int rowtimeIndex; + private final long sessionGap; private final boolean isEventTime; private final ZoneId shiftTimeZone; @@ -69,6 +70,7 @@ public class UnsliceAssigners { public SessionUnsliceAssigner(int rowtimeIndex, ZoneId shiftTimeZone, long sessionGap) { this.rowtimeIndex = rowtimeIndex; this.shiftTimeZone = shiftTimeZone; + this.sessionGap = sessionGap; this.isEventTime = rowtimeIndex >= 0; this.innerSessionWindowAssigner = SessionWindowAssigner.withGap(Duration.ofMillis(sessionGap)); @@ -138,5 +140,10 @@ public class UnsliceAssigners { public boolean isEventTime() { return isEventTime; } + + @Override + public String getDescription() { + return String.format("SessionWindow(gap=%dms)", sessionGap); + } } }