lindong28 commented on code in PR #157:
URL: https://github.com/apache/flink-ml/pull/157#discussion_r978349873


##########
flink-ml-python/pyflink/ml/lib/clustering/tests/test_agglomerativeclustering.py:
##########
@@ -157,6 +163,30 @@ def test_transform(self):
         
self.verify_clustering_result(self.eucliean_ward_threshold_as_two_result,
                                       outputs[0], "features", "pred")
 
+    def test_transform_with_count_window(self):
+        input_table = self.t_env.from_data_stream(
+            self.env.from_collection([
+                (Vectors.dense([1, 1]),),
+                (Vectors.dense([1, 4]),),
+                (Vectors.dense([1, 0]),),
+                (Vectors.dense([4, 1.5]),),
+                (Vectors.dense([4, 4]),),
+                (Vectors.dense([4, 0]),),
+            ],
+                type_info=Types.ROW_NAMED(
+                    ['features'],
+                    [DenseVectorTypeInfo()])))
+
+        agglomerative_clustering = AgglomerativeClustering() \
+            .set_linkage('average') \
+            .set_distance_measure('euclidean') \
+            .set_prediction_col('pred') \
+            .set_windows(CountTumblingWindows.of(6))

Review Comment:
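   Can we use `CountTumblingWindows.of(5)` so that the input stream (which 
contains 6 elements) is split across two windows instead of one? Having all 
elements in the same window seems a bit trivial. (Note that a count window only 
fires once it is full, so only the first window of 5 elements would produce 
output; the trailing window holding the 6th element never fires.)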



##########
flink-ml-python/pyflink/ml/core/windows.py:
##########
@@ -0,0 +1,151 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABC
+
+from pyflink.common.time import Time
+
+
+class Windows(ABC):
+    """
+    Windowing strategy that determines how to create mini-batches from input 
data.
+    """
+    pass
+
+
+class GlobalWindows(Windows):
+    """
+    A Windows that assigns all the elements into a single global window.
+    In order for this windowing strategy to work correctly, the input
+    stream must be bounded.
+    """
+
+    def __eq__(self, other):
+        return isinstance(other, GlobalWindows)
+
+
+class CountTumblingWindows(Windows):
+    """
+    A Windows that groups elements into windows of fixed number of
+    elements. Windows do not overlap.
+    """
+
+    def __init__(self, size: int):
+        super().__init__()
+        self._size = size
+
+    @staticmethod
+    def of(size: int) -> 'CountTumblingWindows':
+        return CountTumblingWindows(size)
+
+    @property
+    def size(self) -> int:
+        return self._size
+
+    def __eq__(self, other):
+        return isinstance(other, CountTumblingWindows) and self._size == 
other._size
+
+
+class EventTimeTumblingWindows(Windows):
+    """
+    A Windows that groups elements into fixed-size windows based on
+    the timestamp of the elements. Windows do not overlap.
+    """
+
+    def __init__(self, size: Time):
+        super().__init__()
+        self._size = size
+
+    @staticmethod
+    def of(size: Time) -> 'EventTimeTumblingWindows':
+        return EventTimeTumblingWindows(size)
+
+    @property
+    def size(self) -> Time:
+        return self._size
+
+    def __eq__(self, other):
+        return isinstance(other, EventTimeTumblingWindows) and self._size == 
other._size
+
+
+class ProcessingTimeTumblingWindows(Windows):
+    """
+    A Windows that groups elements into fixed-size windows based on
+    the current system time of the machine the operation is running
+    on. Windows do not overlap.
+    """
+
+    def __init__(self, size: Time):
+        super().__init__()
+        self._size = size
+
+    @staticmethod
+    def of(size: Time) -> 'ProcessingTimeTumblingWindows':
+        return ProcessingTimeTumblingWindows(size)
+
+    @property
+    def size(self) -> Time:
+        return self._size
+
+    def __eq__(self, other):
+        return isinstance(other, ProcessingTimeTumblingWindows) and self._size 
== other._size
+
+
+class EventTimeSessionWindows(Windows):
+    """
+    A Windows that windows elements into sessions based on the
+    timestamp of the elements. Windows do not overlap.
+    """
+
+    def __init__(self, gap: Time):
+        super().__init__()
+        self._gap = gap
+
+    @staticmethod
+    def with_gap(gap: Time) -> 'EventTimeSessionWindows':
+        return EventTimeSessionWindows(gap)
+
+    @property
+    def gap(self) -> Time:
+        return self._gap
+
+    def __eq__(self, other):
+        return isinstance(other, EventTimeSessionWindows) and self._gap == 
other._gap
+
+
+class ProcessingTimeSessionWindows(Windows):
+    """
+    A Windows that windows elements into sessions based on the current

Review Comment:
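   nit: `windows` -> `groups` (i.e. "A Windows that groups elements into 
sessions ..."), for consistency with the docs of the other `Windows` subclasses.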



##########
flink-ml-python/pyflink/ml/core/windows.py:
##########
@@ -0,0 +1,151 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABC
+
+from pyflink.common.time import Time
+
+
+class Windows(ABC):
+    """
+    Windowing strategy that determines how to create mini-batches from input 
data.
+    """
+    pass
+
+
+class GlobalWindows(Windows):
+    """
+    A Windows that assigns all the elements into a single global window.
+    In order for this windowing strategy to work correctly, the input
+    stream must be bounded.
+    """
+
+    def __eq__(self, other):
+        return isinstance(other, GlobalWindows)
+
+
+class CountTumblingWindows(Windows):
+    """
+    A Windows that groups elements into windows of fixed number of
+    elements. Windows do not overlap.
+    """
+
+    def __init__(self, size: int):
+        super().__init__()
+        self._size = size
+
+    @staticmethod
+    def of(size: int) -> 'CountTumblingWindows':
+        return CountTumblingWindows(size)
+
+    @property
+    def size(self) -> int:
+        return self._size
+
+    def __eq__(self, other):
+        return isinstance(other, CountTumblingWindows) and self._size == 
other._size
+
+
+class EventTimeTumblingWindows(Windows):
+    """
+    A Windows that groups elements into fixed-size windows based on
+    the timestamp of the elements. Windows do not overlap.
+    """
+
+    def __init__(self, size: Time):
+        super().__init__()
+        self._size = size
+
+    @staticmethod
+    def of(size: Time) -> 'EventTimeTumblingWindows':
+        return EventTimeTumblingWindows(size)
+
+    @property
+    def size(self) -> Time:
+        return self._size
+
+    def __eq__(self, other):
+        return isinstance(other, EventTimeTumblingWindows) and self._size == 
other._size
+
+
+class ProcessingTimeTumblingWindows(Windows):
+    """
+    A Windows that groups elements into fixed-size windows based on
+    the current system time of the machine the operation is running
+    on. Windows do not overlap.
+    """
+
+    def __init__(self, size: Time):
+        super().__init__()
+        self._size = size
+
+    @staticmethod
+    def of(size: Time) -> 'ProcessingTimeTumblingWindows':
+        return ProcessingTimeTumblingWindows(size)
+
+    @property
+    def size(self) -> Time:
+        return self._size
+
+    def __eq__(self, other):
+        return isinstance(other, ProcessingTimeTumblingWindows) and self._size 
== other._size
+
+
+class EventTimeSessionWindows(Windows):
+    """
+    A Windows that windows elements into sessions based on the

Review Comment:
   nit: `windows` -> `groups`, for consistency with the docs of the other 
`Windows` subclasses above.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/agglomerativeclustering/AgglomerativeClustering.java:
##########
@@ -70,6 +68,12 @@
  * second one contains the information of merging two clusters at each step. 
The data format of the
  * merging information is (clusterId1, clusterId2, distance, 
sizeOfMergedCluster).
  *
+ * <p>This operator supports the {@link 
org.apache.flink.ml.common.param.HasWindows} parameter,
+ * which creates mini-batches(windows) from input data and feeds each batch 
into this operator. The
+ * operator would perform the clustering operation on each mini-batch 
independently and output their
+ * clustering results in order. By default, all input data would be placed 
into the same window and
+ * the clustering process would be a batch operation.

Review Comment:
   I am not sure the statement `the clustering process would be a batch 
operation` is clear or necessary. Would it be simpler to remove it?



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java:
##########
@@ -197,6 +213,58 @@ public static <T> void setManagedMemoryWeight(
         }
     }
 
+    /**
+     * Applies windowAll() and process() operation on the input stream.
+     *
+     * @param input The input data stream.
+     * @param windows The window that defines how input data would be sliced 
into batches.
+     * @param function The user defined process function.
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static <IN, OUT, W extends Window> SingleOutputStreamOperator<OUT> 
windowAllProcess(
+            DataStream<IN> input, Windows windows, 
ProcessAllWindowFunction<IN, OUT, W> function) {
+        AllWindowedStream<IN, W> allWindowedStream;
+        if (windows instanceof GlobalWindows) {
+            allWindowedStream = input.windowAll((WindowAssigner) 
EndOfStreamWindows.get());
+        } else if (windows instanceof CountTumblingWindows) {
+            long countWindowSize = ((CountTumblingWindows) windows).getSize();
+            allWindowedStream = (AllWindowedStream<IN, W>) 
input.countWindowAll(countWindowSize);
+        } else {
+            allWindowedStream =
+                    input.windowAll((WindowAssigner) 
getDataStreamTimeWindowAssigner(windows));

Review Comment:
   Would it be simpler to also handle `windows instanceof GlobalWindows` here?



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java:
##########
@@ -197,6 +213,58 @@ public static <T> void setManagedMemoryWeight(
         }
     }
 
+    /**
+     * Applies windowAll() and process() operation on the input stream.

Review Comment:
   Can you document the method in terms of its public behavior with respect to 
its input and output, rather than the operators used in its implementation?
   
   Maybe check out the Javadoc of `DataStream::windowAll(...)` for an example.



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java:
##########
@@ -197,6 +213,58 @@ public static <T> void setManagedMemoryWeight(
         }
     }
 
+    /**
+     * Applies windowAll() and process() operation on the input stream.
+     *
+     * @param input The input data stream.
+     * @param windows The window that defines how input data would be sliced 
into batches.
+     * @param function The user defined process function.
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static <IN, OUT, W extends Window> SingleOutputStreamOperator<OUT> 
windowAllProcess(
+            DataStream<IN> input, Windows windows, 
ProcessAllWindowFunction<IN, OUT, W> function) {
+        AllWindowedStream<IN, W> allWindowedStream;
+        if (windows instanceof GlobalWindows) {
+            allWindowedStream = input.windowAll((WindowAssigner) 
EndOfStreamWindows.get());
+        } else if (windows instanceof CountTumblingWindows) {
+            long countWindowSize = ((CountTumblingWindows) windows).getSize();
+            allWindowedStream = (AllWindowedStream<IN, W>) 
input.countWindowAll(countWindowSize);
+        } else {
+            allWindowedStream =
+                    input.windowAll((WindowAssigner) 
getDataStreamTimeWindowAssigner(windows));

Review Comment:
   Would it be simpler to also handle `windows instanceof GlobalWindows` here?



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/CountTumblingWindows.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link Windows} that groups elements into windows of fixed number of 
elements. Windows do not

Review Comment:
   Would the following doc be more consistent with the docs of the other 
`Windows` subclasses?
   
   A windowing strategy that groups elements into fixed-size windows based on 
the number of elements. Windows do not overlap.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasWindows.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.common.window.GlobalWindows;
+import org.apache.flink.ml.common.window.Windows;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WindowsParam;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for the shared windows param. */
+public interface HasWindows<T> extends WithParams<T> {
+    Param<Windows> WINDOWS =
+            new WindowsParam(
+                    "windows",
+                    "Windowing strategy that determines how to create 
mini-batches from input data. "
+                            + "By default, all data in the input stream would 
be placed into the same window.",

Review Comment:
   Would it be simpler to remove the description of the default behavior for 
consistency with other parameters?



##########
flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WindowsParam;
+import org.apache.flink.ml.param.WithParams;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests the {@link Windows}. */
+@SuppressWarnings("unchecked")
+public class WindowsTest extends AbstractTestBase {
+    private static final int RECORD_NUM = 100;
+
+    private static List<Long> inputData;
+
+    private static DataStream<Long> inputStream;
+    private static DataStream<Long> inputStreamWithProcessingTimeGap;
+    private static DataStream<Long> inputStreamWithEventTime;
+
+    @BeforeClass
+    public static void beforeClass() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        inputData = new ArrayList<>();
+        for (long i = 0; i < RECORD_NUM; i++) {
+            inputData.add(i);
+        }
+        inputStream = env.fromCollection(inputData);
+
+        inputStreamWithProcessingTimeGap =
+                inputStream
+                        .map(
+                                new MapFunction<Long, Long>() {
+                                    private int count = 0;
+
+                                    @Override
+                                    public Long map(Long value) throws 
Exception {
+                                        count++;
+                                        if (count % (RECORD_NUM / 2) == 0) {
+                                            Thread.sleep(1000);
+                                        }
+                                        return value;
+                                    }
+                                })
+                        .setParallelism(1);
+
+        inputStreamWithEventTime =
+                inputStream.assignTimestampsAndWatermarks(
+                        WatermarkStrategy.<Long>forMonotonousTimestamps()
+                                .withTimestampAssigner(
+                                        (SerializableTimestampAssigner<Long>)
+                                                (element, recordTimestamp) -> 
element));
+    }
+
+    @Test
+    public void testGlobalWindows() throws Exception {
+        DataStream<List<Long>> outputStream =
+                DataStreamUtils.windowAllProcess(
+                        inputStream,
+                        GlobalWindows.getInstance(),
+                        new CreateAllWindowBatchFunction<>());
+        List<List<Long>> actualBatches = 
IteratorUtils.toList(outputStream.executeAndCollect());
+        assertEquals(1, actualBatches.size());
+        assertEquals(new HashSet<>(inputData), new 
HashSet<>(actualBatches.get(0)));
+    }
+
+    @Test
+    public void testCountTumblingWindows() throws Exception {
+        DataStream<List<Long>> outputStream =
+                DataStreamUtils.windowAllProcess(
+                        inputStream,
+                        CountTumblingWindows.of(RECORD_NUM / 7),
+                        new CreateAllWindowBatchFunction<>());
+        List<List<Long>> actualBatches = 
IteratorUtils.toList(outputStream.executeAndCollect());
+        assertEquals(7, actualBatches.size());
+        int count = 0;
+        for (List<Long> batch : actualBatches) {
+            count += batch.size();
+        }
+        assertEquals(RECORD_NUM - (RECORD_NUM % 7), count);
+    }
+
+    @Test
+    public void testProcessingTimeTumblingWindows() throws Exception {
+        DataStream<List<Long>> outputStream =
+                DataStreamUtils.windowAllProcess(
+                        inputStreamWithProcessingTimeGap,
+                        
ProcessingTimeTumblingWindows.of(Time.milliseconds(100)),
+                        new CreateAllWindowBatchFunction<>());
+        List<List<Long>> actualBatches = 
IteratorUtils.toList(outputStream.executeAndCollect());
+        assertTrue(actualBatches.size() > 1);
+        List<Long> mergedBatches = new ArrayList<>();
+        for (List<Long> batch : actualBatches) {
+            mergedBatches.addAll(batch);
+        }
+        assertTrue(mergedBatches.containsAll(inputData.subList(0, RECORD_NUM - 
1)));
+    }
+
+    @Test
+    public void testEventTimeTumblingWindows() throws Exception {
+        DataStream<List<Long>> outputStream =
+                DataStreamUtils.windowAllProcess(
+                        inputStreamWithEventTime,
+                        
EventTimeTumblingWindows.of(Time.milliseconds(RECORD_NUM / 7)),
+                        new CreateAllWindowBatchFunction<>());
+        List<List<Long>> actualBatches = 
IteratorUtils.toList(outputStream.executeAndCollect());
+        assertEquals(8, actualBatches.size());
+        List<Long> mergedBatches = new ArrayList<>();
+        for (List<Long> batch : actualBatches) {
+            mergedBatches.addAll(batch);
+        }
+        assertEquals(RECORD_NUM, mergedBatches.size());
+        assertEquals(new HashSet<>(inputData), new HashSet<>(mergedBatches));
+    }
+
+    @Test
+    public void testProcessingTimeSessionWindows() throws Exception {
+        DataStream<List<Long>> outputStream =
+                DataStreamUtils.windowAllProcess(
+                        inputStreamWithProcessingTimeGap,
+                        
ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)),
+                        new CreateAllWindowBatchFunction<>());
+        List<List<Long>> actualBatches = 
IteratorUtils.toList(outputStream.executeAndCollect());
+        assertTrue(actualBatches.size() > 1);
+        List<Long> mergedBatches = new ArrayList<>();
+        for (List<Long> batch : actualBatches) {
+            mergedBatches.addAll(batch);
+        }
+        assertTrue(mergedBatches.containsAll(inputData.subList(0, RECORD_NUM - 
1)));
+    }
+
+    @Test
+    public void testEventTimeSessionWindows() throws Exception {
+        DataStream<List<Long>> outputStream =
+                DataStreamUtils.windowAllProcess(
+                        inputStreamWithEventTime,
+                        
EventTimeSessionWindows.withGap(Time.milliseconds(RECORD_NUM / 7)),
+                        new CreateAllWindowBatchFunction<>());
+        List<List<Long>> actualBatches = 
IteratorUtils.toList(outputStream.executeAndCollect());
+        assertEquals(1, actualBatches.size());
+        assertEquals(new HashSet<>(inputData), new 
HashSet<>(actualBatches.get(0)));
+    }
+
+    @Test
+    public void testSaveLoadWindowsParams() throws Exception {

Review Comment:
   It might be more readable to move this test into `StageTest.java`, for 
consistency with `test_stage.py`, and to keep the convention that all tests of 
parameter save/load live in `StageTest.java`.



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java:
##########
@@ -197,6 +213,58 @@ public static <T> void setManagedMemoryWeight(
         }
     }
 
+    /**
+     * Applies windowAll() and process() operation on the input stream.
+     *
+     * @param input The input data stream.
+     * @param windows The window that defines how input data would be sliced 
into batches.
+     * @param function The user defined process function.
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static <IN, OUT, W extends Window> SingleOutputStreamOperator<OUT> 
windowAllProcess(
+            DataStream<IN> input, Windows windows, 
ProcessAllWindowFunction<IN, OUT, W> function) {
+        AllWindowedStream<IN, W> allWindowedStream;
+        if (windows instanceof GlobalWindows) {
+            allWindowedStream = input.windowAll((WindowAssigner) 
EndOfStreamWindows.get());
+        } else if (windows instanceof CountTumblingWindows) {
+            long countWindowSize = ((CountTumblingWindows) windows).getSize();
+            allWindowedStream = (AllWindowedStream<IN, W>) 
input.countWindowAll(countWindowSize);
+        } else {
+            allWindowedStream =
+                    input.windowAll((WindowAssigner) 
getDataStreamTimeWindowAssigner(windows));
+        }
+        return allWindowedStream.process(function);
+    }
+
+    private static WindowAssigner<Object, TimeWindow> 
getDataStreamTimeWindowAssigner(
+            Windows windows) {
+        if (windows instanceof EventTimeTumblingWindows) {
+            return TumblingEventTimeWindows.of(
+                    getStreamWindowTime(((EventTimeTumblingWindows) 
windows).getSize()));
+        } else if (windows instanceof ProcessingTimeTumblingWindows) {
+            return TumblingProcessingTimeWindows.of(
+                    getStreamWindowTime(((ProcessingTimeTumblingWindows) 
windows).getSize()));
+        } else if (windows instanceof EventTimeSessionWindows) {
+            return 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
+                    .withGap(getStreamWindowTime(((EventTimeSessionWindows) 
windows).getGap()));
+        } else if (windows instanceof ProcessingTimeSessionWindows) {
+            return 
org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
+                    .withGap(
+                            
getStreamWindowTime(((ProcessingTimeSessionWindows) windows).getGap()));
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unsupported %s subclass: %s",
+                            Windows.class.getSimpleName(), 
windows.getClass().getName()));

Review Comment:
   Would it be simpler to hardcode the window class name instead of calling 
`Windows.class.getSimpleName()`?



##########
docs/content/docs/operators/clustering/agglomerativeclustering.md:
##########
@@ -49,15 +49,16 @@ format of the merging information is
 
 ### Parameters
 
-| Key               | Default        | Type    | Required | Description        
                                                                                
                 |
-|:------------------|:---------------|:--------|:---------|:--------------------------------------------------------------------------------------------------------------------|
-| numClusters       | `2`            | Integer | no       | The max number of 
clusters to create.                                                             
                  |
-| distanceThreshold | `null`         | Double  | no       | Threshold to 
decide whether two clusters should be merged.                                   
                       |
-| linkage           | `"ward"`       | String  | no       | Criterion for 
computing distance between two clusters. Supported values: `'ward', 'complete', 
'single', 'average'`. |
-| computeFullTree   | `false`        | Boolean | no       | Whether computes 
the full tree after convergence.                                                
                   |
-| distanceMeasure   | `"euclidean"`  | String  | no       | Distance measure. 
Supported values: `'euclidean', 'manhattan', 'cosine'`.                         
                  |
-| featuresCol       | `"features"`   | String  | no       | Features column 
name.                                                                           
                    |
-| predictionCol     | `"prediction"` | String  | no       | Prediction column 
name.                                                                           
                  |
+| Key               | Default               | Type    | Required | Description 
                                                                                
                                                                  |
+|:------------------|:----------------------|:--------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| numClusters       | `2`                   | Integer | no       | The max 
number of clusters to create.                                                   
                                                                      |
+| distanceThreshold | `null`                | Double  | no       | Threshold 
to decide whether two clusters should be merged.                                
                                                                    |
+| linkage           | `"ward"`              | String  | no       | Criterion 
for computing distance between two clusters.                                    
                                                                    |
+| computeFullTree   | `false`               | Boolean | no       | Whether 
computes the full tree after convergence.                                       
                                                                      |
+| distanceMeasure   | `"euclidean"`         | String  | no       | Distance 
measure.                                                                        
                                                                     |
+| featuresCol       | `"features"`          | String  | no       | Features 
column name.                                                                    
                                                                     |
+| predictionCol     | `"prediction"`        | String  | no       | Prediction 
column name.                                                                    
                                                                   |
+| windows           | `GlobalWindows.get()` | Windows | no       | Windowing 
strategy that determines how to create mini-batches from input data. By 
default, all data in the input stream would be placed into the same window. |

Review Comment:
   `GlobalWindows.get()` -> `GlobalWindows.getInstance()`



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/AgglomerativeClusteringTest.java:
##########
@@ -203,6 +229,59 @@ public void testTransform() throws Exception {
                 agglomerativeClustering.getPredictionCol());
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testTransformWithEventTimeWindow() throws Exception {
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        new TypeInformation<?>[] 
{DenseVectorTypeInfo.INSTANCE, Types.INSTANT},
+                        new String[] {"features", "ts"});
+
+        Instant baseTime = Instant.now();
+        DataStream<Row> inputDataStream =
+                env.fromCollection(INPUT_DATA)
+                        .setParallelism(1)
+                        .map(x -> Row.of(x, baseTime.plusSeconds((long) 
x.get(0))), outputTypeInfo);
+
+        Schema schema =
+                Schema.newBuilder()
+                        .column("features", 
DataTypes.of(DenseVectorTypeInfo.INSTANCE))
+                        .column("ts", DataTypes.TIMESTAMP_LTZ(3))
+                        .watermark("ts", "ts - INTERVAL '5' SECOND")
+                        .build();
+
+        Table inputDataTable = tEnv.fromDataStream(inputDataStream, schema);
+
+        AgglomerativeClustering agglomerativeClustering =
+                new AgglomerativeClustering()
+                        
.setLinkage(AgglomerativeClusteringParams.LINKAGE_AVERAGE)
+                        .setDistanceMeasure(EuclideanDistanceMeasure.NAME)
+                        .setPredictionCol("pred")
+                        
.setWindows(EventTimeTumblingWindows.of(Time.seconds(1)));

Review Comment:
   Can we keep the Python test and Java test consistent? Maybe add a test with 
CountTumblingWindows in Java and add a test with EventTimeTumblingWindows in 
Python?



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/agglomerativeclustering/AgglomerativeClustering.java:
##########
@@ -70,6 +68,12 @@
  * second one contains the information of merging two clusters at each step. 
The data format of the
  * merging information is (clusterId1, clusterId2, distance, 
sizeOfMergedCluster).
  *
+ * <p>This operator supports the {@link 
org.apache.flink.ml.common.param.HasWindows} parameter,
+ * which creates mini-batches(windows) from input data and feeds each batch 
into this operator. The
+ * operator would perform the clustering operation on each mini-batch 
independently and output their

Review Comment:
   Since the windowing strategy is used inside this operator rather than before 
this operator, it seems confusing to say `creates mini-batch ... and feeds each 
batch into this operator`.
   
   How about the following doc:
   
   This AlgoOperator splits the input stream into mini-batches of elements 
according to the windowing strategy specified by the {@link 
org.apache.flink.ml.common.param.HasWindows} parameter, and performs the 
hierarchical clustering on each mini-batch independently. The clustering result 
of each element depends only on the elements in the same mini-batch.



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java:
##########
@@ -197,6 +213,58 @@ public static <T> void setManagedMemoryWeight(
         }
     }
 
+    /**
+     * Applies windowAll() and process() operation on the input stream.
+     *
+     * @param input The input data stream.
+     * @param windows The window that defines how input data would be sliced 
into batches.
+     * @param function The user defined process function.
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static <IN, OUT, W extends Window> SingleOutputStreamOperator<OUT> 
windowAllProcess(

Review Comment:
   How about renaming this function as `windowAllAndProcess(...)`?



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/EventTimeSessionWindows.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link Windows} that windows elements into sessions based on the 
timestamp of the elements.

Review Comment:
   How about this doc:
   
   A windowing strategy that groups elements into sessions based on the 
timestamp of the elements. Windows do not overlap.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasWindows.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.common.window.GlobalWindows;
+import org.apache.flink.ml.common.window.Windows;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WindowsParam;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for the shared windows param. */
+public interface HasWindows<T> extends WithParams<T> {
+    Param<Windows> WINDOWS =
+            new WindowsParam(
+                    "windows",
+                    "Windowing strategy that determines how to create 
mini-batches from input data. "
+                            + "By default, all data in the input stream would 
be placed into the same window.",
+                    GlobalWindows.getInstance(),
+                    ParamValidators.notNull());
+
+    default T setWindows(Windows windows) {
+        return set(WINDOWS, windows);
+    }
+
+    default Windows getWindows() {

Review Comment:
   Put `getWindows()` before `setWindows()` for consistency with other 
parameters?



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/EventTimeTumblingWindows.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link Windows} that groups elements into fixed-size windows based on the 
timestamp of the

Review Comment:
   How about this doc:
   
   A windowing strategy that groups elements into fixed-size windows based on 
the timestamp of the elements. Windows do not overlap.



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/GlobalWindows.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+/**
+ * A {@link Windows} that assigns all the elements into a single global 
window. In order for this

Review Comment:
   Would the following doc be more consistent with the docs of other Windows 
subclasses?
   
   A windowing strategy that groups all elements into a single global window. 
This strategy assumes that the input stream is bounded.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to