lindong28 commented on code in PR #157:
URL: https://github.com/apache/flink-ml/pull/157#discussion_r975994492

##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/BoundedWindow.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.window;
+
+/** A {@link Window} that groups all elements in a bounded stream into one window. */
+public class BoundedWindow implements Window {

Review Comment:
   Would it be simpler to just use an existing window with size = MAX_LONG?

##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/BoundedWindow.java:
##########
@@ -0,0 +1,40 @@
+package org.apache.flink.ml.common.window;
+
+/** A {@link Window} that groups all elements in a bounded stream into one window. */
+public class BoundedWindow implements Window {
+    private static final BoundedWindow INSTANCE = new BoundedWindow();
+
+    private BoundedWindow() {}
+
+    public static BoundedWindow get() {

Review Comment:
   Would it be better to use `getInstance()` to be consistent with `EuclideanDistanceMeasure::getInstance()`?

##########
docs/content/docs/operators/clustering/agglomerativeclustering.md:
##########
@@ -49,15 +49,16 @@ format of the merging information is
 
 ### Parameters
 
-| Key | Default | Type | Required | Description |
-|:------------------|:---------------|:--------|:---------|:--------------------------------------------------------------------------------------------------------------------|
-| numClusters | `2` | Integer | no | The max number of clusters to create. |
-| distanceThreshold | `null` | Double | no | Threshold to decide whether two clusters should be merged. |
-| linkage | `"ward"` | String | no | Criterion for computing distance between two clusters. Supported values: `'ward', 'complete', 'single', 'average'`. |
-| computeFullTree | `false` | Boolean | no | Whether computes the full tree after convergence. |
-| distanceMeasure | `"euclidean"` | String | no | Distance measure. Supported values: `'euclidean', 'manhattan', 'cosine'`. |
-| featuresCol | `"features"` | String | no | Features column name. |
-| predictionCol | `"prediction"` | String | no | Prediction column name. |
+| Key | Default | Type | Required | Description |
+| :---------------- | :-------------------- | :------ | :------- | :----------------------------------------------------------- |
+| numClusters | `2` | Integer | no | The max number of clusters to create. |
+| distanceThreshold | `null` | Double | no | Threshold to decide whether two clusters should be merged. |

Review Comment:
   nits: do we need to introduce these whitespace changes?

##########
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/agglomerativeclustering/AgglomerativeClustering.java:
##########
@@ -113,18 +110,17 @@ public Table[] transform(Table... inputs) {
                 new OutputTag<Tuple4<Integer, Integer, Double, Integer>>("MERGE_INFO") {};
 
         SingleOutputStreamOperator<Row> output =
-                dataStream.transform(
-                        "doLocalAgglomerativeClustering",
-                        outputTypeInfo,
-                        new LocalAgglomerativeClusteringOperator(
-                                getFeaturesCol(),
-                                getLinkage(),
-                                getDistanceMeasure(),
-                                getNumClusters(),
-                                getDistanceThreshold(),
-                                getComputeFullTree(),
-                                mergeInfoOutputTag));
-        output.getTransformation().setParallelism(1);
+                WindowUtils.windowAll(dataStream, getWindow())

Review Comment:
   Does it mean that any algorithm that supports window can only apply the transformation logic with parallelism = 1? Can we improve its performance by applying local aggregation/transformation on multiple parallelism? Maybe see `AllReduceImpl::allReduceSum` for example.

##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/Window.java:
##########
@@ -0,0 +1,25 @@
+package org.apache.flink.ml.common.window;
+
+/**
+ * A {@link Window} determines how input data stream would be sliced into batches and fed into a
+ * Flink ML Stage.
+ */
+public interface Window {}

Review Comment:
   Since this class represents window strategy, not concrete window instances, would it be better to rename this class as `Windows`? Note that Flink names similar concepts as `TumblingEventTimeWindows`. Beam names similar concepts as `SlidingWindows` [1].

   [1] https://beam.apache.org/documentation/programming-guide/#using-sliding-time-windows

##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/TumbleWindow.java:
##########
@@ -0,0 +1,111 @@
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.api.common.time.Time;
+
+import java.util.Objects;
+
+/**
+ * A {@link Window} that windows elements into fixed-size windows based on the timestamp of the
+ * elements. Windows do not overlap.
+ */
+public class TumbleWindow implements Window {
+    /** Size of this window as time interval. */
+    Time timeWindowSize;
+
+    /** Offset of this window. Windows start at time N * size + offset, where 0 is the epoch. */
+    Time timeWindowOffset;
+
+    /** Size of this window as row-count interval. */
+    long countWindowSize;
+
+    boolean isEventTime;
+
+    private TumbleWindow() {
+        this.timeWindowSize = null;
+        this.timeWindowOffset = null;
+        this.isEventTime = true;
+        this.countWindowSize = -1;
+    }
+
+    /**
+     * Creates a new {@link TumbleWindow}.
+     *
+     * @param size the size of the window as time interval.
+     */
+    public static TumbleWindow over(Time size) {
+        return TumbleWindow.over(size, Time.milliseconds(0));
+    }
+
+    /**
+     * Creates a new {@link TumbleWindow}.
+     *
+     * @param size the size of the window as time interval.
+     * @param offset the offset of this window.
+     */
+    public static TumbleWindow over(Time size, Time offset) {
+        TumbleWindow tumbleWindow = new TumbleWindow();
+        tumbleWindow.timeWindowSize = size;
+        tumbleWindow.timeWindowOffset = offset;
+        return tumbleWindow;
+    }
+
+    /**
+     * Creates a new {@link TumbleWindow}.
+     *
+     * @param size the size of the window as row-count interval.
+     */
+    public static TumbleWindow over(long size) {
+        TumbleWindow tumbleWindow = new TumbleWindow();
+        tumbleWindow.countWindowSize = size;
+        return tumbleWindow;
+    }
+
+    public TumbleWindow withEventTime() {
+        isEventTime = true;
+        return this;
+    }
+
+    public TumbleWindow withProcessingTime() {

Review Comment:
   Does Flink Table API support processing-time tumble window?

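The `timeWindowOffset` Javadoc quoted above states that windows start at time N * size + offset. As a worked illustration of that rule, the sketch below computes the start of the window that contains a given timestamp. The class and method names are hypothetical and not part of the PR, and plain `long` milliseconds stand in for Flink's `Time` so that the example is self-contained.

```java
/** Illustrative helper, not part of the PR: computes the start of a tumbling time window. */
final class TumblingWindowStartSketch {

    private TumblingWindowStartSketch() {}

    /**
     * Returns the start of the window containing {@code timestamp}, i.e. the largest value of
     * the form N * size + offset that is less than or equal to the timestamp.
     */
    static long windowStart(long timestamp, long size, long offset) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        // floorMod keeps the remainder in [0, size), so timestamps before the offset also work.
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    public static void main(String[] args) {
        // Size 5 ms, offset 3 ms: the windows are ..., [-2, 3), [3, 8), [8, 13), ...
        System.out.println(windowStart(7L, 5L, 3L)); // prints 3
        System.out.println(windowStart(2L, 5L, 3L)); // prints -2
    }
}
```

With an offset of 0 this reduces to rounding the timestamp down to a multiple of the window size.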
##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/TumbleWindow.java:
##########
@@ -0,0 +1,111 @@
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.api.common.time.Time;
+
+import java.util.Objects;
+
+/**
+ * A {@link Window} that windows elements into fixed-size windows based on the timestamp of the
+ * elements. Windows do not overlap.
+ */
+public class TumbleWindow implements Window {
+    /** Size of this window as time interval. */
+    Time timeWindowSize;
+
+    /** Offset of this window. Windows start at time N * size + offset, where 0 is the epoch. */
+    Time timeWindowOffset;

Review Comment:
   If we don't have any Flink ML use case for this field, it might be better to remove it from this PR in order to keep our interface as simple as possible.

##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/TumbleWindow.java:
##########
@@ -0,0 +1,111 @@
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.api.common.time.Time;
+
+import java.util.Objects;
+
+/**
+ * A {@link Window} that windows elements into fixed-size windows based on the timestamp of the
+ * elements. Windows do not overlap.
+ */
+public class TumbleWindow implements Window {
+    /** Size of this window as time interval. */
+    Time timeWindowSize;
+
+    /** Offset of this window. Windows start at time N * size + offset, where 0 is the epoch. */
+    Time timeWindowOffset;
+
+    /** Size of this window as row-count interval. */
+    long countWindowSize;

Review Comment:
   Some combinations of `countWindowSize` and `isEventTime` are confusing. For example, there is no intuitive way to understand the behavior of `TumbleWindow(countWindowSize = 10, isEventTime = true)`.

   How about we replace `TumbleWindow` with the following classes: `CountTumblingWindows`, `EventTimeTumblingWindows`, `ProcessingTimeTumblingWindows`?

##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/window/TumbleWindow.java:
##########
@@ -0,0 +1,111 @@
+package org.apache.flink.ml.common.window;
+
+import org.apache.flink.api.common.time.Time;
+
+import java.util.Objects;
+
+/**
+ * A {@link Window} that windows elements into fixed-size windows based on the timestamp of the
+ * elements. Windows do not overlap.
+ */
+public class TumbleWindow implements Window {

Review Comment:
   How about renaming this class as `TumblingWindows`, since this name is much more widely used than `TumbleWindow` according to Google search results? For example, Azure Stream Analytics uses Tumbling Window [1].

   [1] https://learn.microsoft.com/en-us/stream-analytics-query/tumbling-window-azure-stream-analytics
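
To make the suggested split of `TumbleWindow` more concrete, here is a minimal sketch of how the three classes might look. Only the class names `CountTumblingWindows`, `EventTimeTumblingWindows` and `ProcessingTimeTumblingWindows`, and the `Windows` rename, come from the comments above. The `of` factory methods, the `getSize` getters, and the use of `java.time.Duration` in place of Flink's `Time` are assumptions made for illustration so that the example compiles on its own.

```java
import java.time.Duration;
import java.util.Objects;

/** Marker for a window strategy; the name follows the rename suggested in the review. */
interface Windows {}

/** Groups every {@code size} consecutive rows into one window. No time semantics involved. */
final class CountTumblingWindows implements Windows {
    private final long size;

    private CountTumblingWindows(long size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    /** Hypothetical factory method; the PR under review uses {@code over(...)} instead. */
    static CountTumblingWindows of(long size) {
        return new CountTumblingWindows(size);
    }

    long getSize() {
        return size;
    }
}

/** Fixed-size, non-overlapping windows based on the event time of the elements. */
final class EventTimeTumblingWindows implements Windows {
    private final Duration size;

    private EventTimeTumblingWindows(Duration size) {
        Objects.requireNonNull(size, "size");
        if (size.isZero() || size.isNegative()) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    static EventTimeTumblingWindows of(Duration size) {
        return new EventTimeTumblingWindows(size);
    }

    Duration getSize() {
        return size;
    }
}

/** Fixed-size, non-overlapping windows based on the processing time of the operator. */
final class ProcessingTimeTumblingWindows implements Windows {
    private final Duration size;

    private ProcessingTimeTumblingWindows(Duration size) {
        Objects.requireNonNull(size, "size");
        if (size.isZero() || size.isNegative()) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    static ProcessingTimeTumblingWindows of(Duration size) {
        return new ProcessingTimeTumblingWindows(size);
    }

    Duration getSize() {
        return size;
    }
}
```

With one class per semantics, a combination such as a count window with `isEventTime = true` can no longer be expressed, which addresses the ambiguity raised in the comment on `countWindowSize`.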
