yunfengzhou-hub commented on code in PR #157:
URL: https://github.com/apache/flink-ml/pull/157#discussion_r976515297
##########
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/agglomerativeclustering/AgglomerativeClustering.java:
##########
@@ -113,18 +110,17 @@ public Table[] transform(Table... inputs) {
new OutputTag<Tuple4<Integer, Integer, Double,
Integer>>("MERGE_INFO") {};
SingleOutputStreamOperator<Row> output =
- dataStream.transform(
- "doLocalAgglomerativeClustering",
- outputTypeInfo,
- new LocalAgglomerativeClusteringOperator(
- getFeaturesCol(),
- getLinkage(),
- getDistanceMeasure(),
- getNumClusters(),
- getDistanceThreshold(),
- getComputeFullTree(),
- mergeInfoOutputTag));
- output.getTransformation().setParallelism(1);
+ WindowUtils.windowAll(dataStream, getWindow())
Review Comment:
Per our offline discussion, AgglomerativeClustering will keep running with
parallelism = 1 for now, given the implementation details of this algorithm.
We may apply optimizations such as the reduce() operation to other
algorithms in the future.
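
For readers without the offline context, here is a minimal sketch in plain Java (no Flink dependencies) of one plausible reason the algorithm is kept in a single task: each merge step needs the globally closest pair of clusters. The class and method names are hypothetical, and single linkage with Euclidean distance is an assumption made for brevity; this is not the operator from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; not LocalAgglomerativeClusteringOperator. */
class AgglomerativeSketch {

    /** Merges clusters bottom-up until numClusters remain; returns one label per point. */
    static int[] cluster(double[][] points, int numClusters) {
        if (numClusters < 1) {
            throw new IllegalArgumentException("numClusters must be >= 1");
        }
        // Every point starts in its own cluster.
        List<List<Integer>> clusters = new ArrayList<>();
        for (int i = 0; i < points.length; i++) {
            List<Integer> single = new ArrayList<>();
            single.add(i);
            clusters.add(single);
        }
        while (clusters.size() > numClusters) {
            // Each merge needs the globally closest pair of clusters, so all
            // clusters must be visible to the same task at this step.
            int bestA = -1;
            int bestB = -1;
            double best = Double.POSITIVE_INFINITY;
            for (int a = 0; a < clusters.size(); a++) {
                for (int b = a + 1; b < clusters.size(); b++) {
                    double d = singleLinkage(points, clusters.get(a), clusters.get(b));
                    if (d < best) {
                        best = d;
                        bestA = a;
                        bestB = b;
                    }
                }
            }
            // bestB > bestA, so removing bestB does not shift bestA.
            clusters.get(bestA).addAll(clusters.remove(bestB));
        }
        int[] labels = new int[points.length];
        for (int c = 0; c < clusters.size(); c++) {
            for (int idx : clusters.get(c)) {
                labels[idx] = c;
            }
        }
        return labels;
    }

    /** Single linkage (assumed here): the smallest Euclidean distance between members. */
    static double singleLinkage(double[][] points, List<Integer> a, List<Integer> b) {
        double min = Double.POSITIVE_INFINITY;
        for (int i : a) {
            for (int j : b) {
                double sum = 0.0;
                for (int k = 0; k < points[i].length; k++) {
                    double diff = points[i][k] - points[j][k];
                    sum += diff * diff;
                }
                min = Math.min(min, Math.sqrt(sum));
            }
        }
        return min;
    }

    public static void main(String[] args) {
        double[][] points = {{0, 0}, {0, 1}, {10, 10}, {10, 11}};
        System.out.println(Arrays.toString(cluster(points, 2))); // prints [0, 0, 1, 1]
    }
}
```

Because every iteration depends on the result of the previous merge and on distances between all remaining clusters, a per-partition pre-aggregation (as reduce() would do) does not map onto this loop directly, which is consistent with keeping the window operator at parallelism 1 here.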