lindong28 commented on code in PR #157:
URL: https://github.com/apache/flink-ml/pull/157#discussion_r977119140


##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/WindowUtils.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
+import org.apache.flink.streaming.api.datastream.AllWindowedStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/** Utility class for operations related to the window. */

Review Comment:
   nits: Utility class for operations related to {@link Windows}



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/WindowUtils.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
+import org.apache.flink.streaming.api.datastream.AllWindowedStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/** Utility class for operations related to the window. */
+@Internal
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class WindowUtils {
+    /**
+     * Applies windowAll() and process() operation on the input stream.
+     *
+     * @param input The input data stream.
+     * @param windows The window that defines how input data would be sliced 
into batches.
+     * @param function The user defined process function.
+     */
+    public static <IN, OUT, W extends Window> SingleOutputStreamOperator<OUT> 
windowAllProcess(
+            DataStream<IN> input, Windows windows, 
ProcessAllWindowFunction<IN, OUT, W> function) {
+        AllWindowedStream<IN, W> allWindowedStream;
+        if (windows == null) {
+            allWindowedStream = input.windowAll((WindowAssigner) 
EndOfStreamWindows.get());
+        } else if (windows instanceof CountTumblingWindows) {
+            long countWindowSize = ((CountTumblingWindows) windows).size;
+            allWindowedStream = (AllWindowedStream<IN, W>) 
input.countWindowAll(countWindowSize);
+        } else {
+            allWindowedStream =
+                    input.windowAll(
+                            (WindowAssigner) 
WindowUtils.getDataStreamTimeWindowAssigner(windows));
+        }
+        return allWindowedStream.process(function);
+    }
+
+    private static WindowAssigner<Object, TimeWindow> 
getDataStreamTimeWindowAssigner(
+            Windows windows) {
+        if (windows instanceof EventTimeTumblingWindows) {
+            return TumblingEventTimeWindows.of(
+                    getStreamWindowTime(((EventTimeTumblingWindows) 
windows).size));
+        } else if (windows instanceof ProcessingTimeTumblingWindows) {
+            return TumblingProcessingTimeWindows.of(
+                    getStreamWindowTime(((ProcessingTimeTumblingWindows) 
windows).size));
+        } else if (windows instanceof EventTimeSessionWindows) {
+            return 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
+                    .withGap(getStreamWindowTime(((EventTimeSessionWindows) 
windows).gap));
+        } else if (windows instanceof ProcessingTimeSessionWindows) {
+            return 
org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
+                    
.withGap(getStreamWindowTime(((ProcessingTimeSessionWindows) windows).gap));
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unsupported %s subclass: %s",
+                            Windows.class.getSimpleName(), 
windows.getClass().getName()));
+        }
+    }
+
+    private static org.apache.flink.streaming.api.windowing.time.Time 
getStreamWindowTime(
+            Time time) {
+        return org.apache.flink.streaming.api.windowing.time.Time.of(
+                time.getSize(), time.getUnit());
+    }
+
+    public static Object jsonEncode(Windows value) {
+        Map<String, Object> map = new HashMap<>();
+        if (value == null) {
+            return map;
+        }
+
+        map.put("class", value.getClass().getName());
+        if (value instanceof CountTumblingWindows) {
+            map.put("size", ((CountTumblingWindows) value).size);
+        } else if (value instanceof ProcessingTimeTumblingWindows) {
+            map.putAll(encodeTime(((ProcessingTimeTumblingWindows) 
value).size, "size"));
+        } else if (value instanceof EventTimeTumblingWindows) {
+            map.putAll(encodeTime(((EventTimeTumblingWindows) value).size, 
"size"));
+        } else if (value instanceof ProcessingTimeSessionWindows) {
+            map.putAll(encodeTime(((ProcessingTimeSessionWindows) value).gap, 
"gap"));
+        } else if (value instanceof EventTimeSessionWindows) {
+            map.putAll(encodeTime(((EventTimeSessionWindows) value).gap, 
"gap"));
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("Unsupported %s subclass: %s", 
Windows.class, value.getClass()));
+        }
+        return map;
+    }
+
+    public static Windows jsonDecode(Object json) {
+        Map<String, Object> map = (Map<String, Object>) json;
+        if (map.isEmpty()) {
+            return null;
+        }
+
+        String classString = (String) map.get("class");
+        if (classString.equals(CountTumblingWindows.class.getName())) {
+            long size = ((Number) map.get("size")).longValue();
+            return CountTumblingWindows.over(size);
+        } else if 
(classString.equals(ProcessingTimeTumblingWindows.class.getName())) {
+            Time size = decodeTime(map, "size");
+            return ProcessingTimeTumblingWindows.over(size);
+        } else if 
(classString.equals(EventTimeTumblingWindows.class.getName())) {
+            Time size = decodeTime(map, "size");
+            return EventTimeTumblingWindows.over(size);
+        } else if 
(classString.equals(ProcessingTimeSessionWindows.class.getName())) {
+            Time gap = decodeTime(map, "gap");
+            return ProcessingTimeSessionWindows.withGap(gap);
+        } else if 
(classString.equals(EventTimeSessionWindows.class.getName())) {
+            Time gap = decodeTime(map, "gap");
+            return EventTimeSessionWindows.withGap(gap);
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("Unsupported %s subclass: %s", 
Windows.class, classString));
+        }
+    }
+
+    private static Map<String, Object> encodeTime(Time time, String keyPrefix) 
{

Review Comment:
   Would it be simpler to just encode time as a long value representing 
duration in ms?



##########
docs/content/docs/operators/clustering/agglomerativeclustering.md:
##########
@@ -49,15 +49,16 @@ format of the merging information is
 
 ### Parameters
 
-| Key               | Default        | Type    | Required | Description        
                                                                                
                 |
-|:------------------|:---------------|:--------|:---------|:--------------------------------------------------------------------------------------------------------------------|
-| numClusters       | `2`            | Integer | no       | The max number of 
clusters to create.                                                             
                  |
-| distanceThreshold | `null`         | Double  | no       | Threshold to 
decide whether two clusters should be merged.                                   
                       |
-| linkage           | `"ward"`       | String  | no       | Criterion for 
computing distance between two clusters. Supported values: `'ward', 'complete', 
'single', 'average'`. |
-| computeFullTree   | `false`        | Boolean | no       | Whether computes 
the full tree after convergence.                                                
                   |
-| distanceMeasure   | `"euclidean"`  | String  | no       | Distance measure. 
Supported values: `'euclidean', 'manhattan', 'cosine'`.                         
                  |
-| featuresCol       | `"features"`   | String  | no       | Features column 
name.                                                                           
                    |
-| predictionCol     | `"prediction"` | String  | no       | Prediction column 
name.                                                                           
                  |
+| Key               | Default               | Type    | Required | Description 
                                                 |
+| :---------------- | :-------------------- | :------ | :------- | 
:----------------------------------------------------------- |
+| numClusters       | `2`                   | Integer | no       | The max 
number of clusters to create.                        |
+| distanceThreshold | `null`                | Double  | no       | Threshold 
to decide whether two clusters should be merged.   |
+| linkage           | `"ward"`              | String  | no       | Criterion 
for computing distance between two clusters. Supported values: `'ward', 
'complete', 'single', 'average'`. |
+| computeFullTree   | `false`               | Boolean | no       | Whether 
computes the full tree after convergence.            |
+| distanceMeasure   | `"euclidean"`         | String  | no       | Distance 
measure. Supported values: `'euclidean', 'manhattan', 'cosine'`. |
+| window            | `BoundedWindow.get()` | Window  | no       | How 
elements would be sliced into batches and fed into the Stage. |

Review Comment:
   Maybe update the doc as appropriate?



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/WindowUtils.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
+import org.apache.flink.streaming.api.datastream.AllWindowedStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/** Utility class for operations related to the window. */
+@Internal
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class WindowUtils {

Review Comment:
   nits: would it be better to use `WindowsUtils`?



##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/StageTest.java:
##########
@@ -383,6 +398,7 @@ public void testStageSaveLoad() throws IOException {
         stage.set(MyParams.DOUBLE_ARRAY_PARAM, new Double[] {50.0, 51.0});
         stage.set(MyParams.STRING_ARRAY_PARAM, new String[] {"50", "51"});
         stage.set(MyParams.VECTOR_PARAM, Vectors.dense(2, 3, 4));
+        stage.set(MyParams.WINDOWS_PARAM, 
EventTimeSessionWindows.withGap(Time.milliseconds(100)));

Review Comment:
   Would it be useful to also validate that other Windows subclasses can be 
serialized and de-serialized correctly?



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasWindows.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.common.window.Windows;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WindowsParam;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for the shared windows param. */
+public interface HasWindows<T> extends WithParams<T> {
+    Param<Windows> WINDOWS =
+            new WindowsParam(
+                    "windows",
+                    "Windows parameter that determines how input stream would 
be sliced into batches. "

Review Comment:
   How about the following doc:
   
   Windowing strategy that determines how to create mini-batches from input 
data. If the value is null, the input stream must be bounded and all data in 
the input stream would be placed into the same window.



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/Windows.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+/**
+ * A {@link Windows} determines how input data stream would be sliced into 
batches and fed into a

Review Comment:
   Note that a Flink ML stage takes lists of DataStream as input, which will 
not be affected by `windows`. So it might be confusing to say that windows 
affect the input of a Flink stage.
   
   How about the following doc:
   
   Windowing strategy that determines how to create mini-batches from input 
data.



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/CountTumblingWindows.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link Windows} that groups elements into windows of fixed number of 
elements. Windows do not
+ * overlap.
+ */
+public class CountTumblingWindows implements Windows {
+    /** Size of this window as row-count interval. */
+    final long size;
+
+    private CountTumblingWindows(long size) {
+        Preconditions.checkArgument(
+                size > 0, "The size of a count window must be a positive 
value");
+        this.size = size;
+    }
+
+    /**
+     * Creates a new {@link CountTumblingWindows}.
+     *
+     * @param size the size of the window as row-count interval.
+     */
+    public static CountTumblingWindows over(long size) {

Review Comment:
   Would it be better to use `of(...)` for consistency with Flink and Beam's 
APIs?



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/agglomerativeclustering/AgglomerativeClustering.java:
##########
@@ -35,18 +36,15 @@
 import org.apache.flink.ml.param.Param;
 import org.apache.flink.ml.util.ParamUtils;
 import org.apache.flink.ml.util.ReadWriteUtils;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableImpl;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;

Review Comment:
   Can you update the Javadoc of this algorithm to cover its behavior w.r.t. 
windows?



##########
flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests the {@link Windows}s and {@link WindowUtils}. */
+@SuppressWarnings("unchecked")
+public class WindowsTest extends AbstractTestBase {
+    private static final int RECORD_NUM = 100;
+
+    private static List<Long> inputData;
+
+    private static DataStream<Long> inputStream;
+    private static DataStream<Long> inputStreamWithInterval;
+    private static DataStream<Long> inputStreamWithTimestamp;
+
+    @BeforeClass
+    public static void before() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        inputData = new ArrayList<>();
+        for (long i = 0; i < RECORD_NUM; i++) {
+            inputData.add(i);
+        }
+        inputStream = env.fromCollection(inputData);
+
+        inputStreamWithInterval =
+                inputStream
+                        .map(
+                                new MapFunction<Long, Long>() {
+                                    private int count = 0;
+
+                                    @Override
+                                    public Long map(Long value) throws 
Exception {
+                                        count++;
+                                        if (count % (RECORD_NUM / 2) == 0) {
+                                            Thread.sleep(1000);
+                                        }
+                                        return value;
+                                    }
+                                })
+                        .setParallelism(1);
+
+        inputStreamWithTimestamp =
+                inputStream.assignTimestampsAndWatermarks(
+                        WatermarkStrategy.<Long>forMonotonousTimestamps()
+                                .withTimestampAssigner(
+                                        (SerializableTimestampAssigner<Long>)
+                                                (element, recordTimestamp) -> 
element));
+    }
+
+    @Test
+    public void testBoundedWindow() throws Exception {

Review Comment:
   It might be more intuitive to name it `testGlobalWindows()`.



##########
flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests the {@link Windows}s and {@link WindowUtils}. */
+@SuppressWarnings("unchecked")
+public class WindowsTest extends AbstractTestBase {
+    private static final int RECORD_NUM = 100;
+
+    private static List<Long> inputData;
+
+    private static DataStream<Long> inputStream;
+    private static DataStream<Long> inputStreamWithInterval;
+    private static DataStream<Long> inputStreamWithTimestamp;
+
+    @BeforeClass
+    public static void before() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        inputData = new ArrayList<>();
+        for (long i = 0; i < RECORD_NUM; i++) {
+            inputData.add(i);
+        }
+        inputStream = env.fromCollection(inputData);
+
+        inputStreamWithInterval =
+                inputStream
+                        .map(
+                                new MapFunction<Long, Long>() {
+                                    private int count = 0;
+
+                                    @Override
+                                    public Long map(Long value) throws 
Exception {
+                                        count++;
+                                        if (count % (RECORD_NUM / 2) == 0) {
+                                            Thread.sleep(1000);
+                                        }
+                                        return value;
+                                    }
+                                })
+                        .setParallelism(1);
+
+        inputStreamWithTimestamp =
+                inputStream.assignTimestampsAndWatermarks(
+                        WatermarkStrategy.<Long>forMonotonousTimestamps()
+                                .withTimestampAssigner(
+                                        (SerializableTimestampAssigner<Long>)
+                                                (element, recordTimestamp) -> 
element));
+    }
+
+    @Test
+    public void testBoundedWindow() throws Exception {
+        DataStream<List<Long>> outputStream =
+                WindowUtils.windowAllProcess(
+                        inputStream, null, new 
CreateAllWindowBatchFunction<>());
+        List<List<Long>> actualBatches = 
IteratorUtils.toList(outputStream.executeAndCollect());
+        assertEquals(1, actualBatches.size());
+        assertEquals(new HashSet<>(inputData), new 
HashSet<>(actualBatches.get(0)));
+    }
+
+    @Test
+    public void testCountWindow() throws Exception {

Review Comment:
   Would it be better to name the test based on the windows subclass name?
   
   Same for other tests.



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/CountTumblingWindows.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link Windows} that groups elements into windows of fixed number of 
elements. Windows do not
+ * overlap.
+ */
+public class CountTumblingWindows implements Windows {
+    /** Size of this window as row-count interval. */
+    final long size;

Review Comment:
   Would it be better to make this variable `private` and add a public 
`getSize()` method, so that developers can implement custom algorithms with 
Windows parameters without having to use `WindowUtils`?
   
   Same for other `Windows` subclasses.
   
   Note that this PR has exposed the `Windows` class as public API by allowing 
algorithm users to instantiate Windows instances. It would seem a bit weird if 
they cannot access its content.
   
   According to the discussion [1], though we believe this variable will not 
need to be mutable in the future, it is typically a safer choice to expose it 
via a getXXX() method if it can be accessed by many different classes.
   
   Also note that this is a bit different from DenseVector's `values` field 
because DenseVector will typically be accessed via public APIs of `Vector`.
   
   [1] 
https://stackoverflow.com/questions/6927763/immutable-type-public-final-fields-vs-getter
   
   
   



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasWindows.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.common.window.Windows;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WindowsParam;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for the shared windows param. */
+public interface HasWindows<T> extends WithParams<T> {
+    Param<Windows> WINDOWS =
+            new WindowsParam(
+                    "windows",
+                    "Windows parameter that determines how input stream would 
be sliced into batches. "
+                            + "By default all records in the bounded input 
stream would be placed into the same window.",
+                    null,

Review Comment:
   It might be more explicit to use `GlobalWindows` instead of null.



##########
flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests the {@link Windows}s and {@link WindowUtils}. */
+@SuppressWarnings("unchecked")
+public class WindowsTest extends AbstractTestBase {
+    private static final int RECORD_NUM = 100;
+
+    private static List<Long> inputData;
+
+    private static DataStream<Long> inputStream;
+    private static DataStream<Long> inputStreamWithInterval;
+    private static DataStream<Long> inputStreamWithTimestamp;
+
+    @BeforeClass
+    public static void before() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        inputData = new ArrayList<>();
+        for (long i = 0; i < RECORD_NUM; i++) {
+            inputData.add(i);
+        }
+        inputStream = env.fromCollection(inputData);
+
+        inputStreamWithInterval =
+                inputStream
+                        .map(
+                                new MapFunction<Long, Long>() {
+                                    private int count = 0;
+
+                                    @Override
+                                    public Long map(Long value) throws 
Exception {
+                                        count++;
+                                        if (count % (RECORD_NUM / 2) == 0) {
+                                            Thread.sleep(1000);
+                                        }
+                                        return value;
+                                    }
+                                })
+                        .setParallelism(1);
+
+        inputStreamWithTimestamp =
+                inputStream.assignTimestampsAndWatermarks(
+                        WatermarkStrategy.<Long>forMonotonousTimestamps()
+                                .withTimestampAssigner(
+                                        (SerializableTimestampAssigner<Long>)
+                                                (element, recordTimestamp) -> 
element));
+    }
+
+    @Test
+    public void testBoundedWindow() throws Exception {
+        DataStream<List<Long>> outputStream =
+                WindowUtils.windowAllProcess(
+                        inputStream, null, new 
CreateAllWindowBatchFunction<>());
+        List<List<Long>> actualBatches = 
IteratorUtils.toList(outputStream.executeAndCollect());
+        assertEquals(1, actualBatches.size());
+        assertEquals(new HashSet<>(inputData), new 
HashSet<>(actualBatches.get(0)));
+    }
+
+    @Test
+    public void testCountWindow() throws Exception {
+        DataStream<List<Long>> outputStream =
+                WindowUtils.windowAllProcess(
+                        inputStream,
+                        CountTumblingWindows.over(RECORD_NUM / 3),
+                        new CreateAllWindowBatchFunction<>());
+        List<List<Long>> actualBatches = 
IteratorUtils.toList(outputStream.executeAndCollect());
+        assertTrue(actualBatches.size() >= 3 && actualBatches.size() <= 4);

Review Comment:
   Hmm... can we make the test result deterministic?



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/WindowUtils.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
+import org.apache.flink.streaming.api.datastream.AllWindowedStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/** Utility class for operations related to the window. */
+@Internal
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class WindowUtils {
+    /**
+     * Applies windowAll() and process() operation on the input stream.
+     *
+     * @param input The input data stream.
+     * @param windows The window that defines how input data would be sliced 
into batches.
+     * @param function The user defined process function.
+     */
+    public static <IN, OUT, W extends Window> SingleOutputStreamOperator<OUT> 
windowAllProcess(
+            DataStream<IN> input, Windows windows, 
ProcessAllWindowFunction<IN, OUT, W> function) {
+        AllWindowedStream<IN, W> allWindowedStream;
+        if (windows == null) {
+            allWindowedStream = input.windowAll((WindowAssigner) 
EndOfStreamWindows.get());
+        } else if (windows instanceof CountTumblingWindows) {
+            long countWindowSize = ((CountTumblingWindows) windows).size;
+            allWindowedStream = (AllWindowedStream<IN, W>) 
input.countWindowAll(countWindowSize);
+        } else {
+            allWindowedStream =
+                    input.windowAll(
+                            (WindowAssigner) 
WindowUtils.getDataStreamTimeWindowAssigner(windows));
+        }
+        return allWindowedStream.process(function);
+    }
+
+    private static WindowAssigner<Object, TimeWindow> 
getDataStreamTimeWindowAssigner(
+            Windows windows) {
+        if (windows instanceof EventTimeTumblingWindows) {
+            return TumblingEventTimeWindows.of(
+                    getStreamWindowTime(((EventTimeTumblingWindows) 
windows).size));
+        } else if (windows instanceof ProcessingTimeTumblingWindows) {
+            return TumblingProcessingTimeWindows.of(
+                    getStreamWindowTime(((ProcessingTimeTumblingWindows) 
windows).size));
+        } else if (windows instanceof EventTimeSessionWindows) {
+            return 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
+                    .withGap(getStreamWindowTime(((EventTimeSessionWindows) 
windows).gap));
+        } else if (windows instanceof ProcessingTimeSessionWindows) {
+            return 
org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
+                    
.withGap(getStreamWindowTime(((ProcessingTimeSessionWindows) windows).gap));
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unsupported %s subclass: %s",
+                            Windows.class.getSimpleName(), 
windows.getClass().getName()));
+        }
+    }
+
+    private static org.apache.flink.streaming.api.windowing.time.Time 
getStreamWindowTime(
+            Time time) {
+        return org.apache.flink.streaming.api.windowing.time.Time.of(
+                time.getSize(), time.getUnit());
+    }
+
+    public static Object jsonEncode(Windows value) {
+        Map<String, Object> map = new HashMap<>();
+        if (value == null) {
+            return map;
+        }
+
+        map.put("class", value.getClass().getName());
+        if (value instanceof CountTumblingWindows) {
+            map.put("size", ((CountTumblingWindows) value).size);
+        } else if (value instanceof ProcessingTimeTumblingWindows) {
+            map.putAll(encodeTime(((ProcessingTimeTumblingWindows) 
value).size, "size"));
+        } else if (value instanceof EventTimeTumblingWindows) {
+            map.putAll(encodeTime(((EventTimeTumblingWindows) value).size, 
"size"));
+        } else if (value instanceof ProcessingTimeSessionWindows) {
+            map.putAll(encodeTime(((ProcessingTimeSessionWindows) value).gap, 
"gap"));
+        } else if (value instanceof EventTimeSessionWindows) {
+            map.putAll(encodeTime(((EventTimeSessionWindows) value).gap, 
"gap"));
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("Unsupported %s subclass: %s", 
Windows.class, value.getClass()));
+        }
+        return map;
+    }
+
+    public static Windows jsonDecode(Object json) {

Review Comment:
   Since this method is only used by `WindowsParam`, would it be simpler to 
move it there?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to