wuchong commented on a change in pull request #14708:
URL: https://github.com/apache/flink/pull/14708#discussion_r563306860



##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
##########
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.slicing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.util.IterableIterator;
+import org.apache.flink.util.MathUtils;
+
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utilities to create {@link SliceAssigner}s. */
+@Internal
+public final class SliceAssigners {
+
+    // ------------------------------------------------------------------------
+    // Utilities
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates a tumbling window {@link SliceAssigner} that assigns elements to slices of
+     * tumbling windows. The returned assigner is created with an offset of zero; call {@link
+     * TumblingSliceAssigner#withOffset(Duration)} on it to obtain one with another offset.
+     *
+     * @param rowtimeIndex the index of the rowtime field in the input row, {@code -1} if based
+     *     on processing time.
+     * @param size the size of the generated windows; it is converted to milliseconds.
+     * @return a new {@link TumblingSliceAssigner} with the given size and an offset of 0.
+     */
+    public static TumblingSliceAssigner tumbling(int rowtimeIndex, Duration 
size) {
+        return new TumblingSliceAssigner(rowtimeIndex, size.toMillis(), 0);
+    }
+
+    /**
+     * Creates a hopping window {@link SliceAssigner} that assigns elements to slices of hopping
+     * windows. The returned assigner is created with an offset of zero; call {@link
+     * HoppingSliceAssigner#withOffset(Duration)} on it to obtain one with another offset.
+     *
+     * @param rowtimeIndex the index of the rowtime field in the input row, {@code -1} if based
+     *     on processing time.
+     * @param size the size of the generated windows; it is converted to milliseconds.
+     * @param slide the slide interval of the generated windows; it is converted to
+     *     milliseconds.
+     * @return a new {@link HoppingSliceAssigner} with the given size and slide and an offset
+     *     of 0.
+     */
+    public static HoppingSliceAssigner hopping(int rowtimeIndex, Duration 
size, Duration slide) {
+        return new HoppingSliceAssigner(rowtimeIndex, size.toMillis(), 
slide.toMillis(), 0);
+    }
+
+    /**
+     * Creates a cumulative window {@link SliceAssigner} that assigns elements to slices of
+     * cumulative windows. The returned assigner is created with an offset of zero; call {@link
+     * CumulativeSliceAssigner#withOffset(Duration)} on it to obtain one with another offset.
+     *
+     * @param rowtimeIndex the index of the rowtime field in the input row, {@code -1} if based
+     *     on processing time.
+     * @param maxSize the max size of the generated windows; it is converted to milliseconds.
+     * @param step the step interval of the generated windows; it is converted to
+     *     milliseconds.
+     * @return a new {@link CumulativeSliceAssigner} with the given max size and step and an
+     *     offset of 0.
+     */
+    public static CumulativeSliceAssigner cumulative(
+            int rowtimeIndex, Duration maxSize, Duration step) {
+        return new CumulativeSliceAssigner(rowtimeIndex, maxSize.toMillis(), 
step.toMillis(), 0);
+    }
+
+    /**
+     * Creates a {@link SliceAssigner} that assigns elements which already carry window start
+     * and window end timestamps to slices. The assigned slice is equal to the given window.
+     *
+     * @param windowEndIndex the index of the window end field in the input row; must not be
+     *     negative.
+     * @param windowSize the size of the generated window; it is converted to milliseconds.
+     * @return a new {@link WindowedSliceAssigner} for the given window end field and size.
+     */
+    public static WindowedSliceAssigner windowed(int windowEndIndex, Duration 
windowSize) {
+        return new WindowedSliceAssigner(windowEndIndex, 
windowSize.toMillis());
+    }
+
+    // ------------------------------------------------------------------------
+    // Slice Assigners
+    // ------------------------------------------------------------------------
+
+    /**
+     * The {@link SliceAssigner} for tumbling windows.
+     *
+     * <p>Every slice has the same length as the window ({@code size} milliseconds), so the end
+     * of the slice an element is assigned to is also the end of its window.
+     */
+    public static final class TumblingSliceAssigner extends 
AbstractSliceAssigner
+            implements SliceUnsharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Creates a new {@link TumblingSliceAssigner} with a new specified offset.
+         *
+         * @param offset the new offset; it is converted to milliseconds and must satisfy
+         *     {@code -size < offset < size}.
+         * @return a new assigner with the same rowtime index and size and the given offset.
+         */
+        public TumblingSliceAssigner withOffset(Duration offset) {
+            return new TumblingSliceAssigner(rowtimeIndex, size, 
offset.toMillis());
+        }
+
+        /** Window (and slice) size in milliseconds. */
+        private final long size;
+        /** Window offset in milliseconds. */
+        private final long offset;
+        /** Reused result holder returned by {@link #expiredSlices(long)}. */
+        private final ReusableListIterable reuseList = new 
ReusableListIterable();
+
+        /**
+         * @param rowtimeIndex passed through to {@link AbstractSliceAssigner}.
+         * @param size the window size in milliseconds, must be positive.
+         * @param offset the window offset in milliseconds, must satisfy {@code -size < offset <
+         *     size}.
+         */
+        private TumblingSliceAssigner(int rowtimeIndex, long size, long 
offset) {
+            super(rowtimeIndex);
+            checkArgument(
+                    size > 0,
+                    String.format(
+                            "Tumbling Window parameters must satisfy size > 0, 
but got size %dms.",
+                            size));
+            // Check both bounds explicitly instead of using Math.abs(offset):
+            // Math.abs(Long.MIN_VALUE) is negative and would slip past an abs-based check.
+            // size is known to be positive here, so -size cannot overflow.
+            checkArgument(
+                    offset > -size && offset < size,
+                    String.format(
+                            "Tumbling Window parameters must satisfy 
abs(offset) < size, but got size %dms and offset %dms.",
+                            size, offset));
+            this.size = size;
+            this.offset = offset;
+        }
+
+        /**
+         * Returns the end of the slice for the given timestamp, computed as the window start
+         * obtained from {@link TimeWindow#getWindowStartWithOffset} plus {@code size}.
+         */
+        @Override
+        public long assignSliceEnd(long timestamp) {
+            long start = TimeWindow.getWindowStartWithOffset(timestamp, 
offset, size);
+            return start + size;
+        }
+
+        /** Returns the start of the window that ends at {@code windowEnd}. */
+        public long getWindowStart(long windowEnd) {
+            return windowEnd - size;
+        }
+
+        /**
+         * Returns the slices to clean up once the window ending at {@code windowEnd} is done.
+         *
+         * <p>The result is the shared {@code reuseList} instance reset with {@code windowEnd},
+         * so a later call overwrites what an earlier call returned.
+         */
+        @Override
+        public Iterable<Long> expiredSlices(long windowEnd) {
+            reuseList.reset(windowEnd);
+            return reuseList;
+        }
+    }
+
+    /**
+     * The {@link SliceAssigner} for hopping windows.
+     *
+     * <p>Elements are assigned to slices of {@code sliceSize} milliseconds, where {@code
+     * sliceSize} is the greatest common divisor of {@code size} and {@code slide}. Because the
+     * constructor requires {@code size} to be a multiple of {@code slide}, that divisor equals
+     * {@code slide}. A window is made up of {@code numSlicesPerWindow = size / sliceSize}
+     * slices.
+     */
+    public static final class HoppingSliceAssigner extends 
AbstractSliceAssigner
+            implements SliceSharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * Creates a new {@link HoppingSliceAssigner} with a new specified offset.
+         *
+         * @param offset the new offset; it is converted to milliseconds.
+         * @return a new assigner with the same rowtime index, size and slide and the given
+         *     offset.
+         */
+        public HoppingSliceAssigner withOffset(Duration offset) {
+            return new HoppingSliceAssigner(rowtimeIndex, size, slide, 
offset.toMillis());
+        }
+
+        // window size, slide interval and offset, all in milliseconds
+        private final long size;
+        private final long slide;
+        private final long offset;
+        // length of one slice in milliseconds: gcd(size, slide)
+        private final long sliceSize;
+        // number of slices that make up one window: size / sliceSize
+        private final int numSlicesPerWindow;
+        // reused result holder returned by expiredSlices(long)
+        private final ReusableListIterable reuseList = new 
ReusableListIterable();
+
+        /**
+         * @param rowtimeIndex passed through to {@link AbstractSliceAssigner}.
+         * @param size the window size in milliseconds, must be positive and a multiple of
+         *     {@code slide}.
+         * @param slide the slide interval in milliseconds, must be positive.
+         * @param offset the window offset in milliseconds.
+         * @throws IllegalArgumentException if {@code size} or {@code slide} is not positive, or
+         *     if {@code size} is not an integral multiple of {@code slide}.
+         */
+        protected HoppingSliceAssigner(int rowtimeIndex, long size, long 
slide, long offset) {
+            super(rowtimeIndex);
+            if (size <= 0 || slide <= 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Hopping Window must satisfy slide > 0 and 
size > 0, but got slide %dms and size %dms.",
+                                slide, size));
+            }
+            if (size % slide != 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Hopping Window requires size must be an 
integral multiple of slide, but got size %dms and slide %dms.",
+                                size, slide));
+            }
+            this.size = size;
+            this.slide = slide;
+            this.offset = offset;
+            // given the multiple-of-slide check above, this gcd is equal to slide
+            this.sliceSize = ArithmeticUtils.gcd(size, slide);
+            this.numSlicesPerWindow = MathUtils.checkedDownCast(size / 
sliceSize);
+        }
+
+        /**
+         * Returns the end of the slice for the given timestamp, computed as the start obtained
+         * from {@link TimeWindow#getWindowStartWithOffset} for {@code sliceSize} plus {@code
+         * sliceSize}.
+         */
+        @Override
+        public long assignSliceEnd(long timestamp) {
+            long start = TimeWindow.getWindowStartWithOffset(timestamp, 
offset, sliceSize);
+            return start + sliceSize;
+        }
+
+        /** Returns the start of the window that ends at {@code windowEnd}. */
+        @Override
+        public long getWindowStart(long windowEnd) {
+            return windowEnd - size;
+        }
+
+        /**
+         * Returns the slices to clean up once the window ending at {@code windowEnd} is done.
+         *
+         * <p>The result is the shared {@code reuseList} instance reset with the end of the
+         * window's first slice, so a later call overwrites what an earlier call returned.
+         */
+        @Override
+        public Iterable<Long> expiredSlices(long windowEnd) {
+            // we need to cleanup the first slice of the window
+            long windowStart = getWindowStart(windowEnd);
+            long firstSliceEnd = windowStart + sliceSize;
+            reuseList.reset(firstSliceEnd);
+            return reuseList;
+        }
+
+        /**
+         * Hands the slices of the window identified by {@code sliceEnd} to the given callback.
+         *
+         * @param sliceEnd the slice end used to build the {@code HoppingSlicesIterable}.
+         * @param callback receives a {@code null} namespace and the slices to merge.
+         */
+        @Override
+        public void mergeSlices(long sliceEnd, MergeCallback callback) throws 
Exception {
+            // the iterable to list all the slices of the triggered window
+            Iterable<Long> toBeMerged =
+                    new HoppingSlicesIterable(sliceEnd, sliceSize, 
numSlicesPerWindow);
+            // null namespace means use heap data views, instead of state 
views
+            callback.merge(null, toBeMerged);
+        }
+
+        /**
+         * Returns the end of the next window to trigger after {@code windowEnd}.
+         *
+         * @param windowEnd the end of the current window.
+         * @param isWindowEmpty tells whether the current window is empty.
+         * @return empty if {@code isWindowEmpty} yields {@code true}, otherwise {@code
+         *     windowEnd + sliceSize}.
+         */
+        @Override
+        public Optional<Long> nextTriggerWindow(long windowEnd, 
Supplier<Boolean> isWindowEmpty) {
+            if (isWindowEmpty.get()) {
+                return Optional.empty();
+            } else {
+                return Optional.of(windowEnd + sliceSize);
+            }
+        }
+    }
+
+    /** The {@link SliceAssigner} for cumulative windows. */
+    public static final class CumulativeSliceAssigner extends 
AbstractSliceAssigner
+            implements SliceSharedAssigner {
+        private static final long serialVersionUID = 1L;
+
+        /** Creates a new {@link CumulativeSliceAssigner} with a new specified 
offset. */
+        public CumulativeSliceAssigner withOffset(Duration offset) {
+            return new CumulativeSliceAssigner(rowtimeIndex, maxSize, step, 
offset.toMillis());
+        }
+
+        private final long maxSize;
+        private final long step;
+        private final long offset;
+        private final ReusableListIterable reuseList = new 
ReusableListIterable();
+
+        protected CumulativeSliceAssigner(int rowtimeIndex, long maxSize, long 
step, long offset) {
+            super(rowtimeIndex);
+            if (maxSize <= 0 || step <= 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Cumulative Window parameters must satisfy 
maxSize > 0 and step > 0, but got maxSize %dms and step %dms.",
+                                maxSize, step));
+            }
+            if (maxSize % step != 0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Cumulative Window requires maxSize must be an 
integral multiple of step, but got maxSize %dms and step %dms.",
+                                maxSize, step));
+            }
+
+            this.maxSize = maxSize;
+            this.step = step;
+            this.offset = offset;
+        }
+
+        @Override
+        public long assignSliceEnd(long timestamp) {
+            long start = TimeWindow.getWindowStartWithOffset(timestamp, 
offset, step);
+            return start + step;
+        }
+
+        @Override
+        public long getWindowStart(long windowEnd) {
+            return TimeWindow.getWindowStartWithOffset(windowEnd - 1, offset, 
maxSize);
+        }
+
+        @Override
+        public Iterable<Long> expiredSlices(long windowEnd) {
+            long windowStart = getWindowStart(windowEnd);
+            long firstSliceEnd = windowStart + step;
+            long lastSliceEnd = windowStart + maxSize;
+            if (windowEnd == firstSliceEnd) {
+                // we reuse state in the first slice, skip cleanup for the 
first slice
+                return Collections.emptyList();

Review comment:
       Unfortunately, we can't, because the `reuseList` is used not only to 
construct the expired slices but also the merged slices. I think it's fine to 
think of it as a reused wrapper of the result. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to