yunfengzhou-hub commented on code in PR #157:
URL: https://github.com/apache/flink-ml/pull/157#discussion_r976515297


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/agglomerativeclustering/AgglomerativeClustering.java:
##########
@@ -113,18 +110,17 @@ public Table[] transform(Table... inputs) {
                 new OutputTag<Tuple4<Integer, Integer, Double, Integer>>("MERGE_INFO") {};
 
         SingleOutputStreamOperator<Row> output =
-                dataStream.transform(
-                        "doLocalAgglomerativeClustering",
-                        outputTypeInfo,
-                        new LocalAgglomerativeClusteringOperator(
-                                getFeaturesCol(),
-                                getLinkage(),
-                                getDistanceMeasure(),
-                                getNumClusters(),
-                                getDistanceThreshold(),
-                                getComputeFullTree(),
-                                mergeInfoOutputTag));
-        output.getTransformation().setParallelism(1);
+                WindowUtils.windowAll(dataStream, getWindow())

Review Comment:
   Per our offline discussion, AgglomerativeClustering will keep running with 
parallelism = 1 for now, given how this algorithm is implemented. We may add 
this optimization later for other algorithms where it applies, such as the 
reduce() operation.
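
To illustrate the kind of optimization mentioned for reduce()-style operations, here is a minimal plain-Java sketch, not Flink ML code. The class `PartialReduceSketch` and its methods are hypothetical names introduced only for this example. It assumes a non-empty input, at least one partition, and an associative reduce function (a real distributed shuffle would typically also need it to be commutative). Under those assumptions, reducing each partition independently and then merging the partial results gives the same answer as folding everything in a single task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical illustration only; none of these names exist in Flink ML.
public class PartialReduceSketch {

    /** Baseline: fold every element in one task (the parallelism = 1 case). */
    public static <T> T reduceAll(List<T> values, BinaryOperator<T> fn) {
        // Assumes values is non-empty.
        T acc = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            acc = fn.apply(acc, values.get(i));
        }
        return acc;
    }

    /** Two stages: reduce each partition on its own, then merge the partial results. */
    public static <T> T reducePartitioned(
            List<T> values, int numPartitions, BinaryOperator<T> fn) {
        // Assumes numPartitions >= 1; partitions are contiguous slices of the input.
        int chunk = (values.size() + numPartitions - 1) / numPartitions;
        List<T> partials = new ArrayList<>();
        for (int start = 0; start < values.size(); start += chunk) {
            int end = Math.min(start + chunk, values.size());
            // Each slice could be reduced by a separate parallel subtask.
            partials.add(reduceAll(values.subList(start, end), fn));
        }
        // Final merge of the (few) partial results in a single task.
        return reduceAll(partials, fn);
    }
}
```

For example, `reducePartitioned(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3, Integer::sum)` agrees with `reduceAll` on the same list. Agglomerative clustering does not decompose this way in general, since each merge step depends on distances across the whole window, which is consistent with keeping it at parallelism = 1.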



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

