lindong28 commented on code in PR #157:
URL: https://github.com/apache/flink-ml/pull/157#discussion_r978349873
##########
flink-ml-python/pyflink/ml/lib/clustering/tests/test_agglomerativeclustering.py:
##########
@@ -157,6 +163,30 @@ def test_transform(self):
self.verify_clustering_result(self.eucliean_ward_threshold_as_two_result,
outputs[0], "features", "pred")
+ def test_transform_with_count_window(self):
+ input_table = self.t_env.from_data_stream(
+ self.env.from_collection([
+ (Vectors.dense([1, 1]),),
+ (Vectors.dense([1, 4]),),
+ (Vectors.dense([1, 0]),),
+ (Vectors.dense([4, 1.5]),),
+ (Vectors.dense([4, 4]),),
+ (Vectors.dense([4, 0]),),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features'],
+ [DenseVectorTypeInfo()])))
+
+ agglomerative_clustering = AgglomerativeClustering() \
+ .set_linkage('average') \
+ .set_distance_measure('euclidean') \
+ .set_prediction_col('pred') \
+ .set_windows(CountTumblingWindows.of(6))
Review Comment:
Can we use `CountTumblingWindows.of(5)` so that the input stream (which
contains 6 elements) can be split into two windows? Having all elements in the
same window seems a bit trivial.
##########
flink-ml-python/pyflink/ml/core/windows.py:
##########
@@ -0,0 +1,151 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABC
+
+from pyflink.common.time import Time
+
+
+class Windows(ABC):
+ """
+ Windowing strategy that determines how to create mini-batches from input
data.
+ """
+ pass
+
+
+class GlobalWindows(Windows):
+ """
+ A Windows that assigns all the elements into a single global window.
+ In order for this windowing strategy to work correctly, the input
+ stream must be bounded.
+ """
+
+ def __eq__(self, other):
+ return isinstance(other, GlobalWindows)
+
+
+class CountTumblingWindows(Windows):
+ """
+ A Windows that groups elements into windows of fixed number of
+ elements. Windows do not overlap.
+ """
+
+ def __init__(self, size: int):
+ super().__init__()
+ self._size = size
+
+ @staticmethod
+ def of(size: int) -> 'CountTumblingWindows':
+ return CountTumblingWindows(size)
+
+ @property
+ def size(self) -> int:
+ return self._size
+
+ def __eq__(self, other):
+ return isinstance(other, CountTumblingWindows) and self._size ==
other._size
+
+
+class EventTimeTumblingWindows(Windows):
+ """
+ A Windows that groups elements into fixed-size windows based on
+ the timestamp of the elements. Windows do not overlap.
+ """
+
+ def __init__(self, size: Time):
+ super().__init__()
+ self._size = size
+
+ @staticmethod
+ def of(size: Time) -> 'EventTimeTumblingWindows':
+ return EventTimeTumblingWindows(size)
+
+ @property
+ def size(self) -> Time:
+ return self._size
+
+ def __eq__(self, other):
+ return isinstance(other, EventTimeTumblingWindows) and self._size ==
other._size
+
+
+class ProcessingTimeTumblingWindows(Windows):
+ """
+ A Windows that groups elements into fixed-size windows based on
+ the current system time of the machine the operation is running
+ on. Windows do not overlap.
+ """
+
+ def __init__(self, size: Time):
+ super().__init__()
+ self._size = size
+
+ @staticmethod
+ def of(size: Time) -> 'ProcessingTimeTumblingWindows':
+ return ProcessingTimeTumblingWindows(size)
+
+ @property
+ def size(self) -> Time:
+ return self._size
+
+ def __eq__(self, other):
+ return isinstance(other, ProcessingTimeTumblingWindows) and self._size
== other._size
+
+
+class EventTimeSessionWindows(Windows):
+ """
+ A Windows that windows elements into sessions based on the
+ timestamp of the elements. Windows do not overlap.
+ """
+
+ def __init__(self, gap: Time):
+ super().__init__()
+ self._gap = gap
+
+ @staticmethod
+ def with_gap(gap: Time) -> 'EventTimeSessionWindows':
+ return EventTimeSessionWindows(gap)
+
+ @property
+ def gap(self) -> Time:
+ return self._gap
+
+ def __eq__(self, other):
+ return isinstance(other, EventTimeSessionWindows) and self._gap ==
other._gap
+
+
+class ProcessingTimeSessionWindows(Windows):
+ """
+ A Windows that windows elements into sessions based on the current
Review Comment:
`windows` -> `groups`
##########
flink-ml-python/pyflink/ml/core/windows.py:
##########
@@ -0,0 +1,151 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABC
+
+from pyflink.common.time import Time
+
+
+class Windows(ABC):
+ """
+ Windowing strategy that determines how to create mini-batches from input
data.
+ """
+ pass
+
+
+class GlobalWindows(Windows):
+ """
+ A Windows that assigns all the elements into a single global window.
+ In order for this windowing strategy to work correctly, the input
+ stream must be bounded.
+ """
+
+ def __eq__(self, other):
+ return isinstance(other, GlobalWindows)
+
+
+class CountTumblingWindows(Windows):
+ """
+ A Windows that groups elements into windows of fixed number of
+ elements. Windows do not overlap.
+ """
+
+ def __init__(self, size: int):
+ super().__init__()
+ self._size = size
+
+ @staticmethod
+ def of(size: int) -> 'CountTumblingWindows':
+ return CountTumblingWindows(size)
+
+ @property
+ def size(self) -> int:
+ return self._size
+
+ def __eq__(self, other):
+ return isinstance(other, CountTumblingWindows) and self._size ==
other._size
+
+
+class EventTimeTumblingWindows(Windows):
+ """
+ A Windows that groups elements into fixed-size windows based on
+ the timestamp of the elements. Windows do not overlap.
+ """
+
+ def __init__(self, size: Time):
+ super().__init__()
+ self._size = size
+
+ @staticmethod
+ def of(size: Time) -> 'EventTimeTumblingWindows':
+ return EventTimeTumblingWindows(size)
+
+ @property
+ def size(self) -> Time:
+ return self._size
+
+ def __eq__(self, other):
+ return isinstance(other, EventTimeTumblingWindows) and self._size ==
other._size
+
+
+class ProcessingTimeTumblingWindows(Windows):
+ """
+ A Windows that groups elements into fixed-size windows based on
+ the current system time of the machine the operation is running
+ on. Windows do not overlap.
+ """
+
+ def __init__(self, size: Time):
+ super().__init__()
+ self._size = size
+
+ @staticmethod
+ def of(size: Time) -> 'ProcessingTimeTumblingWindows':
+ return ProcessingTimeTumblingWindows(size)
+
+ @property
+ def size(self) -> Time:
+ return self._size
+
+ def __eq__(self, other):
+ return isinstance(other, ProcessingTimeTumblingWindows) and self._size
== other._size
+
+
+class EventTimeSessionWindows(Windows):
+ """
+ A Windows that windows elements into sessions based on the
Review Comment:
nit: `windows` -> `groups` for consistency with the docs above.
##########
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/agglomerativeclustering/AgglomerativeClustering.java:
##########
@@ -70,6 +68,12 @@
* second one contains the information of merging two clusters at each step.
The data format of the
* merging information is (clusterId1, clusterId2, distance,
sizeOfMergedCluster).
*
+ * <p>This operator supports the {@link
org.apache.flink.ml.common.param.HasWindows} parameter,
+ * which creates mini-batches(windows) from input data and feeds each batch
into this operator. The
+ * operator would perform the clustering operation on each mini-batch
independently and output their
+ * clustering results in order. By default, all input data would be placed
into the same window and
+ * the clustering process would be a batch operation.
Review Comment:
I am not sure `clustering process would be a batch operation` is clear and
necessary. Would it be simpler to remove this statement?
##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java:
##########
@@ -197,6 +213,58 @@ public static <T> void setManagedMemoryWeight(
}
}
+ /**
+ * Applies windowAll() and process() operation on the input stream.
+ *
+ * @param input The input data stream.
+ * @param windows The window that defines how input data would be sliced
into batches.
+ * @param function The user defined process function.
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static <IN, OUT, W extends Window> SingleOutputStreamOperator<OUT>
windowAllProcess(
+ DataStream<IN> input, Windows windows,
ProcessAllWindowFunction<IN, OUT, W> function) {
+ AllWindowedStream<IN, W> allWindowedStream;
+ if (windows instanceof GlobalWindows) {
+ allWindowedStream = input.windowAll((WindowAssigner)
EndOfStreamWindows.get());
+ } else if (windows instanceof CountTumblingWindows) {
+ long countWindowSize = ((CountTumblingWindows) windows).getSize();
+ allWindowedStream = (AllWindowedStream<IN, W>)
input.countWindowAll(countWindowSize);
+ } else {
+ allWindowedStream =
+ input.windowAll((WindowAssigner)
getDataStreamTimeWindowAssigner(windows));
Review Comment:
Would it be simpler to also handle `windows instanceof GlobalWindows` here?
##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java:
##########
@@ -197,6 +213,58 @@ public static <T> void setManagedMemoryWeight(
}
}
+ /**
+ * Applies windowAll() and process() operation on the input stream.
Review Comment:
Can you document the method based on its public behavior w.r.t. its
input/output, rather than on the operators used in its implementation?
Maybe check out the Javadoc of `DataStream::windowAll(...)` as an example.
##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java:
##########
@@ -197,6 +213,58 @@ public static <T> void setManagedMemoryWeight(
}
}
+ /**
+ * Applies windowAll() and process() operation on the input stream.
+ *
+ * @param input The input data stream.
+ * @param windows The window that defines how input data would be sliced
into batches.
+ * @param function The user defined process function.
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static <IN, OUT, W extends Window> SingleOutputStreamOperator<OUT>
windowAllProcess(
+ DataStream<IN> input, Windows windows,
ProcessAllWindowFunction<IN, OUT, W> function) {
+ AllWindowedStream<IN, W> allWindowedStream;
+ if (windows instanceof GlobalWindows) {
+ allWindowedStream = input.windowAll((WindowAssigner)
EndOfStreamWindows.get());
+ } else if (windows instanceof CountTumblingWindows) {
+ long countWindowSize = ((CountTumblingWindows) windows).getSize();
+ allWindowedStream = (AllWindowedStream<IN, W>)
input.countWindowAll(countWindowSize);
+ } else {
+ allWindowedStream =
+ input.windowAll((WindowAssigner)
getDataStreamTimeWindowAssigner(windows));
Review Comment:
Would it be simpler to also handle `windows instanceof GlobalWindows` here?
##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/CountTumblingWindows.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link Windows} that groups elements into windows of fixed number of
elements. Windows do not
Review Comment:
Would the following doc be more consistent with the docs of the other `Windows`
subclasses?
A windowing strategy that groups elements into fixed-size windows based on
the count of elements. Windows do not overlap.
##########
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasWindows.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.common.window.GlobalWindows;
+import org.apache.flink.ml.common.window.Windows;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WindowsParam;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for the shared windows param. */
+public interface HasWindows<T> extends WithParams<T> {
+ Param<Windows> WINDOWS =
+ new WindowsParam(
+ "windows",
+ "Windowing strategy that determines how to create
mini-batches from input data. "
+ + "By default, all data in the input stream would
be placed into the same window.",
Review Comment:
Would it be simpler to remove the description of the default behavior for
consistency with other parameters?
##########
flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WindowsParam;
+import org.apache.flink.ml.param.WithParams;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests the {@link Windows}. */
+@SuppressWarnings("unchecked")
+public class WindowsTest extends AbstractTestBase {
+ private static final int RECORD_NUM = 100;
+
+ private static List<Long> inputData;
+
+ private static DataStream<Long> inputStream;
+ private static DataStream<Long> inputStreamWithProcessingTimeGap;
+ private static DataStream<Long> inputStreamWithEventTime;
+
+ @BeforeClass
+ public static void beforeClass() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ inputData = new ArrayList<>();
+ for (long i = 0; i < RECORD_NUM; i++) {
+ inputData.add(i);
+ }
+ inputStream = env.fromCollection(inputData);
+
+ inputStreamWithProcessingTimeGap =
+ inputStream
+ .map(
+ new MapFunction<Long, Long>() {
+ private int count = 0;
+
+ @Override
+ public Long map(Long value) throws
Exception {
+ count++;
+ if (count % (RECORD_NUM / 2) == 0) {
+ Thread.sleep(1000);
+ }
+ return value;
+ }
+ })
+ .setParallelism(1);
+
+ inputStreamWithEventTime =
+ inputStream.assignTimestampsAndWatermarks(
+ WatermarkStrategy.<Long>forMonotonousTimestamps()
+ .withTimestampAssigner(
+ (SerializableTimestampAssigner<Long>)
+ (element, recordTimestamp) ->
element));
+ }
+
+ @Test
+ public void testGlobalWindows() throws Exception {
+ DataStream<List<Long>> outputStream =
+ DataStreamUtils.windowAllProcess(
+ inputStream,
+ GlobalWindows.getInstance(),
+ new CreateAllWindowBatchFunction<>());
+ List<List<Long>> actualBatches =
IteratorUtils.toList(outputStream.executeAndCollect());
+ assertEquals(1, actualBatches.size());
+ assertEquals(new HashSet<>(inputData), new
HashSet<>(actualBatches.get(0)));
+ }
+
+ @Test
+ public void testCountTumblingWindows() throws Exception {
+ DataStream<List<Long>> outputStream =
+ DataStreamUtils.windowAllProcess(
+ inputStream,
+ CountTumblingWindows.of(RECORD_NUM / 7),
+ new CreateAllWindowBatchFunction<>());
+ List<List<Long>> actualBatches =
IteratorUtils.toList(outputStream.executeAndCollect());
+ assertEquals(7, actualBatches.size());
+ int count = 0;
+ for (List<Long> batch : actualBatches) {
+ count += batch.size();
+ }
+ assertEquals(RECORD_NUM - (RECORD_NUM % 7), count);
+ }
+
+ @Test
+ public void testProcessingTimeTumblingWindows() throws Exception {
+ DataStream<List<Long>> outputStream =
+ DataStreamUtils.windowAllProcess(
+ inputStreamWithProcessingTimeGap,
+
ProcessingTimeTumblingWindows.of(Time.milliseconds(100)),
+ new CreateAllWindowBatchFunction<>());
+ List<List<Long>> actualBatches =
IteratorUtils.toList(outputStream.executeAndCollect());
+ assertTrue(actualBatches.size() > 1);
+ List<Long> mergedBatches = new ArrayList<>();
+ for (List<Long> batch : actualBatches) {
+ mergedBatches.addAll(batch);
+ }
+ assertTrue(mergedBatches.containsAll(inputData.subList(0, RECORD_NUM -
1)));
+ }
+
+ @Test
+ public void testEventTimeTumblingWindows() throws Exception {
+ DataStream<List<Long>> outputStream =
+ DataStreamUtils.windowAllProcess(
+ inputStreamWithEventTime,
+
EventTimeTumblingWindows.of(Time.milliseconds(RECORD_NUM / 7)),
+ new CreateAllWindowBatchFunction<>());
+ List<List<Long>> actualBatches =
IteratorUtils.toList(outputStream.executeAndCollect());
+ assertEquals(8, actualBatches.size());
+ List<Long> mergedBatches = new ArrayList<>();
+ for (List<Long> batch : actualBatches) {
+ mergedBatches.addAll(batch);
+ }
+ assertEquals(RECORD_NUM, mergedBatches.size());
+ assertEquals(new HashSet<>(inputData), new HashSet<>(mergedBatches));
+ }
+
+ @Test
+ public void testProcessingTimeSessionWindows() throws Exception {
+ DataStream<List<Long>> outputStream =
+ DataStreamUtils.windowAllProcess(
+ inputStreamWithProcessingTimeGap,
+
ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)),
+ new CreateAllWindowBatchFunction<>());
+ List<List<Long>> actualBatches =
IteratorUtils.toList(outputStream.executeAndCollect());
+ assertTrue(actualBatches.size() > 1);
+ List<Long> mergedBatches = new ArrayList<>();
+ for (List<Long> batch : actualBatches) {
+ mergedBatches.addAll(batch);
+ }
+ assertTrue(mergedBatches.containsAll(inputData.subList(0, RECORD_NUM -
1)));
+ }
+
+ @Test
+ public void testEventTimeSessionWindows() throws Exception {
+ DataStream<List<Long>> outputStream =
+ DataStreamUtils.windowAllProcess(
+ inputStreamWithEventTime,
+
EventTimeSessionWindows.withGap(Time.milliseconds(RECORD_NUM / 7)),
+ new CreateAllWindowBatchFunction<>());
+ List<List<Long>> actualBatches =
IteratorUtils.toList(outputStream.executeAndCollect());
+ assertEquals(1, actualBatches.size());
+ assertEquals(new HashSet<>(inputData), new
HashSet<>(actualBatches.get(0)));
+ }
+
+ @Test
+ public void testSaveLoadWindowsParams() throws Exception {
Review Comment:
It might be more readable to move this test into `StageTest.java` for
consistency with `test_stage.py`, and to follow the practice of keeping all
tests for parameter save/load in `StageTest.java`.
##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java:
##########
@@ -197,6 +213,58 @@ public static <T> void setManagedMemoryWeight(
}
}
+ /**
+ * Applies windowAll() and process() operation on the input stream.
+ *
+ * @param input The input data stream.
+ * @param windows The window that defines how input data would be sliced
into batches.
+ * @param function The user defined process function.
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static <IN, OUT, W extends Window> SingleOutputStreamOperator<OUT>
windowAllProcess(
+ DataStream<IN> input, Windows windows,
ProcessAllWindowFunction<IN, OUT, W> function) {
+ AllWindowedStream<IN, W> allWindowedStream;
+ if (windows instanceof GlobalWindows) {
+ allWindowedStream = input.windowAll((WindowAssigner)
EndOfStreamWindows.get());
+ } else if (windows instanceof CountTumblingWindows) {
+ long countWindowSize = ((CountTumblingWindows) windows).getSize();
+ allWindowedStream = (AllWindowedStream<IN, W>)
input.countWindowAll(countWindowSize);
+ } else {
+ allWindowedStream =
+ input.windowAll((WindowAssigner)
getDataStreamTimeWindowAssigner(windows));
+ }
+ return allWindowedStream.process(function);
+ }
+
+ private static WindowAssigner<Object, TimeWindow>
getDataStreamTimeWindowAssigner(
+ Windows windows) {
+ if (windows instanceof EventTimeTumblingWindows) {
+ return TumblingEventTimeWindows.of(
+ getStreamWindowTime(((EventTimeTumblingWindows)
windows).getSize()));
+ } else if (windows instanceof ProcessingTimeTumblingWindows) {
+ return TumblingProcessingTimeWindows.of(
+ getStreamWindowTime(((ProcessingTimeTumblingWindows)
windows).getSize()));
+ } else if (windows instanceof EventTimeSessionWindows) {
+ return
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
+ .withGap(getStreamWindowTime(((EventTimeSessionWindows)
windows).getGap()));
+ } else if (windows instanceof ProcessingTimeSessionWindows) {
+ return
org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
+ .withGap(
+
getStreamWindowTime(((ProcessingTimeSessionWindows) windows).getGap()));
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported %s subclass: %s",
+ Windows.class.getSimpleName(),
windows.getClass().getName()));
Review Comment:
Would it be simpler to hardcode the window class name instead of calling
`Windows.class.getSimpleName()`?
##########
docs/content/docs/operators/clustering/agglomerativeclustering.md:
##########
@@ -49,15 +49,16 @@ format of the merging information is
### Parameters
-| Key | Default | Type | Required | Description
|
-|:------------------|:---------------|:--------|:---------|:--------------------------------------------------------------------------------------------------------------------|
-| numClusters | `2` | Integer | no | The max number of
clusters to create.
|
-| distanceThreshold | `null` | Double | no | Threshold to
decide whether two clusters should be merged.
|
-| linkage | `"ward"` | String | no | Criterion for
computing distance between two clusters. Supported values: `'ward', 'complete',
'single', 'average'`. |
-| computeFullTree | `false` | Boolean | no | Whether computes
the full tree after convergence.
|
-| distanceMeasure | `"euclidean"` | String | no | Distance measure.
Supported values: `'euclidean', 'manhattan', 'cosine'`.
|
-| featuresCol | `"features"` | String | no | Features column
name.
|
-| predictionCol | `"prediction"` | String | no | Prediction column
name.
|
+| Key | Default | Type | Required | Description
|
+|:------------------|:----------------------|:--------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| numClusters | `2` | Integer | no | The max
number of clusters to create.
|
+| distanceThreshold | `null` | Double | no | Threshold
to decide whether two clusters should be merged.
|
+| linkage | `"ward"` | String | no | Criterion
for computing distance between two clusters.
|
+| computeFullTree | `false` | Boolean | no | Whether
computes the full tree after convergence.
|
+| distanceMeasure | `"euclidean"` | String | no | Distance
measure.
|
+| featuresCol | `"features"` | String | no | Features
column name.
|
+| predictionCol | `"prediction"` | String | no | Prediction
column name.
|
+| windows | `GlobalWindows.get()` | Windows | no | Windowing
strategy that determines how to create mini-batches from input data. By
default, all data in the input stream would be placed into the same window. |
Review Comment:
GlobalWindows.get() -> GlobalWindows.getInstance()
##########
flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/AgglomerativeClusteringTest.java:
##########
@@ -203,6 +229,59 @@ public void testTransform() throws Exception {
agglomerativeClustering.getPredictionCol());
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testTransformWithEventTimeWindow() throws Exception {
+ RowTypeInfo outputTypeInfo =
+ new RowTypeInfo(
+ new TypeInformation<?>[]
{DenseVectorTypeInfo.INSTANCE, Types.INSTANT},
+ new String[] {"features", "ts"});
+
+ Instant baseTime = Instant.now();
+ DataStream<Row> inputDataStream =
+ env.fromCollection(INPUT_DATA)
+ .setParallelism(1)
+ .map(x -> Row.of(x, baseTime.plusSeconds((long)
x.get(0))), outputTypeInfo);
+
+ Schema schema =
+ Schema.newBuilder()
+ .column("features",
DataTypes.of(DenseVectorTypeInfo.INSTANCE))
+ .column("ts", DataTypes.TIMESTAMP_LTZ(3))
+ .watermark("ts", "ts - INTERVAL '5' SECOND")
+ .build();
+
+ Table inputDataTable = tEnv.fromDataStream(inputDataStream, schema);
+
+ AgglomerativeClustering agglomerativeClustering =
+ new AgglomerativeClustering()
+
.setLinkage(AgglomerativeClusteringParams.LINKAGE_AVERAGE)
+ .setDistanceMeasure(EuclideanDistanceMeasure.NAME)
+ .setPredictionCol("pred")
+
.setWindows(EventTimeTumblingWindows.of(Time.seconds(1)));
Review Comment:
Can we keep the Python test and the Java test consistent? Maybe add a test with
CountTumblingWindows in Java and a test with EventTimeTumblingWindows in
Python?
##########
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/agglomerativeclustering/AgglomerativeClustering.java:
##########
@@ -70,6 +68,12 @@
* second one contains the information of merging two clusters at each step.
The data format of the
* merging information is (clusterId1, clusterId2, distance,
sizeOfMergedCluster).
*
+ * <p>This operator supports the {@link
org.apache.flink.ml.common.param.HasWindows} parameter,
+ * which creates mini-batches(windows) from input data and feeds each batch
into this operator. The
+ * operator would perform the clustering operation on each mini-batch
independently and output their
Review Comment:
Since the windowing strategy is used inside this operator rather than before
this operator, it seems confusing to say `creates mini-batch ... and feeds each
batch into this operator`.
How about the following doc:
This AlgoOperator splits the input stream into mini-batches of elements
according to the windowing strategy specified by the {@link
org.apache.flink.ml.common.param.HasWindows} parameter, and performs the
hierarchical clustering on each mini-batch independently. The clustering result
of each element depends only on the elements in the same mini-batch.
##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java:
##########
@@ -197,6 +213,58 @@ public static <T> void setManagedMemoryWeight(
}
}
+ /**
+ * Applies windowAll() and process() operation on the input stream.
+ *
+ * @param input The input data stream.
+ * @param windows The window that defines how input data would be sliced
into batches.
+ * @param function The user defined process function.
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static <IN, OUT, W extends Window> SingleOutputStreamOperator<OUT>
windowAllProcess(
Review Comment:
How about renaming this function as `windowAllAndProcess(...)`?
##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/EventTimeSessionWindows.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link Windows} that windows elements into sessions based on the
timestamp of the elements.
Review Comment:
How about this doc:
A windowing strategy that groups elements into sessions based on the
timestamp of the elements. Windows do not overlap.
##########
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasWindows.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.common.window.GlobalWindows;
+import org.apache.flink.ml.common.window.Windows;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WindowsParam;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for the shared windows param. */
+public interface HasWindows<T> extends WithParams<T> {
+ Param<Windows> WINDOWS =
+ new WindowsParam(
+ "windows",
+ "Windowing strategy that determines how to create
mini-batches from input data. "
+ + "By default, all data in the input stream would
be placed into the same window.",
+ GlobalWindows.getInstance(),
+ ParamValidators.notNull());
+
+ default T setWindows(Windows windows) {
+ return set(WINDOWS, windows);
+ }
+
+ default Windows getWindows() {
Review Comment:
Could we put `getWindows()` before `setWindows()` for consistency with the other
parameter interfaces?
##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/EventTimeTumblingWindows.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link Windows} that groups elements into fixed-size windows based on the
timestamp of the
Review Comment:
How about this doc:
A windowing strategy that groups elements into fixed-size windows based on
the timestamp of the elements. Windows do not overlap.
##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/GlobalWindows.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+/**
+ * A {@link Windows} that assigns all the elements into a single global
window. In order for this
Review Comment:
Would the following doc be more consistent with the docs of the other `Windows`
subclasses?
A windowing strategy that groups all elements into a single global window.
This strategy assumes that the input stream is bounded.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]