lindong28 commented on code in PR #114:
URL: https://github.com/apache/flink-ml/pull/114#discussion_r904462286


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##########
@@ -80,24 +82,68 @@ public Table[] transform(Table... inputs) {
         return new Table[] {outputTable};
     }
 
-    private static class AssemblerFunc implements FlatMapFunction<Row, Row> {
+    private static class AssemblerFunc extends RichFlatMapFunction<Row, Row> {
         private final String[] inputCols;
         private final String handleInvalid;
 
+        /** The indices for assembling vectors. */
+        private transient IntArrayList indices;
+        /** The values for assembling vectors. */
+        private transient DoubleArrayList values;
+
         public AssemblerFunc(String[] inputCols, String handleInvalid) {
             this.inputCols = inputCols;
             this.handleInvalid = handleInvalid;
         }
 
         @Override
-        public void flatMap(Row value, Collector<Row> out) throws Exception {
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+            indices = new IntArrayList();
+            values = new DoubleArrayList();
+        }
+
+        @Override
+        public void flatMap(Row value, Collector<Row> out) {
+            int offset = 0;
             try {
-                Object[] objects = new Object[inputCols.length];
-                for (int i = 0; i < objects.length; ++i) {
-                    objects[i] = value.getField(inputCols[i]);
+                for (String inputCol : inputCols) {
+                    Object object = value.getField(inputCol);
+                    Preconditions.checkNotNull(object, "Input column value should not be null.");
+                    if (object instanceof Number) {
+                        indices.add(offset++);
+                        values.add(((Number) object).doubleValue());
+                    } else if (object instanceof SparseVector) {
+                        SparseVector sparseVector = (SparseVector) object;
+                        for (int i = 0; i < sparseVector.indices.length; ++i) {
+                            indices.add(sparseVector.indices[i] + offset);
+                            values.add(sparseVector.values[i]);
+                        }
+                        offset += sparseVector.size();
+                    } else if (object instanceof DenseVector) {
+                        DenseVector denseVector = (DenseVector) object;
+                        for (int i = 0; i < denseVector.size(); ++i) {
+                            indices.add(offset + i);
+                            values.add(denseVector.values[i]);
+                        }
+                        offset += denseVector.size();
+                    } else {
+                        throw new IllegalArgumentException(
+                                "Input type has not been supported yet.");
+                    }
+                }
+
+                Vector assembledVec =
+                        new SparseVector(

Review Comment:
   If using `it.unimi.dsi.fastutil.*` is not considerably faster than using a 
for-loop to construct `int[]` and `double[]`, it seems simpler to keep using a 
for-loop instead of introducing an extra library dependency.
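
For reference, here is a minimal sketch of the for-loop alternative, which needs no fastutil dependency. One pass counts the entries and a second pass fills exactly-sized `int[]` and `double[]` arrays. To keep it self-contained, `double[]` stands in for `DenseVector`, and `SparseInput` and `Assembled` are hypothetical stand-ins for Flink ML's `SparseVector`. This is an illustration, not the PR's actual code.

```java
import java.util.Objects;

/** Sketch: assembles inputs into sparse form with plain arrays and two for-loops. */
final class TwoPassAssembler {

    /** Hypothetical stand-in for a sparse vector: logical size plus its non-zero entries. */
    static final class SparseInput {
        final int size;
        final int[] indices;
        final double[] values;

        SparseInput(int size, int[] indices, double[] values) {
            this.size = size;
            this.indices = indices;
            this.values = values;
        }
    }

    /** Hypothetical result holder for the assembled vector in sparse form. */
    static final class Assembled {
        final int size;
        final int[] indices;
        final double[] values;

        Assembled(int size, int[] indices, double[] values) {
            this.size = size;
            this.indices = indices;
            this.values = values;
        }
    }

    static Assembled assemble(Object... fields) {
        // Pass 1: count the entries so each output array is allocated exactly once.
        int nnz = 0;
        for (Object field : fields) {
            Objects.requireNonNull(field, "Input column value should not be null.");
            if (field instanceof Number) {
                nnz += 1;
            } else if (field instanceof double[]) {
                nnz += ((double[]) field).length;
            } else if (field instanceof SparseInput) {
                nnz += ((SparseInput) field).indices.length;
            } else {
                throw new IllegalArgumentException("Input type has not been supported yet.");
            }
        }

        // Pass 2: fill the arrays, shifting each input's indices by the running offset.
        int[] indices = new int[nnz];
        double[] values = new double[nnz];
        int offset = 0;
        int k = 0;
        for (Object field : fields) {
            if (field instanceof Number) {
                indices[k] = offset;
                values[k++] = ((Number) field).doubleValue();
                offset += 1;
            } else if (field instanceof double[]) {
                double[] dense = (double[]) field;
                for (int i = 0; i < dense.length; ++i) {
                    indices[k] = offset + i;
                    values[k++] = dense[i];
                }
                offset += dense.length;
            } else {
                SparseInput sparse = (SparseInput) field;
                for (int i = 0; i < sparse.indices.length; ++i) {
                    indices[k] = offset + sparse.indices[i];
                    values[k++] = sparse.values[i];
                }
                offset += sparse.size;
            }
        }
        // The final offset is the total size of the assembled vector.
        return new Assembled(offset, indices, values);
    }
}
```

For example, assembling `1.0`, `new double[] {2.0, 3.0}` and a size-3 `SparseInput` with a single entry `5.0` at index 1 yields size 6, indices `[0, 1, 2, 4]` and values `[1.0, 2.0, 3.0, 5.0]`. The trade-off is iterating over the inputs twice in exchange for allocating each output array only once.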



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##########
@@ -80,24 +82,68 @@ public Table[] transform(Table... inputs) {
         return new Table[] {outputTable};
     }
 
-    private static class AssemblerFunc implements FlatMapFunction<Row, Row> {
+    private static class AssemblerFunc extends RichFlatMapFunction<Row, Row> {
         private final String[] inputCols;
         private final String handleInvalid;
 
+        /** The indices for assembling vectors. */
+        private transient IntArrayList indices;
+        /** The values for assembling vectors. */
+        private transient DoubleArrayList values;

Review Comment:
   It seems simpler to instantiate `indices` and `values` as local variables in the 
`flatMap()` method. Does re-creating these two lists on every `flatMap()` call 
have a non-trivial performance impact?
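
A sketch of the local-variable variant, assuming it is acceptable to allocate fresh buffers for each record. Here `assembleRecord` plays the role of `flatMap()`, so no `open()` override or `transient` fields are needed. `GrowableBuffers` and `Result` are hypothetical helpers (not Flink or fastutil APIs), and only numbers and `double[]` (standing in for `DenseVector`) are handled to keep it short.

```java
import java.util.Arrays;
import java.util.Objects;

/** Sketch: per-record assembly where the buffers are local to the method. */
final class LocalBufferAssembler {

    /** Hypothetical assembled output in sparse form. */
    static final class Result {
        final int size;
        final int[] indices;
        final double[] values;

        Result(int size, int[] indices, double[] values) {
            this.size = size;
            this.indices = indices;
            this.values = values;
        }
    }

    /** Hypothetical minimal growable primitive buffers (no boxing). */
    static final class GrowableBuffers {
        int[] indices = new int[8];
        double[] values = new double[8];
        int count = 0;

        void add(int index, double value) {
            if (count == indices.length) {
                // Double the capacity when the buffers are full.
                indices = Arrays.copyOf(indices, count * 2);
                values = Arrays.copyOf(values, count * 2);
            }
            indices[count] = index;
            values[count++] = value;
        }
    }

    /** Plays the role of flatMap(): assembles the fields of one record. */
    static Result assembleRecord(Object[] fields) {
        // Local buffers: created per call, so nothing carries over between records.
        GrowableBuffers buf = new GrowableBuffers();
        int offset = 0;
        for (Object field : fields) {
            Objects.requireNonNull(field, "Input column value should not be null.");
            if (field instanceof Number) {
                buf.add(offset++, ((Number) field).doubleValue());
            } else if (field instanceof double[]) {
                double[] dense = (double[]) field;
                for (int i = 0; i < dense.length; ++i) {
                    buf.add(offset + i, dense[i]);
                }
                offset += dense.length;
            } else {
                throw new IllegalArgumentException("Input type has not been supported yet.");
            }
        }
        // Trim the buffers to the exact entry count for the output.
        return new Result(
                offset, Arrays.copyOf(buf.indices, buf.count), Arrays.copyOf(buf.values, buf.count));
    }
}
```

With reused fields, the buffers would instead have to be cleared at the start of every call. Short-lived allocations like these are usually cheap on modern JVMs, but only a benchmark on representative rows can really answer the performance question.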



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
