jiangxin369 commented on code in PR #195:
URL: https://github.com/apache/flink-ml/pull/195#discussion_r1058706248


##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/datagenerator/clustering/KMeansModelDataGenerator.java:
##########
@@ -55,20 +61,36 @@ public Table[] getData(StreamTableEnvironment tEnv) {
         InputDataGenerator<?> vectorArrayGenerator = new DenseVectorArrayGenerator();
         ReadWriteUtils.updateExistingParams(vectorArrayGenerator, paramMap);
         vectorArrayGenerator.setNumValues(1);
+        vectorArrayGenerator.setColNames(new String[] {"centroids"});
+
+        Table centroidsTable = vectorArrayGenerator.getData(tEnv)[0];
 
-        Table vectorArrayTable = vectorArrayGenerator.getData(tEnv)[0];
-        DataStream<KMeansModelData> modelDataStream =
-                tEnv.toDataStream(vectorArrayTable, DenseVector[].class)
-                        .map(new GenerateKMeansModelDataFunction());
+        Table modelDataTable =
+                centroidsTable.select(
+                        $("centroids"),
+                        call(GenerateWeightsFunction.class, $("centroids")).as("weights"));
 
-        return new Table[] {tEnv.fromDataStream(modelDataStream)};
+        return new Table[] {modelDataTable};
     }
 
-    private static class GenerateKMeansModelDataFunction
-            implements MapFunction<DenseVector[], KMeansModelData> {
+    /**
+     * A scalar function that generate the weights vector for KMeansModelData from the centroids

Review Comment:
   ```suggestion
        * A scalar function that generates the weights vector for KMeansModelData from the centroids
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to