lindong28 commented on code in PR #157: URL: https://github.com/apache/flink-ml/pull/157#discussion_r977119140
########## flink-ml-core/src/main/java/org/apache/flink/ml/common/window/WindowUtils.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.window; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.ml.common.datastream.EndOfStreamWindows; +import org.apache.flink.streaming.api.datastream.AllWindowedStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** Utility class for operations related to the window. 
*/ Review Comment: nits: Utility class for operations related to {@link Windows} ########## flink-ml-core/src/main/java/org/apache/flink/ml/common/window/WindowUtils.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.common.window; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.ml.common.datastream.EndOfStreamWindows; +import org.apache.flink.streaming.api.datastream.AllWindowedStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** Utility class for operations related to the window. */ +@Internal +@SuppressWarnings({"rawtypes", "unchecked"}) +public class WindowUtils { + /** + * Applies windowAll() and process() operation on the input stream. + * + * @param input The input data stream. + * @param windows The window that defines how input data would be sliced into batches. + * @param function The user defined process function. 
+ */ + public static <IN, OUT, W extends Window> SingleOutputStreamOperator<OUT> windowAllProcess( + DataStream<IN> input, Windows windows, ProcessAllWindowFunction<IN, OUT, W> function) { + AllWindowedStream<IN, W> allWindowedStream; + if (windows == null) { + allWindowedStream = input.windowAll((WindowAssigner) EndOfStreamWindows.get()); + } else if (windows instanceof CountTumblingWindows) { + long countWindowSize = ((CountTumblingWindows) windows).size; + allWindowedStream = (AllWindowedStream<IN, W>) input.countWindowAll(countWindowSize); + } else { + allWindowedStream = + input.windowAll( + (WindowAssigner) WindowUtils.getDataStreamTimeWindowAssigner(windows)); + } + return allWindowedStream.process(function); + } + + private static WindowAssigner<Object, TimeWindow> getDataStreamTimeWindowAssigner( + Windows windows) { + if (windows instanceof EventTimeTumblingWindows) { + return TumblingEventTimeWindows.of( + getStreamWindowTime(((EventTimeTumblingWindows) windows).size)); + } else if (windows instanceof ProcessingTimeTumblingWindows) { + return TumblingProcessingTimeWindows.of( + getStreamWindowTime(((ProcessingTimeTumblingWindows) windows).size)); + } else if (windows instanceof EventTimeSessionWindows) { + return org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows + .withGap(getStreamWindowTime(((EventTimeSessionWindows) windows).gap)); + } else if (windows instanceof ProcessingTimeSessionWindows) { + return org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows + .withGap(getStreamWindowTime(((ProcessingTimeSessionWindows) windows).gap)); + } else { + throw new UnsupportedOperationException( + String.format( + "Unsupported %s subclass: %s", + Windows.class.getSimpleName(), windows.getClass().getName())); + } + } + + private static org.apache.flink.streaming.api.windowing.time.Time getStreamWindowTime( + Time time) { + return org.apache.flink.streaming.api.windowing.time.Time.of( + time.getSize(), 
time.getUnit()); + } + + public static Object jsonEncode(Windows value) { + Map<String, Object> map = new HashMap<>(); + if (value == null) { + return map; + } + + map.put("class", value.getClass().getName()); + if (value instanceof CountTumblingWindows) { + map.put("size", ((CountTumblingWindows) value).size); + } else if (value instanceof ProcessingTimeTumblingWindows) { + map.putAll(encodeTime(((ProcessingTimeTumblingWindows) value).size, "size")); + } else if (value instanceof EventTimeTumblingWindows) { + map.putAll(encodeTime(((EventTimeTumblingWindows) value).size, "size")); + } else if (value instanceof ProcessingTimeSessionWindows) { + map.putAll(encodeTime(((ProcessingTimeSessionWindows) value).gap, "gap")); + } else if (value instanceof EventTimeSessionWindows) { + map.putAll(encodeTime(((EventTimeSessionWindows) value).gap, "gap")); + } else { + throw new UnsupportedOperationException( + String.format("Unsupported %s subclass: %s", Windows.class, value.getClass())); + } + return map; + } + + public static Windows jsonDecode(Object json) { + Map<String, Object> map = (Map<String, Object>) json; + if (map.isEmpty()) { + return null; + } + + String classString = (String) map.get("class"); + if (classString.equals(CountTumblingWindows.class.getName())) { + long size = ((Number) map.get("size")).longValue(); + return CountTumblingWindows.over(size); + } else if (classString.equals(ProcessingTimeTumblingWindows.class.getName())) { + Time size = decodeTime(map, "size"); + return ProcessingTimeTumblingWindows.over(size); + } else if (classString.equals(EventTimeTumblingWindows.class.getName())) { + Time size = decodeTime(map, "size"); + return EventTimeTumblingWindows.over(size); + } else if (classString.equals(ProcessingTimeSessionWindows.class.getName())) { + Time gap = decodeTime(map, "gap"); + return ProcessingTimeSessionWindows.withGap(gap); + } else if (classString.equals(EventTimeSessionWindows.class.getName())) { + Time gap = decodeTime(map, "gap"); + 
return EventTimeSessionWindows.withGap(gap); + } else { + throw new UnsupportedOperationException( + String.format("Unsupported %s subclass: %s", Windows.class, classString)); + } + } + + private static Map<String, Object> encodeTime(Time time, String keyPrefix) { Review Comment: Would it be simpler to just encode time as a long value representing duration in ms? ########## docs/content/docs/operators/clustering/agglomerativeclustering.md: ########## @@ -49,15 +49,16 @@ format of the merging information is ### Parameters -| Key | Default | Type | Required | Description | -|:------------------|:---------------|:--------|:---------|:--------------------------------------------------------------------------------------------------------------------| -| numClusters | `2` | Integer | no | The max number of clusters to create. | -| distanceThreshold | `null` | Double | no | Threshold to decide whether two clusters should be merged. | -| linkage | `"ward"` | String | no | Criterion for computing distance between two clusters. Supported values: `'ward', 'complete', 'single', 'average'`. | -| computeFullTree | `false` | Boolean | no | Whether computes the full tree after convergence. | -| distanceMeasure | `"euclidean"` | String | no | Distance measure. Supported values: `'euclidean', 'manhattan', 'cosine'`. | -| featuresCol | `"features"` | String | no | Features column name. | -| predictionCol | `"prediction"` | String | no | Prediction column name. | +| Key | Default | Type | Required | Description | +| :---------------- | :-------------------- | :------ | :------- | :----------------------------------------------------------- | +| numClusters | `2` | Integer | no | The max number of clusters to create. | +| distanceThreshold | `null` | Double | no | Threshold to decide whether two clusters should be merged. | +| linkage | `"ward"` | String | no | Criterion for computing distance between two clusters. Supported values: `'ward', 'complete', 'single', 'average'`. 
| +| computeFullTree | `false` | Boolean | no | Whether computes the full tree after convergence. | +| distanceMeasure | `"euclidean"` | String | no | Distance measure. Supported values: `'euclidean', 'manhattan', 'cosine'`. | +| window | `BoundedWindow.get()` | Window | no | How elements would be sliced into batches and fed into the Stage. | Review Comment: Maybe update the doc as appropriate? ########## flink-ml-core/src/main/java/org/apache/flink/ml/common/window/WindowUtils.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.common.window; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.ml.common.datastream.EndOfStreamWindows; +import org.apache.flink.streaming.api.datastream.AllWindowedStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** Utility class for operations related to the window. */ +@Internal +@SuppressWarnings({"rawtypes", "unchecked"}) +public class WindowUtils { Review Comment: nits: would it be better to use `WindowsUtils`? ########## flink-ml-core/src/test/java/org/apache/flink/ml/api/StageTest.java: ########## @@ -383,6 +398,7 @@ public void testStageSaveLoad() throws IOException { stage.set(MyParams.DOUBLE_ARRAY_PARAM, new Double[] {50.0, 51.0}); stage.set(MyParams.STRING_ARRAY_PARAM, new String[] {"50", "51"}); stage.set(MyParams.VECTOR_PARAM, Vectors.dense(2, 3, 4)); + stage.set(MyParams.WINDOWS_PARAM, EventTimeSessionWindows.withGap(Time.milliseconds(100))); Review Comment: Would it be useful to also validate that other Windows subclasses can be serialized and de-serialized correctly? ########## flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasWindows.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.param; + +import org.apache.flink.ml.common.window.Windows; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; +import org.apache.flink.ml.param.WindowsParam; +import org.apache.flink.ml.param.WithParams; + +/** Interface for the shared windows param. */ +public interface HasWindows<T> extends WithParams<T> { + Param<Windows> WINDOWS = + new WindowsParam( + "windows", + "Windows parameter that determines how input stream would be sliced into batches. " Review Comment: How about the following doc: Windowing strategy that determines how to create mini-batches from input data. If the value is null, the input stream must be bounded and all data in the input stream would be placed into the same window. ########## flink-ml-core/src/main/java/org/apache/flink/ml/common/window/Windows.java: ########## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.window; + +/** + * A {@link Windows} determines how input data stream would be sliced into batches and fed into a Review Comment: Note that a Flink ML stage takes lists of DataStream as input, which will not be affected by `windows`. So it might be confusing to say that windows affect the input of a Flink stage. How about the following doc: Windowing strategy that determines how to create mini-batches from input data. ########## flink-ml-core/src/main/java/org/apache/flink/ml/common/window/CountTumblingWindows.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.common.window; + +import org.apache.flink.util.Preconditions; + +/** + * A {@link Windows} that groups elements into windows of fixed number of elements. Windows do not + * overlap. + */ +public class CountTumblingWindows implements Windows { + /** Size of this window as row-count interval. */ + final long size; + + private CountTumblingWindows(long size) { + Preconditions.checkArgument( + size > 0, "The size of a count window must be a positive value"); + this.size = size; + } + + /** + * Creates a new {@link CountTumblingWindows}. + * + * @param size the size of the window as row-count interval. + */ + public static CountTumblingWindows over(long size) { Review Comment: Would it be better to use `of(...)` for consistency with Flink and Beam's APIs? ########## flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/agglomerativeclustering/AgglomerativeClustering.java: ########## @@ -35,18 +36,15 @@ import org.apache.flink.ml.param.Param; import org.apache.flink.ml.util.ParamUtils; import org.apache.flink.ml.util.ReadWriteUtils; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.BoundedOneInput; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.internal.TableImpl; import org.apache.flink.types.Row; +import 
org.apache.flink.util.Collector; Review Comment: Can you update the Javadoc of this algorithm to cover its behavior w.r.t. windows? ########## flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.ml.common.window; + +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.util.Collector; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Tests the {@link Windows}s and {@link WindowUtils}. 
*/ +@SuppressWarnings("unchecked") +public class WindowsTest extends AbstractTestBase { + private static final int RECORD_NUM = 100; + + private static List<Long> inputData; + + private static DataStream<Long> inputStream; + private static DataStream<Long> inputStreamWithInterval; + private static DataStream<Long> inputStreamWithTimestamp; + + @BeforeClass + public static void before() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + inputData = new ArrayList<>(); + for (long i = 0; i < RECORD_NUM; i++) { + inputData.add(i); + } + inputStream = env.fromCollection(inputData); + + inputStreamWithInterval = + inputStream + .map( + new MapFunction<Long, Long>() { + private int count = 0; + + @Override + public Long map(Long value) throws Exception { + count++; + if (count % (RECORD_NUM / 2) == 0) { + Thread.sleep(1000); + } + return value; + } + }) + .setParallelism(1); + + inputStreamWithTimestamp = + inputStream.assignTimestampsAndWatermarks( + WatermarkStrategy.<Long>forMonotonousTimestamps() + .withTimestampAssigner( + (SerializableTimestampAssigner<Long>) + (element, recordTimestamp) -> element)); + } + + @Test + public void testBoundedWindow() throws Exception { Review Comment: It might be more intuitive to name it `testGlobalWindows()`. ########## flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.window; + +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.util.Collector; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Tests the {@link Windows}s and {@link WindowUtils}. 
*/ +@SuppressWarnings("unchecked") +public class WindowsTest extends AbstractTestBase { + private static final int RECORD_NUM = 100; + + private static List<Long> inputData; + + private static DataStream<Long> inputStream; + private static DataStream<Long> inputStreamWithInterval; + private static DataStream<Long> inputStreamWithTimestamp; + + @BeforeClass + public static void before() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + inputData = new ArrayList<>(); + for (long i = 0; i < RECORD_NUM; i++) { + inputData.add(i); + } + inputStream = env.fromCollection(inputData); + + inputStreamWithInterval = + inputStream + .map( + new MapFunction<Long, Long>() { + private int count = 0; + + @Override + public Long map(Long value) throws Exception { + count++; + if (count % (RECORD_NUM / 2) == 0) { + Thread.sleep(1000); + } + return value; + } + }) + .setParallelism(1); + + inputStreamWithTimestamp = + inputStream.assignTimestampsAndWatermarks( + WatermarkStrategy.<Long>forMonotonousTimestamps() + .withTimestampAssigner( + (SerializableTimestampAssigner<Long>) + (element, recordTimestamp) -> element)); + } + + @Test + public void testBoundedWindow() throws Exception { + DataStream<List<Long>> outputStream = + WindowUtils.windowAllProcess( + inputStream, null, new CreateAllWindowBatchFunction<>()); + List<List<Long>> actualBatches = IteratorUtils.toList(outputStream.executeAndCollect()); + assertEquals(1, actualBatches.size()); + assertEquals(new HashSet<>(inputData), new HashSet<>(actualBatches.get(0))); + } + + @Test + public void testCountWindow() throws Exception { Review Comment: Would it be better to name the test based on the windows subclass name? Same for other tests. ########## flink-ml-core/src/main/java/org/apache/flink/ml/common/window/CountTumblingWindows.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.window; + +import org.apache.flink.util.Preconditions; + +/** + * A {@link Windows} that groups elements into windows of fixed number of elements. Windows do not + * overlap. + */ +public class CountTumblingWindows implements Windows { + /** Size of this window as row-count interval. */ + final long size; Review Comment: Would it be better to make this variable `private` and add a public `getSize()` method, so that developers can implement custom algorithms with Windows parameters without having to use `WindowUtils`? Same for other `Windows` subclasses. Note that this PR has exposed the Windows class as a public API by allowing algorithm users to instantiate Windows instances. It would seem a bit weird if they cannot access its content. According to the discussion [1], though we believe this variable will not need to be mutable in the future, it is typically a safer choice to expose it via a getXXX() method if it can be accessed by many different classes. Also note that this is a bit different from DenseVector's `values` field because DenseVector will typically be accessed via public APIs of `Vector`.
[1] https://stackoverflow.com/questions/6927763/immutable-type-public-final-fields-vs-getter ########## flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasWindows.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.param; + +import org.apache.flink.ml.common.window.Windows; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; +import org.apache.flink.ml.param.WindowsParam; +import org.apache.flink.ml.param.WithParams; + +/** Interface for the shared windows param. */ +public interface HasWindows<T> extends WithParams<T> { + Param<Windows> WINDOWS = + new WindowsParam( + "windows", + "Windows parameter that determines how input stream would be sliced into batches. " + + "By default all records in the bounded input stream would be placed into the same window.", + null, Review Comment: It might be more explicit to use `GlobalWindows` instead of null. ########## flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.window; + +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.util.Collector; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Tests the {@link Windows}s and {@link WindowUtils}. 
*/ +@SuppressWarnings("unchecked") +public class WindowsTest extends AbstractTestBase { + private static final int RECORD_NUM = 100; + + private static List<Long> inputData; + + private static DataStream<Long> inputStream; + private static DataStream<Long> inputStreamWithInterval; + private static DataStream<Long> inputStreamWithTimestamp; + + @BeforeClass + public static void before() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + inputData = new ArrayList<>(); + for (long i = 0; i < RECORD_NUM; i++) { + inputData.add(i); + } + inputStream = env.fromCollection(inputData); + + inputStreamWithInterval = + inputStream + .map( + new MapFunction<Long, Long>() { + private int count = 0; + + @Override + public Long map(Long value) throws Exception { + count++; + if (count % (RECORD_NUM / 2) == 0) { + Thread.sleep(1000); + } + return value; + } + }) + .setParallelism(1); + + inputStreamWithTimestamp = + inputStream.assignTimestampsAndWatermarks( + WatermarkStrategy.<Long>forMonotonousTimestamps() + .withTimestampAssigner( + (SerializableTimestampAssigner<Long>) + (element, recordTimestamp) -> element)); + } + + @Test + public void testBoundedWindow() throws Exception { + DataStream<List<Long>> outputStream = + WindowUtils.windowAllProcess( + inputStream, null, new CreateAllWindowBatchFunction<>()); + List<List<Long>> actualBatches = IteratorUtils.toList(outputStream.executeAndCollect()); + assertEquals(1, actualBatches.size()); + assertEquals(new HashSet<>(inputData), new HashSet<>(actualBatches.get(0))); + } + + @Test + public void testCountWindow() throws Exception { + DataStream<List<Long>> outputStream = + WindowUtils.windowAllProcess( + inputStream, + CountTumblingWindows.over(RECORD_NUM / 3), + new CreateAllWindowBatchFunction<>()); + List<List<Long>> actualBatches = IteratorUtils.toList(outputStream.executeAndCollect()); + assertTrue(actualBatches.size() >= 3 && actualBatches.size() <= 4); Review Comment: Hmm... 
can we make the test result deterministic? ########## flink-ml-core/src/main/java/org/apache/flink/ml/common/window/WindowUtils.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.window; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.ml.common.datastream.EndOfStreamWindows; +import org.apache.flink.streaming.api.datastream.AllWindowedStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** Utility class for operations related 
to the window. */ +@Internal +@SuppressWarnings({"rawtypes", "unchecked"}) +public class WindowUtils { + /** + * Applies windowAll() and process() operation on the input stream. + * + * @param input The input data stream. + * @param windows The window that defines how input data would be sliced into batches. + * @param function The user defined process function. + */ + public static <IN, OUT, W extends Window> SingleOutputStreamOperator<OUT> windowAllProcess( + DataStream<IN> input, Windows windows, ProcessAllWindowFunction<IN, OUT, W> function) { + AllWindowedStream<IN, W> allWindowedStream; + if (windows == null) { + allWindowedStream = input.windowAll((WindowAssigner) EndOfStreamWindows.get()); + } else if (windows instanceof CountTumblingWindows) { + long countWindowSize = ((CountTumblingWindows) windows).size; + allWindowedStream = (AllWindowedStream<IN, W>) input.countWindowAll(countWindowSize); + } else { + allWindowedStream = + input.windowAll( + (WindowAssigner) WindowUtils.getDataStreamTimeWindowAssigner(windows)); + } + return allWindowedStream.process(function); + } + + private static WindowAssigner<Object, TimeWindow> getDataStreamTimeWindowAssigner( + Windows windows) { + if (windows instanceof EventTimeTumblingWindows) { + return TumblingEventTimeWindows.of( + getStreamWindowTime(((EventTimeTumblingWindows) windows).size)); + } else if (windows instanceof ProcessingTimeTumblingWindows) { + return TumblingProcessingTimeWindows.of( + getStreamWindowTime(((ProcessingTimeTumblingWindows) windows).size)); + } else if (windows instanceof EventTimeSessionWindows) { + return org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows + .withGap(getStreamWindowTime(((EventTimeSessionWindows) windows).gap)); + } else if (windows instanceof ProcessingTimeSessionWindows) { + return org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows + .withGap(getStreamWindowTime(((ProcessingTimeSessionWindows) windows).gap)); + } else 
{ + throw new UnsupportedOperationException( + String.format( + "Unsupported %s subclass: %s", + Windows.class.getSimpleName(), windows.getClass().getName())); + } + } + + private static org.apache.flink.streaming.api.windowing.time.Time getStreamWindowTime( + Time time) { + return org.apache.flink.streaming.api.windowing.time.Time.of( + time.getSize(), time.getUnit()); + } + + public static Object jsonEncode(Windows value) { + Map<String, Object> map = new HashMap<>(); + if (value == null) { + return map; + } + + map.put("class", value.getClass().getName()); + if (value instanceof CountTumblingWindows) { + map.put("size", ((CountTumblingWindows) value).size); + } else if (value instanceof ProcessingTimeTumblingWindows) { + map.putAll(encodeTime(((ProcessingTimeTumblingWindows) value).size, "size")); + } else if (value instanceof EventTimeTumblingWindows) { + map.putAll(encodeTime(((EventTimeTumblingWindows) value).size, "size")); + } else if (value instanceof ProcessingTimeSessionWindows) { + map.putAll(encodeTime(((ProcessingTimeSessionWindows) value).gap, "gap")); + } else if (value instanceof EventTimeSessionWindows) { + map.putAll(encodeTime(((EventTimeSessionWindows) value).gap, "gap")); + } else { + throw new UnsupportedOperationException( + String.format("Unsupported %s subclass: %s", Windows.class, value.getClass())); + } + return map; + } + + public static Windows jsonDecode(Object json) { Review Comment: Since this method is only used by `WindowsParam`, would it be simpler to move it there? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
