gaoyunhaii commented on a change in pull request #37:
URL: https://github.com/apache/flink-ml/pull/37#discussion_r769431697



##########
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/linalg/typeinfo/SparseVectorTypeInfo.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.linalg.typeinfo;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.ml.linalg.SparseVector;
+
+/** A {@link TypeInformation} for the {@link SparseVector} type. */
+public class SparseVectorTypeInfo extends TypeInformation<SparseVector> {
+    public static final SparseVectorTypeInfo INSTANCE = new 
SparseVectorTypeInfo();

Review comment:
       This field seems to be unused?

##########
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/linalg/SparseVector.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.linalg;
+
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.ml.linalg.typeinfo.SparseVectorTypeInfoFactory;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/** A sparse vector of double values. */
+@TypeInfo(SparseVectorTypeInfoFactory.class)
+public class SparseVector implements Vector {
+    public final int n;
+    public final int[] indices;
+    public final double[] values;
+
+    public SparseVector(int n, int[] indices, double[] values) {
+        this.n = n;
+        this.indices = indices;
+        this.values = values;
+        if (!isIndicesSorted()) {
+            sortIndices();
+        }
+        validateSortedData();
+    }
+
+    @Override
+    public int size() {
+        return n;
+    }
+
+    @Override
+    public double get(int i) {
+        int pos = Arrays.binarySearch(indices, i);
+        if (pos >= 0) {
+            return values[pos];
+        }
+        return 0.;
+    }
+
+    @Override
+    public double[] toArray() {
+        double[] result = new double[n];
+        for (int i = 0; i < indices.length; i++) {
+            result[indices[i]] = values[i];
+        }
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SparseVector that = (SparseVector) o;
+        return n == that.n
+                && Arrays.equals(indices, that.indices)
+                && Arrays.equals(values, that.values);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(n);
+        result = 31 * result + Arrays.hashCode(indices);
+        result = 31 * result + Arrays.hashCode(values);
+        return result;
+    }
+
+    /**
+     * Check whether input data is validate.
+     *
+     * <p>This function does the following checks:
+     *
+     * <ul>
+     *   <li>The indices array and values array are of the same size.
+     *   <li>vector indices are in valid range.
+     *   <li>vector indices are unique.
+     * </ul>
+     *
+     * <p>This function works as expected only when indices are sorted.
+     */
+    private void validateSortedData() {
+        Preconditions.checkArgument(
+                indices.length == values.length,
+                "Indices size and values size should be the same.");
+        if (this.indices.length > 0) {
+            Preconditions.checkArgument(
+                    this.indices[0] >= 0 && this.indices[this.indices.length - 
1] < this.n,
+                    "Index out of bound.");
+        }
+        for (int i = 1; i < this.indices.length; i++) {
+            Preconditions.checkArgument(
+                    this.indices[i] > this.indices[i - 1], "Indices 
duplicated.");
+        }
+    }
+
+    private boolean isIndicesSorted() {
+        for (int i = 1; i < this.indices.length; i++) {
+            if (this.indices[i] < this.indices[i - 1]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Sort the indices and values. */
+    private void sortIndices() {
+        sortImpl(this.indices, this.values, 0, this.indices.length - 1);
+    }
+
+    /** Sort the indices and values using quick sort. */
+    private static void sortImpl(int[] indices, double[] values, int low, int 
high) {
+        int pivotPos = (low + high) / 2;
+        int pivot = indices[pivotPos];
+        indices[pivotPos] = indices[high];
+        indices[high] = pivot;
+        double t = values[pivotPos];
+        values[pivotPos] = values[high];
+        values[high] = t;
+
+        int pos = low - 1;
+        for (int i = low; i <= high; i++) {
+            if (indices[i] <= pivot) {
+                pos++;
+                int tempI = indices[pos];
+                double tempD = values[pos];
+                indices[pos] = indices[i];
+                values[pos] = values[i];
+                indices[i] = tempI;
+                values[i] = tempD;
+            }
+        }
+        if (high > pos + 1) {
+            sortImpl(indices, values, pos + 1, high);
+        }
+        while (pos - 1 > low && indices[pos - 1] == pivot) {
+            pos--;
+        }
+        if (pos - 1 > low) {
+            sortImpl(indices, values, low, pos - 1);
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sbr = new 
StringBuilder(SparseVector.class.getSimpleName() + "[");

Review comment:
       Would it be better for `toString` to output (position, value) tuples 
instead of printing a lot of zeros?

##########
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/linalg/SparseVector.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.linalg;
+
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.ml.linalg.typeinfo.SparseVectorTypeInfoFactory;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/** A sparse vector of double values. */
+@TypeInfo(SparseVectorTypeInfoFactory.class)
+public class SparseVector implements Vector {
+    public final int n;
+    public final int[] indices;
+    public final double[] values;
+
+    public SparseVector(int n, int[] indices, double[] values) {
+        this.n = n;
+        this.indices = indices;
+        this.values = values;
+        if (!isIndicesSorted()) {
+            sortIndices();
+        }
+        validateSortedData();
+    }
+
+    @Override
+    public int size() {
+        return n;
+    }
+
+    @Override
+    public double get(int i) {
+        int pos = Arrays.binarySearch(indices, i);
+        if (pos >= 0) {
+            return values[pos];
+        }
+        return 0.;
+    }
+
+    @Override
+    public double[] toArray() {
+        double[] result = new double[n];
+        for (int i = 0; i < indices.length; i++) {
+            result[indices[i]] = values[i];
+        }
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SparseVector that = (SparseVector) o;
+        return n == that.n
+                && Arrays.equals(indices, that.indices)
+                && Arrays.equals(values, that.values);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(n);
+        result = 31 * result + Arrays.hashCode(indices);
+        result = 31 * result + Arrays.hashCode(values);
+        return result;
+    }
+
+    /**
+     * Check whether input data is validate.
+     *
+     * <p>This function does the following checks:
+     *
+     * <ul>
+     *   <li>The indices array and values array are of the same size.
+     *   <li>vector indices are in valid range.
+     *   <li>vector indices are unique.
+     * </ul>
+     *
+     * <p>This function works as expected only when indices are sorted.
+     */
+    private void validateSortedData() {
+        Preconditions.checkArgument(
+                indices.length == values.length,
+                "Indices size and values size should be the same.");
+        if (this.indices.length > 0) {
+            Preconditions.checkArgument(
+                    this.indices[0] >= 0 && this.indices[this.indices.length - 
1] < this.n,
+                    "Index out of bound.");
+        }
+        for (int i = 1; i < this.indices.length; i++) {
+            Preconditions.checkArgument(
+                    this.indices[i] > this.indices[i - 1], "Indices 
duplicated.");
+        }
+    }
+
+    private boolean isIndicesSorted() {
+        for (int i = 1; i < this.indices.length; i++) {
+            if (this.indices[i] < this.indices[i - 1]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Sort the indices and values. */
+    private void sortIndices() {
+        sortImpl(this.indices, this.values, 0, this.indices.length - 1);
+    }
+
+    /** Sort the indices and values using quick sort. */
+    private static void sortImpl(int[] indices, double[] values, int low, int 
high) {
+        int pivotPos = (low + high) / 2;
+        int pivot = indices[pivotPos];
+        indices[pivotPos] = indices[high];
+        indices[high] = pivot;
+        double t = values[pivotPos];
+        values[pivotPos] = values[high];
+        values[high] = t;
+
+        int pos = low - 1;
+        for (int i = low; i <= high; i++) {
+            if (indices[i] <= pivot) {
+                pos++;
+                int tempI = indices[pos];
+                double tempD = values[pos];
+                indices[pos] = indices[i];
+                values[pos] = values[i];
+                indices[i] = tempI;
+                values[i] = tempD;
+            }
+        }
+        if (high > pos + 1) {
+            sortImpl(indices, values, pos + 1, high);
+        }
+        while (pos - 1 > low && indices[pos - 1] == pivot) {

Review comment:
       Could you elaborate a bit on the purpose of the loop here?

##########
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/linalg/SparseVector.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.linalg;
+
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.ml.linalg.typeinfo.SparseVectorTypeInfoFactory;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/** A sparse vector of double values. */
+@TypeInfo(SparseVectorTypeInfoFactory.class)
+public class SparseVector implements Vector {
+    public final int n;
+    public final int[] indices;
+    public final double[] values;
+
+    public SparseVector(int n, int[] indices, double[] values) {
+        this.n = n;
+        this.indices = indices;
+        this.values = values;
+        if (!isIndicesSorted()) {
+            sortIndices();
+        }
+        validateSortedData();
+    }
+
+    @Override
+    public int size() {
+        return n;
+    }
+
+    @Override
+    public double get(int i) {
+        int pos = Arrays.binarySearch(indices, i);
+        if (pos >= 0) {
+            return values[pos];
+        }
+        return 0.;
+    }
+
+    @Override
+    public double[] toArray() {
+        double[] result = new double[n];
+        for (int i = 0; i < indices.length; i++) {
+            result[indices[i]] = values[i];
+        }
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SparseVector that = (SparseVector) o;
+        return n == that.n
+                && Arrays.equals(indices, that.indices)
+                && Arrays.equals(values, that.values);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(n);
+        result = 31 * result + Arrays.hashCode(indices);
+        result = 31 * result + Arrays.hashCode(values);
+        return result;
+    }
+
+    /**
+     * Check whether input data is validate.
+     *
+     * <p>This function does the following checks:
+     *
+     * <ul>
+     *   <li>The indices array and values array are of the same size.
+     *   <li>vector indices are in valid range.
+     *   <li>vector indices are unique.
+     * </ul>
+     *
+     * <p>This function works as expected only when indices are sorted.
+     */
+    private void validateSortedData() {
+        Preconditions.checkArgument(
+                indices.length == values.length,
+                "Indices size and values size should be the same.");
+        if (this.indices.length > 0) {
+            Preconditions.checkArgument(
+                    this.indices[0] >= 0 && this.indices[this.indices.length - 
1] < this.n,
+                    "Index out of bound.");
+        }
+        for (int i = 1; i < this.indices.length; i++) {
+            Preconditions.checkArgument(
+                    this.indices[i] > this.indices[i - 1], "Indices 
duplicated.");
+        }
+    }
+
+    private boolean isIndicesSorted() {
+        for (int i = 1; i < this.indices.length; i++) {
+            if (this.indices[i] < this.indices[i - 1]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Sort the indices and values. */
+    private void sortIndices() {
+        sortImpl(this.indices, this.values, 0, this.indices.length - 1);
+    }
+
+    /** Sort the indices and values using quick sort. */
+    private static void sortImpl(int[] indices, double[] values, int low, int 
high) {
+        int pivotPos = (low + high) / 2;
+        int pivot = indices[pivotPos];
+        indices[pivotPos] = indices[high];
+        indices[high] = pivot;
+        double t = values[pivotPos];
+        values[pivotPos] = values[high];
+        values[high] = t;
+
+        int pos = low - 1;
+        for (int i = low; i <= high; i++) {
+            if (indices[i] <= pivot) {
+                pos++;
+                int tempI = indices[pos];
+                double tempD = values[pos];
+                indices[pos] = indices[i];
+                values[pos] = values[i];
+                indices[i] = tempI;
+                values[i] = tempD;
+            }
+        }
+        if (high > pos + 1) {
+            sortImpl(indices, values, pos + 1, high);
+        }
+        while (pos - 1 > low && indices[pos - 1] == pivot) {
+            pos--;
+        }
+        if (pos - 1 > low) {

Review comment:
       Similarly, do we need to sort if `pos - 2 == low`?

##########
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/linalg/SparseVector.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.linalg;
+
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.ml.linalg.typeinfo.SparseVectorTypeInfoFactory;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/** A sparse vector of double values. */
+@TypeInfo(SparseVectorTypeInfoFactory.class)
+public class SparseVector implements Vector {
+    public final int n;
+    public final int[] indices;
+    public final double[] values;
+
+    public SparseVector(int n, int[] indices, double[] values) {
+        this.n = n;
+        this.indices = indices;
+        this.values = values;
+        if (!isIndicesSorted()) {
+            sortIndices();
+        }
+        validateSortedData();
+    }
+
+    @Override
+    public int size() {
+        return n;
+    }
+
+    @Override
+    public double get(int i) {
+        int pos = Arrays.binarySearch(indices, i);
+        if (pos >= 0) {
+            return values[pos];
+        }
+        return 0.;
+    }
+
+    @Override
+    public double[] toArray() {
+        double[] result = new double[n];
+        for (int i = 0; i < indices.length; i++) {
+            result[indices[i]] = values[i];
+        }
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SparseVector that = (SparseVector) o;
+        return n == that.n
+                && Arrays.equals(indices, that.indices)
+                && Arrays.equals(values, that.values);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(n);
+        result = 31 * result + Arrays.hashCode(indices);
+        result = 31 * result + Arrays.hashCode(values);
+        return result;
+    }
+
+    /**
+     * Check whether input data is validate.
+     *
+     * <p>This function does the following checks:
+     *
+     * <ul>
+     *   <li>The indices array and values array are of the same size.
+     *   <li>vector indices are in valid range.
+     *   <li>vector indices are unique.
+     * </ul>
+     *
+     * <p>This function works as expected only when indices are sorted.
+     */
+    private void validateSortedData() {
+        Preconditions.checkArgument(
+                indices.length == values.length,
+                "Indices size and values size should be the same.");
+        if (this.indices.length > 0) {
+            Preconditions.checkArgument(
+                    this.indices[0] >= 0 && this.indices[this.indices.length - 
1] < this.n,
+                    "Index out of bound.");
+        }
+        for (int i = 1; i < this.indices.length; i++) {
+            Preconditions.checkArgument(
+                    this.indices[i] > this.indices[i - 1], "Indices 
duplicated.");
+        }
+    }
+
+    private boolean isIndicesSorted() {
+        for (int i = 1; i < this.indices.length; i++) {
+            if (this.indices[i] < this.indices[i - 1]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Sort the indices and values. */

Review comment:
       I think Flink tends to use the third-person singular in comments. We'd 
better use `Sorts` here, and likewise in the other comments.

##########
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/linalg/SparseVector.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.linalg;
+
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.ml.linalg.typeinfo.SparseVectorTypeInfoFactory;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/** A sparse vector of double values. */
+@TypeInfo(SparseVectorTypeInfoFactory.class)
+public class SparseVector implements Vector {
+    public final int n;
+    public final int[] indices;
+    public final double[] values;
+
+    public SparseVector(int n, int[] indices, double[] values) {
+        this.n = n;
+        this.indices = indices;
+        this.values = values;
+        if (!isIndicesSorted()) {
+            sortIndices();
+        }
+        validateSortedData();
+    }
+
+    @Override
+    public int size() {
+        return n;
+    }
+
+    @Override
+    public double get(int i) {
+        int pos = Arrays.binarySearch(indices, i);
+        if (pos >= 0) {
+            return values[pos];
+        }
+        return 0.;
+    }
+
+    @Override
+    public double[] toArray() {
+        double[] result = new double[n];
+        for (int i = 0; i < indices.length; i++) {
+            result[indices[i]] = values[i];
+        }
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SparseVector that = (SparseVector) o;
+        return n == that.n
+                && Arrays.equals(indices, that.indices)
+                && Arrays.equals(values, that.values);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(n);
+        result = 31 * result + Arrays.hashCode(indices);
+        result = 31 * result + Arrays.hashCode(values);
+        return result;
+    }
+
+    /**
+     * Check whether input data is validate.
+     *
+     * <p>This function does the following checks:
+     *
+     * <ul>
+     *   <li>The indices array and values array are of the same size.
+     *   <li>vector indices are in valid range.
+     *   <li>vector indices are unique.
+     * </ul>
+     *
+     * <p>This function works as expected only when indices are sorted.
+     */
+    private void validateSortedData() {
+        Preconditions.checkArgument(
+                indices.length == values.length,
+                "Indices size and values size should be the same.");
+        if (this.indices.length > 0) {
+            Preconditions.checkArgument(
+                    this.indices[0] >= 0 && this.indices[this.indices.length - 
1] < this.n,
+                    "Index out of bound.");
+        }
+        for (int i = 1; i < this.indices.length; i++) {
+            Preconditions.checkArgument(
+                    this.indices[i] > this.indices[i - 1], "Indices 
duplicated.");
+        }
+    }
+
+    private boolean isIndicesSorted() {
+        for (int i = 1; i < this.indices.length; i++) {
+            if (this.indices[i] < this.indices[i - 1]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Sort the indices and values. */
+    private void sortIndices() {
+        sortImpl(this.indices, this.values, 0, this.indices.length - 1);
+    }
+
+    /** Sort the indices and values using quick sort. */
+    private static void sortImpl(int[] indices, double[] values, int low, int 
high) {
+        int pivotPos = (low + high) / 2;
+        int pivot = indices[pivotPos];

Review comment:
       Could we extract a separate `swap` method that swaps the elements in 
`indices` and `values` at the same time?

##########
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/linalg/SparseVector.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.linalg;
+
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.ml.linalg.typeinfo.SparseVectorTypeInfoFactory;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/** A sparse vector of double values. */
+@TypeInfo(SparseVectorTypeInfoFactory.class)
+public class SparseVector implements Vector {
+    public final int n;
+    public final int[] indices;
+    public final double[] values;
+
+    public SparseVector(int n, int[] indices, double[] values) {
+        this.n = n;
+        this.indices = indices;
+        this.values = values;
+        if (!isIndicesSorted()) {
+            sortIndices();
+        }
+        validateSortedData();
+    }
+
+    @Override
+    public int size() {
+        return n;
+    }
+
+    @Override
+    public double get(int i) {
+        int pos = Arrays.binarySearch(indices, i);
+        if (pos >= 0) {
+            return values[pos];
+        }
+        return 0.;
+    }
+
+    @Override
+    public double[] toArray() {
+        double[] result = new double[n];
+        for (int i = 0; i < indices.length; i++) {
+            result[indices[i]] = values[i];
+        }
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SparseVector that = (SparseVector) o;
+        return n == that.n
+                && Arrays.equals(indices, that.indices)
+                && Arrays.equals(values, that.values);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(n);
+        result = 31 * result + Arrays.hashCode(indices);
+        result = 31 * result + Arrays.hashCode(values);
+        return result;
+    }
+
+    /**
+     * Check whether input data is validate.
+     *
+     * <p>This function does the following checks:
+     *
+     * <ul>
+     *   <li>The indices array and values array are of the same size.
+     *   <li>vector indices are in valid range.
+     *   <li>vector indices are unique.
+     * </ul>
+     *
+     * <p>This function works as expected only when indices are sorted.
+     */
+    private void validateSortedData() {
+        Preconditions.checkArgument(
+                indices.length == values.length,
+                "Indices size and values size should be the same.");
+        if (this.indices.length > 0) {
+            Preconditions.checkArgument(
+                    this.indices[0] >= 0 && this.indices[this.indices.length - 
1] < this.n,
+                    "Index out of bound.");
+        }
+        for (int i = 1; i < this.indices.length; i++) {
+            Preconditions.checkArgument(
+                    this.indices[i] > this.indices[i - 1], "Indices 
duplicated.");
+        }
+    }
+
+    private boolean isIndicesSorted() {
+        for (int i = 1; i < this.indices.length; i++) {
+            if (this.indices[i] < this.indices[i - 1]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Sort the indices and values. */
+    private void sortIndices() {
+        sortImpl(this.indices, this.values, 0, this.indices.length - 1);
+    }
+
+    /** Sort the indices and values using quick sort. */
+    private static void sortImpl(int[] indices, double[] values, int low, int 
high) {
+        int pivotPos = (low + high) / 2;
+        int pivot = indices[pivotPos];
+        indices[pivotPos] = indices[high];
+        indices[high] = pivot;
+        double t = values[pivotPos];
+        values[pivotPos] = values[high];
+        values[high] = t;
+
+        int pos = low - 1;
+        for (int i = low; i <= high; i++) {
+            if (indices[i] <= pivot) {
+                pos++;
+                int tempI = indices[pos];
+                double tempD = values[pos];
+                indices[pos] = indices[i];
+                values[pos] = values[i];
+                indices[i] = tempI;
+                values[i] = tempD;
+            }
+        }
+        if (high > pos + 1) {

Review comment:
       Do we also need to sort if `high == pos + 2`?

##########
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/linalg/SparseVector.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.linalg;
+
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.ml.linalg.typeinfo.SparseVectorTypeInfoFactory;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/** A sparse vector of double values. */
+@TypeInfo(SparseVectorTypeInfoFactory.class)
+public class SparseVector implements Vector {
+    public final int n;
+    public final int[] indices;
+    public final double[] values;
+
+    public SparseVector(int n, int[] indices, double[] values) {
+        this.n = n;
+        this.indices = indices;
+        this.values = values;
+        if (!isIndicesSorted()) {
+            sortIndices();
+        }
+        validateSortedData();
+    }
+
+    @Override
+    public int size() {
+        return n;
+    }
+
+    @Override
+    public double get(int i) {
+        int pos = Arrays.binarySearch(indices, i);
+        if (pos >= 0) {
+            return values[pos];
+        }
+        return 0.;
+    }
+
+    @Override
+    public double[] toArray() {
+        double[] result = new double[n];
+        for (int i = 0; i < indices.length; i++) {
+            result[indices[i]] = values[i];
+        }
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SparseVector that = (SparseVector) o;
+        return n == that.n
+                && Arrays.equals(indices, that.indices)
+                && Arrays.equals(values, that.values);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(n);
+        result = 31 * result + Arrays.hashCode(indices);
+        result = 31 * result + Arrays.hashCode(values);
+        return result;
+    }
+
+    /**
+     * Check whether input data is validate.
+     *
+     * <p>This function does the following checks:
+     *
+     * <ul>
+     *   <li>The indices array and values array are of the same size.
+     *   <li>vector indices are in valid range.
+     *   <li>vector indices are unique.
+     * </ul>
+     *
+     * <p>This function works as expected only when indices are sorted.
+     */
+    private void validateSortedData() {
+        Preconditions.checkArgument(
+                indices.length == values.length,
+                "Indices size and values size should be the same.");
+        if (this.indices.length > 0) {
+            Preconditions.checkArgument(
+                    this.indices[0] >= 0 && this.indices[this.indices.length - 
1] < this.n,
+                    "Index out of bound.");
+        }
+        for (int i = 1; i < this.indices.length; i++) {
+            Preconditions.checkArgument(
+                    this.indices[i] > this.indices[i - 1], "Indices 
duplicated.");
+        }
+    }
+
+    private boolean isIndicesSorted() {
+        for (int i = 1; i < this.indices.length; i++) {
+            if (this.indices[i] < this.indices[i - 1]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Sort the indices and values. */
+    private void sortIndices() {
+        sortImpl(this.indices, this.values, 0, this.indices.length - 1);
+    }
+
+    /** Sort the indices and values using quick sort. */
+    private static void sortImpl(int[] indices, double[] values, int low, int 
high) {

Review comment:
       Could we also add some tests for the boundary cases of the sort here?

##########
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/linalg/typeinfo/SparseVectorSerializer.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.linalg.typeinfo;
+
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.ml.linalg.SparseVector;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/** Specialized serializer for {@link SparseVector}. */
+public final class SparseVectorSerializer extends 
TypeSerializerSingleton<SparseVector> {

Review comment:
       Perhaps we need a test for the serialization / deserialization 
round trip of this serializer?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to