Repository: spark
Updated Branches:
  refs/heads/master ed730c950 -> 9c65fa76f


http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index 08cd9ab..cb85e43 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -17,15 +17,13 @@
 
 package org.apache.spark.mllib.util
 
+import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV,
+  squaredDistance => breezeSquaredDistance}
+
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkContext._
-
-import org.jblas.DoubleMatrix
-
 import org.apache.spark.mllib.regression.LabeledPoint
-
-import breeze.linalg.{Vector => BV, SparseVector => BSV, squaredDistance => 
breezeSquaredDistance}
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
 
 /**
  * Helper methods to load, save and pre-process data used in ML Lib.
@@ -41,6 +39,107 @@ object MLUtils {
   }
 
   /**
+   * Multiclass label parser, which parses a string into double.
+   */
+  val multiclassLabelParser: String => Double = _.toDouble
+
+  /**
+   * Binary label parser, which outputs 1.0 (positive) if the value is greater 
than 0.5,
+   * or 0.0 (negative) otherwise.
+   */
+  val binaryLabelParser: String => Double = label => if (label.toDouble > 0.5) 
1.0 else 0.0
+
+  /**
+   * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint].
+   * The LIBSVM format is a text-based format used by LIBSVM and LIBLINEAR.
+   * Each line represents a labeled sparse feature vector using the following 
format:
+   * {{{label index1:value1 index2:value2 ...}}}
+   * where the indices are one-based and in ascending order.
+   * This method parses each line into a 
[[org.apache.spark.mllib.regression.LabeledPoint]],
+   * where the feature indices are converted to zero-based.
+   *
+   * @param sc Spark context
+   * @param path file or directory path in any Hadoop-supported file system URI
+   * @param labelParser parser for labels, default: 1.0 if label > 0.5 or 0.0 
otherwise
+   * @param numFeatures number of features, which will be determined from the 
input data if a
+   *                    negative value is given. The default value is -1.
+   * @param minSplits min number of partitions, default: sc.defaultMinSplits
+   * @return labeled data stored as an RDD[LabeledPoint]
+   */
+  def loadLibSVMData(
+      sc: SparkContext,
+      path: String,
+      labelParser: String => Double,
+      numFeatures: Int,
+      minSplits: Int): RDD[LabeledPoint] = {
+    val parsed = sc.textFile(path, minSplits)
+      .map(_.trim)
+      .filter(!_.isEmpty)
+      .map(_.split(' '))
+    // Determine number of features.
+    val d = if (numFeatures >= 0) {
+      numFeatures
+    } else {
+      parsed.map { items =>
+        if (items.length > 1) {
+          items.last.split(':')(0).toInt
+        } else {
+          0
+        }
+      }.reduce(math.max)
+    }
+    parsed.map { items =>
+      val label = labelParser(items.head)
+      val (indices, values) = items.tail.map { item =>
+        val indexAndValue = item.split(':')
+        val index = indexAndValue(0).toInt - 1
+        val value = indexAndValue(1).toDouble
+        (index, value)
+      }.unzip
+      LabeledPoint(label, Vectors.sparse(d, indices.toArray, values.toArray))
+    }
+  }
+
+  // Convenient methods for calling from Java.
+
+  /**
+   * Loads binary labeled data in the LIBSVM format into an RDD[LabeledPoint],
+   * with number of features determined automatically and the default number 
of partitions.
+   */
+  def loadLibSVMData(sc: SparkContext, path: String): RDD[LabeledPoint] =
+    loadLibSVMData(sc, path, binaryLabelParser, -1, sc.defaultMinSplits)
+
+  /**
+   * Loads binary labeled data in the LIBSVM format into an RDD[LabeledPoint],
+   * with number of features specified explicitly and the default number of 
partitions.
+   */
+  def loadLibSVMData(sc: SparkContext, path: String, numFeatures: Int): 
RDD[LabeledPoint] =
+    loadLibSVMData(sc, path, binaryLabelParser, numFeatures, 
sc.defaultMinSplits)
+
+  /**
+   * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
+   * with the given label parser, number of features determined automatically,
+   * and the default number of partitions.
+   */
+  def loadLibSVMData(
+      sc: SparkContext,
+      path: String,
+      labelParser: String => Double): RDD[LabeledPoint] =
+    loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinSplits)
+
+  /**
+   * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
+   * with the given label parser, number of features specified explicitly,
+   * and the default number of partitions.
+   */
+  def loadLibSVMData(
+      sc: SparkContext,
+      path: String,
+      labelParser: String => Double,
+      numFeatures: Int): RDD[LabeledPoint] =
+    loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinSplits)
+
+  /**
    * Load labeled data from a file. The data format used here is
    * <L>, <f1> <f2> ...
    * where <f1>, <f2> are feature values in Double and <L> is the 
corresponding label as Double.
@@ -54,7 +153,7 @@ object MLUtils {
     sc.textFile(dir).map { line =>
       val parts = line.split(',')
       val label = parts(0).toDouble
-      val features = parts(1).trim().split(' ').map(_.toDouble)
+      val features = Vectors.dense(parts(1).trim().split(' ').map(_.toDouble))
       LabeledPoint(label, features)
     }
   }
@@ -68,7 +167,7 @@ object MLUtils {
    * @param dir Directory to save the data.
    */
   def saveLabeledData(data: RDD[LabeledPoint], dir: String) {
-    val dataStr = data.map(x => x.label + "," + x.features.mkString(" "))
+    val dataStr = data.map(x => x.label + "," + x.features.toArray.mkString(" 
"))
     dataStr.saveAsTextFile(dir)
   }
 
@@ -76,44 +175,52 @@ object MLUtils {
    * Utility function to compute mean and standard deviation on a given 
dataset.
    *
    * @param data - input data set whose statistics are computed
-   * @param nfeatures - number of features
-   * @param nexamples - number of examples in input dataset
+   * @param numFeatures - number of features
+   * @param numExamples - number of examples in input dataset
    *
    * @return (yMean, xColMean, xColSd) - Tuple consisting of
    *     yMean - mean of the labels
    *     xColMean - Row vector with mean for every column (or feature) of the 
input data
    *     xColSd - Row vector standard deviation for every column (or feature) 
of the input data.
    */
-  def computeStats(data: RDD[LabeledPoint], nfeatures: Int, nexamples: Long):
-      (Double, DoubleMatrix, DoubleMatrix) = {
-    val yMean: Double = data.map { labeledPoint => labeledPoint.label 
}.reduce(_ + _) / nexamples
-
-    // NOTE: We shuffle X by column here to compute column sum and sum of 
squares.
-    val xColSumSq: RDD[(Int, (Double, Double))] = data.flatMap { labeledPoint 
=>
-      val nCols = labeledPoint.features.length
-      // Traverse over every column and emit (col, value, value^2)
-      Iterator.tabulate(nCols) { i =>
-        (i, (labeledPoint.features(i), 
labeledPoint.features(i)*labeledPoint.features(i)))
-      }
-    }.reduceByKey { case(x1, x2) =>
-      (x1._1 + x2._1, x1._2 + x2._2)
+  def computeStats(
+      data: RDD[LabeledPoint],
+      numFeatures: Int,
+      numExamples: Long): (Double, Vector, Vector) = {
+    val brzData = data.map { case LabeledPoint(label, features) =>
+      (label, features.toBreeze)
     }
-    val xColSumsMap = xColSumSq.collectAsMap()
-
-    val xColMean = DoubleMatrix.zeros(nfeatures, 1)
-    val xColSd = DoubleMatrix.zeros(nfeatures, 1)
-
-    // Compute mean and unbiased variance using column sums
-    var col = 0
-    while (col < nfeatures) {
-      xColMean.put(col, xColSumsMap(col)._1 / nexamples)
-      val variance =
-        (xColSumsMap(col)._2 - (math.pow(xColSumsMap(col)._1, 2) / nexamples)) 
/ nexamples
-      xColSd.put(col, math.sqrt(variance))
-      col += 1
+    val aggStats = brzData.aggregate(
+      (0L, 0.0, BDV.zeros[Double](numFeatures), BDV.zeros[Double](numFeatures))
+    )(
+      seqOp = (c, v) => (c, v) match {
+        case ((n, sumLabel, sum, sumSq), (label, features)) =>
+          features.activeIterator.foreach { case (i, x) =>
+            sumSq(i) += x * x
+          }
+          (n + 1L, sumLabel + label, sum += features, sumSq)
+      },
+      combOp = (c1, c2) => (c1, c2) match {
+        case ((n1, sumLabel1, sum1, sumSq1), (n2, sumLabel2, sum2, sumSq2)) =>
+          (n1 + n2, sumLabel1 + sumLabel2, sum1 += sum2, sumSq1 += sumSq2)
+      }
+    )
+    val (nl, sumLabel, sum, sumSq) = aggStats
+
+    require(nl > 0, "Input data is empty.")
+    require(nl == numExamples)
+
+    val n = nl.toDouble
+    val yMean = sumLabel / n
+    val mean = sum / n
+    val std = new Array[Double](sum.length)
+    var i = 0
+    while (i < numFeatures) {
+      std(i) = sumSq(i) / n - mean(i) * mean(i)
+      i += 1
     }
 
-    (yMean, xColMean, xColSd)
+    (yMean, Vectors.fromBreeze(mean), Vectors.dense(std))
   }
 
   /**
@@ -144,6 +251,18 @@ object MLUtils {
     val sumSquaredNorm = norm1 * norm1 + norm2 * norm2
     val normDiff = norm1 - norm2
     var sqDist = 0.0
+    /*
+     * The relative error is
+     * <pre>
+     * EPSILON * ( \|a\|_2^2 + \|b\\_2^2 + 2 |a^T b|) / ( \|a - b\|_2^2 ),
+     * </pre>
+     * which is bounded by
+     * <pre>
+     * 2.0 * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 ) / ( (\|a\|_2 - \|b\|_2)^2 ).
+     * </pre>
+     * The bound doesn't need the inner product, so we can use it as a 
sufficient condition to
+     * check quickly whether the inner product approach is accurate.
+     */
     val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * 
normDiff + EPSILON)
     if (precisionBound1 < precision) {
       sqDist = sumSquaredNorm - 2.0 * v1.dot(v2)

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
index c96c94f..e300c3d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
@@ -23,6 +23,7 @@ import org.jblas.DoubleMatrix
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
 
 /**
@@ -58,7 +59,7 @@ object SVMDataGenerator {
       }
       val yD = new DoubleMatrix(1, x.length, x: _*).dot(trueWeights) + 
rnd.nextGaussian() * 0.1
       val y = if (yD < 0) 0.0 else 1.0
-      LabeledPoint(y, x)
+      LabeledPoint(y, Vectors.dense(x))
     }
 
     MLUtils.saveLabeledData(data, outputPath)

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
 
b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
index 073ded6..c80b113 100644
--- 
a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
+++ 
b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.classification;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.junit.After;
 import org.junit.Assert;
@@ -45,12 +46,12 @@ public class JavaNaiveBayesSuite implements Serializable {
   }
 
   private static final List<LabeledPoint> POINTS = Arrays.asList(
-    new LabeledPoint(0, new double[] {1.0, 0.0, 0.0}),
-    new LabeledPoint(0, new double[] {2.0, 0.0, 0.0}),
-    new LabeledPoint(1, new double[] {0.0, 1.0, 0.0}),
-    new LabeledPoint(1, new double[] {0.0, 2.0, 0.0}),
-    new LabeledPoint(2, new double[] {0.0, 0.0, 1.0}),
-    new LabeledPoint(2, new double[] {0.0, 0.0, 2.0})
+    new LabeledPoint(0, Vectors.dense(1.0, 0.0, 0.0)),
+    new LabeledPoint(0, Vectors.dense(2.0, 0.0, 0.0)),
+    new LabeledPoint(1, Vectors.dense(0.0, 1.0, 0.0)),
+    new LabeledPoint(1, Vectors.dense(0.0, 2.0, 0.0)),
+    new LabeledPoint(2, Vectors.dense(0.0, 0.0, 1.0)),
+    new LabeledPoint(2, Vectors.dense(0.0, 0.0, 2.0))
   );
 
   private int validatePrediction(List<LabeledPoint> points, NaiveBayesModel 
model) {

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java 
b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
index 117e5ea..4701a5e 100644
--- 
a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
+++ 
b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.mllib.classification;
 
-
 import java.io.Serializable;
 import java.util.List;
 
@@ -28,7 +27,6 @@ import org.junit.Test;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-
 import org.apache.spark.mllib.regression.LabeledPoint;
 
 public class JavaSVMSuite implements Serializable {
@@ -94,5 +92,4 @@ public class JavaSVMSuite implements Serializable {
     int numAccurate = validatePrediction(validationData, model);
     Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java 
b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java
index 2c4d795..c6d8425 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java
@@ -19,10 +19,10 @@ package org.apache.spark.mllib.linalg;
 
 import java.io.Serializable;
 
-import com.google.common.collect.Lists;
-
 import scala.Tuple2;
 
+import com.google.common.collect.Lists;
+
 import org.junit.Test;
 import static org.junit.Assert.*;
 
@@ -36,7 +36,7 @@ public class JavaVectorsSuite implements Serializable {
 
   @Test
   public void sparseArrayConstruction() {
-    Vector v = Vectors.sparse(3, Lists.newArrayList(
+    Vector v = Vectors.sparse(3, Lists.<Tuple2<Integer, Double>>newArrayList(
         new Tuple2<Integer, Double>(0, 2.0),
         new Tuple2<Integer, Double>(2, 3.0)));
     assertArrayEquals(new double[]{2.0, 0.0, 3.0}, v.toArray(), 0.0);

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java 
b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java
index f44b25c..f725924 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java
@@ -59,7 +59,7 @@ public class JavaLassoSuite implements Serializable {
   @Test
   public void runLassoUsingConstructor() {
     int nPoints = 10000;
-    double A = 2.0;
+    double A = 0.0;
     double[] weights = {-1.5, 1.0e-2};
 
     JavaRDD<LabeledPoint> testRDD = 
sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A,
@@ -80,7 +80,7 @@ public class JavaLassoSuite implements Serializable {
   @Test
   public void runLassoUsingStaticMethods() {
     int nPoints = 10000;
-    double A = 2.0;
+    double A = 0.0;
     double[] weights = {-1.5, 1.0e-2};
 
     JavaRDD<LabeledPoint> testRDD = 
sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A,

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java
 
b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java
index 2fdd5fc..03714ae 100644
--- 
a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java
+++ 
b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java
@@ -55,30 +55,27 @@ public class JavaRidgeRegressionSuite implements 
Serializable {
     return errorSum / validationData.size();
   }
 
-  List<LabeledPoint> generateRidgeData(int numPoints, int nfeatures, double 
eps) {
+  List<LabeledPoint> generateRidgeData(int numPoints, int numFeatures, double 
std) {
     org.jblas.util.Random.seed(42);
     // Pick weights as random values distributed uniformly in [-0.5, 0.5]
-    DoubleMatrix w = DoubleMatrix.rand(nfeatures, 1).subi(0.5);
-    // Set first two weights to eps
-    w.put(0, 0, eps);
-    w.put(1, 0, eps);
-    return LinearDataGenerator.generateLinearInputAsList(0.0, w.data, 
numPoints, 42, eps);
+    DoubleMatrix w = DoubleMatrix.rand(numFeatures, 1).subi(0.5);
+    return LinearDataGenerator.generateLinearInputAsList(0.0, w.data, 
numPoints, 42, std);
   }
 
   @Test
   public void runRidgeRegressionUsingConstructor() {
-    int nexamples = 200;
-    int nfeatures = 20;
-    double eps = 10.0;
-    List<LabeledPoint> data = generateRidgeData(2*nexamples, nfeatures, eps);
+    int numExamples = 50;
+    int numFeatures = 20;
+    List<LabeledPoint> data = generateRidgeData(2*numExamples, numFeatures, 
10.0);
 
-    JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, nexamples));
-    List<LabeledPoint> validationData = data.subList(nexamples, 2*nexamples);
+    JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, 
numExamples));
+    List<LabeledPoint> validationData = data.subList(numExamples, 2 * 
numExamples);
 
     RidgeRegressionWithSGD ridgeSGDImpl = new RidgeRegressionWithSGD();
-    ridgeSGDImpl.optimizer().setStepSize(1.0)
-                            .setRegParam(0.0)
-                            .setNumIterations(200);
+    ridgeSGDImpl.optimizer()
+      .setStepSize(1.0)
+      .setRegParam(0.0)
+      .setNumIterations(200);
     RidgeRegressionModel model = ridgeSGDImpl.run(testRDD.rdd());
     double unRegularizedErr = predictionError(validationData, model);
 
@@ -91,13 +88,12 @@ public class JavaRidgeRegressionSuite implements 
Serializable {
 
   @Test
   public void runRidgeRegressionUsingStaticMethods() {
-    int nexamples = 200;
-    int nfeatures = 20;
-    double eps = 10.0;
-    List<LabeledPoint> data = generateRidgeData(2*nexamples, nfeatures, eps);
+    int numExamples = 50;
+    int numFeatures = 20;
+    List<LabeledPoint> data = generateRidgeData(2 * numExamples, numFeatures, 
10.0);
 
-    JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, nexamples));
-    List<LabeledPoint> validationData = data.subList(nexamples, 2*nexamples);
+    JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, 
numExamples));
+    List<LabeledPoint> validationData = data.subList(numExamples, 2 * 
numExamples);
 
     RidgeRegressionModel model = RidgeRegressionWithSGD.train(testRDD.rdd(), 
200, 1.0, 0.0);
     double unRegularizedErr = predictionError(validationData, model);

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
index 05322b0..1e03c9d 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -20,11 +20,10 @@ package org.apache.spark.mllib.classification
 import scala.util.Random
 import scala.collection.JavaConversions._
 
-import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 
-import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.util.LocalSparkContext
 
@@ -61,7 +60,7 @@ object LogisticRegressionSuite {
       if (yVal > 0) 1 else 0
     }
 
-    val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i))))
+    val testData = (0 until nPoints).map(i => LabeledPoint(y(i), 
Vectors.dense(Array(x1(i)))))
     testData
   }
 
@@ -113,7 +112,7 @@ class LogisticRegressionSuite extends FunSuite with 
LocalSparkContext with Shoul
     val testData = LogisticRegressionSuite.generateLogisticInput(A, B, 
nPoints, 42)
 
     val initialB = -1.0
-    val initialWeights = Array(initialB)
+    val initialWeights = Vectors.dense(initialB)
 
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
index 9dd6c79..516895d 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -19,9 +19,9 @@ package org.apache.spark.mllib.classification
 
 import scala.util.Random
 
-import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.LocalSparkContext
 
@@ -54,7 +54,7 @@ object NaiveBayesSuite {
         if (rnd.nextDouble() < _theta(y)(j)) 1 else 0
       }
 
-      LabeledPoint(y, xi)
+      LabeledPoint(y, Vectors.dense(xi))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
index bc7abb5..dfacbfe 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.mllib.classification
 import scala.util.Random
 import scala.collection.JavaConversions._
 
-import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 
 import org.jblas.DoubleMatrix
@@ -28,6 +27,7 @@ import org.jblas.DoubleMatrix
 import org.apache.spark.SparkException
 import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.linalg.Vectors
 
 object SVMSuite {
 
@@ -54,7 +54,7 @@ object SVMSuite {
         intercept + 0.01 * rnd.nextGaussian()
       if (yD < 0) 0.0 else 1.0
     }
-    y.zip(x).map(p => LabeledPoint(p._1, p._2))
+    y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2)))
   }
 
 }
@@ -110,7 +110,7 @@ class SVMSuite extends FunSuite with LocalSparkContext {
 
     val initialB = -1.0
     val initialC = -1.0
-    val initialWeights = Array(initialB,initialC)
+    val initialWeights = Vectors.dense(initialB, initialC)
 
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
@@ -150,10 +150,10 @@ class SVMSuite extends FunSuite with LocalSparkContext {
     }
 
     intercept[SparkException] {
-      val model = SVMWithSGD.train(testRDDInvalid, 100)
+      SVMWithSGD.train(testRDDInvalid, 100)
     }
 
     // Turning off data validation should not throw an exception
-    val noValidationModel = new 
SVMWithSGD().setValidateData(false).run(testRDDInvalid)
+    new SVMWithSGD().setValidateData(false).run(testRDDInvalid)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
index 631d0e2..c4b4334 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
@@ -20,13 +20,12 @@ package org.apache.spark.mllib.optimization
 import scala.util.Random
 import scala.collection.JavaConversions._
 
-import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 
-import org.apache.spark.SparkContext
 import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.linalg.Vectors
 
 object GradientDescentSuite {
 
@@ -58,8 +57,7 @@ object GradientDescentSuite {
       if (yVal > 0) 1 else 0
     }
 
-    val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i))))
-    testData
+    (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(x1(i))))
   }
 }
 
@@ -83,11 +81,11 @@ class GradientDescentSuite extends FunSuite with 
LocalSparkContext with ShouldMa
     // Add a extra variable consisting of all 1.0's for the intercept.
     val testData = GradientDescentSuite.generateGDInput(A, B, nPoints, 42)
     val data = testData.map { case LabeledPoint(label, features) =>
-      label -> Array(1.0, features: _*)
+      label -> Vectors.dense(1.0, features.toArray: _*)
     }
 
     val dataRDD = sc.parallelize(data, 2).cache()
-    val initialWeightsWithIntercept = Array(1.0, initialWeights: _*)
+    val initialWeightsWithIntercept = Vectors.dense(1.0, initialWeights: _*)
 
     val (_, loss) = GradientDescent.runMiniBatchSGD(
       dataRDD,
@@ -113,13 +111,13 @@ class GradientDescentSuite extends FunSuite with 
LocalSparkContext with ShouldMa
     // Add a extra variable consisting of all 1.0's for the intercept.
     val testData = GradientDescentSuite.generateGDInput(2.0, -1.5, 10000, 42)
     val data = testData.map { case LabeledPoint(label, features) =>
-      label -> Array(1.0, features: _*)
+      label -> Vectors.dense(1.0, features.toArray: _*)
     }
 
     val dataRDD = sc.parallelize(data, 2).cache()
 
     // Prepare non-zero weights
-    val initialWeightsWithIntercept = Array(1.0, 0.5)
+    val initialWeightsWithIntercept = Vectors.dense(1.0, 0.5)
 
     val regParam0 = 0
     val (newWeights0, loss0) = GradientDescent.runMiniBatchSGD(

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
index 2cebac9..6aad9eb 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.regression
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
 
 class LassoSuite extends FunSuite with LocalSparkContext {
@@ -33,29 +34,33 @@ class LassoSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Lasso local random SGD") {
-    val nPoints = 10000
+    val nPoints = 1000
 
     val A = 2.0
     val B = -1.5
     val C = 1.0e-2
 
-    val testData = LinearDataGenerator.generateLinearInput(A, 
Array[Double](B,C), nPoints, 42)
-
-    val testRDD = sc.parallelize(testData, 2)
-    testRDD.cache()
+    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B, 
C), nPoints, 42)
+      .map { case LabeledPoint(label, features) =>
+      LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+    }
+    val testRDD = sc.parallelize(testData, 2).cache()
 
     val ls = new LassoWithSGD()
-    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(40)
 
     val model = ls.run(testRDD)
-
     val weight0 = model.weights(0)
     val weight1 = model.weights(1)
-    assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + 
" not in [1.9, 2.1]")
-    assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, 
-1.4]")
-    assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in 
[-0.001, 0.001]")
+    val weight2 = model.weights(2)
+    assert(weight0 >= 1.9 && weight0 <= 2.1, weight0 + " not in [1.9, 2.1]")
+    assert(weight1 >= -1.60 && weight1 <= -1.40, weight1 + " not in [-1.6, 
-1.4]")
+    assert(weight2 >= -1.0e-3 && weight2 <= 1.0e-3, weight2 + " not in 
[-0.001, 0.001]")
 
     val validationData = LinearDataGenerator.generateLinearInput(A, 
Array[Double](B,C), nPoints, 17)
+      .map { case LabeledPoint(label, features) =>
+      LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+    }
     val validationRDD  = sc.parallelize(validationData, 2)
 
     // Test prediction on RDD.
@@ -66,33 +71,39 @@ class LassoSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Lasso local random SGD with initial weights") {
-    val nPoints = 10000
+    val nPoints = 1000
 
     val A = 2.0
     val B = -1.5
     val C = 1.0e-2
 
-    val testData = LinearDataGenerator.generateLinearInput(A, 
Array[Double](B,C), nPoints, 42)
+    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B, 
C), nPoints, 42)
+      .map { case LabeledPoint(label, features) =>
+      LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+    }
 
+    val initialA = -1.0
     val initialB = -1.0
     val initialC = -1.0
-    val initialWeights = Array(initialB,initialC)
+    val initialWeights = Vectors.dense(initialA, initialB, initialC)
 
-    val testRDD = sc.parallelize(testData, 2)
-    testRDD.cache()
+    val testRDD = sc.parallelize(testData, 2).cache()
 
     val ls = new LassoWithSGD()
-    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(40)
 
     val model = ls.run(testRDD, initialWeights)
-
     val weight0 = model.weights(0)
     val weight1 = model.weights(1)
-    assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + 
" not in [1.9, 2.1]")
-    assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, 
-1.4]")
-    assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in 
[-0.001, 0.001]")
+    val weight2 = model.weights(2)
+    assert(weight0 >= 1.9 && weight0 <= 2.1, weight0 + " not in [1.9, 2.1]")
+    assert(weight1 >= -1.60 && weight1 <= -1.40, weight1 + " not in [-1.6, 
-1.4]")
+    assert(weight2 >= -1.0e-3 && weight2 <= 1.0e-3, weight2 + " not in 
[-0.001, 0.001]")
 
     val validationData = LinearDataGenerator.generateLinearInput(A, 
Array[Double](B,C), nPoints, 17)
+      .map { case LabeledPoint(label, features) =>
+      LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+    }
     val validationRDD  = sc.parallelize(validationData,2)
 
     // Test prediction on RDD.

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
index 5d251bc..2f7d307 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.regression
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
 
 class LinearRegressionSuite extends FunSuite with LocalSparkContext {
@@ -40,11 +41,12 @@ class LinearRegressionSuite extends FunSuite with 
LocalSparkContext {
     linReg.optimizer.setNumIterations(1000).setStepSize(1.0)
 
     val model = linReg.run(testRDD)
-
     assert(model.intercept >= 2.5 && model.intercept <= 3.5)
-    assert(model.weights.length === 2)
-    assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0)
-    assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0)
+
+    val weights = model.weights
+    assert(weights.size === 2)
+    assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+    assert(weights(1) >= 9.0 && weights(1) <= 11.0)
 
     val validationData = LinearDataGenerator.generateLinearInput(
       3.0, Array(10.0, 10.0), 100, 17)
@@ -67,9 +69,11 @@ class LinearRegressionSuite extends FunSuite with 
LocalSparkContext {
     val model = linReg.run(testRDD)
 
     assert(model.intercept === 0.0)
-    assert(model.weights.length === 2)
-    assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0)
-    assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0)
+
+    val weights = model.weights
+    assert(weights.size === 2)
+    assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+    assert(weights(1) >= 9.0 && weights(1) <= 11.0)
 
     val validationData = LinearDataGenerator.generateLinearInput(
       0.0, Array(10.0, 10.0), 100, 17)
@@ -81,4 +85,40 @@ class LinearRegressionSuite extends FunSuite with 
LocalSparkContext {
     // Test prediction on Array.
     validatePrediction(validationData.map(row => model.predict(row.features)), 
validationData)
   }
+
+  // Test if we can correctly learn Y = 10*X1 + 10*X10000
+  test("sparse linear regression without intercept") {
+    val denseRDD = sc.parallelize(
+      LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 
42), 2)
+    val sparseRDD = denseRDD.map { case LabeledPoint(label, v) =>
+      val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1))))
+      LabeledPoint(label, sv)
+    }.cache()
+    val linReg = new LinearRegressionWithSGD().setIntercept(false)
+    linReg.optimizer.setNumIterations(1000).setStepSize(1.0)
+
+    val model = linReg.run(sparseRDD)
+
+    assert(model.intercept === 0.0)
+
+    val weights = model.weights
+    assert(weights.size === 10000)
+    assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+    assert(weights(9999) >= 9.0 && weights(9999) <= 11.0)
+
+    val validationData = LinearDataGenerator.generateLinearInput(0.0, 
Array(10.0, 10.0), 100, 17)
+    val sparseValidationData = validationData.map { case LabeledPoint(label, 
v) =>
+      val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1))))
+      LabeledPoint(label, sv)
+    }
+    val sparseValidationRDD = sc.parallelize(sparseValidationData, 2)
+
+      // Test prediction on RDD.
+    validatePrediction(
+      model.predict(sparseValidationRDD.map(_.features)).collect(), 
sparseValidationData)
+
+    // Test prediction on Array.
+    validatePrediction(
+      sparseValidationData.map(row => model.predict(row.features)), 
sparseValidationData)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
index b2044ed..f66fc6e 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.mllib.regression
 
-import org.jblas.DoubleMatrix
 import org.scalatest.FunSuite
 
+import org.jblas.DoubleMatrix
+
 import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
 
 class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
@@ -30,22 +31,22 @@ class RidgeRegressionSuite extends FunSuite with 
LocalSparkContext {
     }.reduceLeft(_ + _) / predictions.size
   }
 
-  test("regularization with skewed weights") {
-    val nexamples = 200
-    val nfeatures = 20
-    val eps = 10
+  test("ridge regression can help avoid overfitting") {
+
+    // For small number of examples and large variance of error distribution,
+    // ridge regression should give smaller generalization error that linear 
regression.
+
+    val numExamples = 50
+    val numFeatures = 20
 
     org.jblas.util.Random.seed(42)
     // Pick weights as random values distributed uniformly in [-0.5, 0.5]
-    val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
-    // Set first two weights to eps
-    w.put(0, 0, eps)
-    w.put(1, 0, eps)
+    val w = DoubleMatrix.rand(numFeatures, 1).subi(0.5)
 
     // Use half of data for training and other half for validation
-    val data = LinearDataGenerator.generateLinearInput(3.0, w.toArray, 
2*nexamples, 42, eps)
-    val testData = data.take(nexamples)
-    val validationData = data.takeRight(nexamples)
+    val data = LinearDataGenerator.generateLinearInput(3.0, w.toArray, 2 * 
numExamples, 42, 10.0)
+    val testData = data.take(numExamples)
+    val validationData = data.takeRight(numExamples)
 
     val testRDD = sc.parallelize(testData, 2).cache()
     val validationRDD = sc.parallelize(validationData, 2).cache()
@@ -67,7 +68,7 @@ class RidgeRegressionSuite extends FunSuite with 
LocalSparkContext {
     val ridgeErr = predictionError(
         ridgeModel.predict(validationRDD.map(_.features)).collect(), 
validationData)
 
-    // Ridge CV-error should be lower than linear regression
+    // Ridge validation error should be lower than linear regression.
     assert(ridgeErr < linearErr,
       "ridgeError (" + ridgeErr + ") was not less than linearError(" + 
linearErr + ")")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
index 4349c70..350130c 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.mllib.tree.model.Filter
 import org.apache.spark.mllib.tree.configuration.Strategy
 import org.apache.spark.mllib.tree.configuration.Algo._
 import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.mllib.linalg.Vectors
 
 class DecisionTreeSuite extends FunSuite with BeforeAndAfterAll {
 
@@ -396,7 +397,7 @@ object DecisionTreeSuite {
   def generateOrderedLabeledPointsWithLabel0(): Array[LabeledPoint] = {
     val arr = new Array[LabeledPoint](1000)
     for (i <- 0 until 1000){
-      val lp = new LabeledPoint(0.0,Array(i.toDouble,1000.0-i))
+      val lp = new LabeledPoint(0.0, Vectors.dense(i.toDouble, 1000.0 - i))
       arr(i) = lp
     }
     arr
@@ -405,7 +406,7 @@ object DecisionTreeSuite {
   def generateOrderedLabeledPointsWithLabel1(): Array[LabeledPoint] = {
     val arr = new Array[LabeledPoint](1000)
     for (i <- 0 until 1000){
-      val lp = new LabeledPoint(1.0,Array(i.toDouble,999.0-i))
+      val lp = new LabeledPoint(1.0, Vectors.dense(i.toDouble, 999.0 - i))
       arr(i) = lp
     }
     arr
@@ -415,9 +416,9 @@ object DecisionTreeSuite {
     val arr = new Array[LabeledPoint](1000)
     for (i <- 0 until 1000){
       if (i < 600){
-        arr(i) = new LabeledPoint(1.0,Array(0.0,1.0))
+        arr(i) = new LabeledPoint(1.0, Vectors.dense(0.0, 1.0))
       } else {
-        arr(i) = new LabeledPoint(0.0,Array(1.0,0.0))
+        arr(i) = new LabeledPoint(0.0, Vectors.dense(1.0, 0.0))
       }
     }
     arr

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index 60f053b..27d41c7 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -17,14 +17,20 @@
 
 package org.apache.spark.mllib.util
 
+import java.io.File
+
 import org.scalatest.FunSuite
 
 import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, norm => 
breezeNorm,
   squaredDistance => breezeSquaredDistance}
+import com.google.common.base.Charsets
+import com.google.common.io.Files
 
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.MLUtils._
 
-class MLUtilsSuite extends FunSuite {
+class MLUtilsSuite extends FunSuite with LocalSparkContext {
 
   test("epsilon computation") {
     assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.")
@@ -49,4 +55,55 @@ class MLUtilsSuite extends FunSuite {
       assert((fastSquaredDist2 - squaredDist) <= precision * squaredDist, 
s"failed with m = $m")
     }
   }
+
+  test("compute stats") {
+    val data = Seq.fill(3)(Seq(
+      LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 3.0)),
+      LabeledPoint(0.0, Vectors.dense(3.0, 4.0, 5.0))
+    )).flatten
+    val rdd = sc.parallelize(data, 2)
+    val (meanLabel, mean, std) = MLUtils.computeStats(rdd, 3, 6)
+    assert(meanLabel === 0.5)
+    assert(mean === Vectors.dense(2.0, 3.0, 4.0))
+    assert(std === Vectors.dense(1.0, 1.0, 1.0))
+  }
+
+  test("loadLibSVMData") {
+    val lines =
+      """
+        |+1 1:1.0 3:2.0 5:3.0
+        |-1
+        |-1 2:4.0 4:5.0 6:6.0
+      """.stripMargin
+    val tempDir = Files.createTempDir()
+    val file = new File(tempDir.getPath, "part-00000")
+    Files.write(lines, file, Charsets.US_ASCII)
+    val path = tempDir.toURI.toString
+
+    val pointsWithNumFeatures = MLUtils.loadLibSVMData(sc, path, 6).collect()
+    val pointsWithoutNumFeatures = MLUtils.loadLibSVMData(sc, path).collect()
+
+    for (points <- Seq(pointsWithNumFeatures, pointsWithoutNumFeatures)) {
+      assert(points.length === 3)
+      assert(points(0).label === 1.0)
+      assert(points(0).features === Vectors.sparse(6, Seq((0, 1.0), (2, 2.0), 
(4, 3.0))))
+      assert(points(1).label == 0.0)
+      assert(points(1).features == Vectors.sparse(6, Seq()))
+      assert(points(2).label === 0.0)
+      assert(points(2).features === Vectors.sparse(6, Seq((1, 4.0), (3, 5.0), 
(5, 6.0))))
+    }
+
+    val multiclassPoints = MLUtils.loadLibSVMData(sc, path, 
MLUtils.multiclassLabelParser).collect()
+    assert(multiclassPoints.length === 3)
+    assert(multiclassPoints(0).label === 1.0)
+    assert(multiclassPoints(1).label === -1.0)
+    assert(multiclassPoints(2).label === -1.0)
+
+    try {
+      file.delete()
+      tempDir.delete()
+    } catch {
+      case t: Throwable =>
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/python/pyspark/mllib/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/classification.py 
b/python/pyspark/mllib/classification.py
index 19b90df..d2f9cdb 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -87,18 +87,19 @@ class NaiveBayesModel(object):
     >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 
0.0]).reshape(3,3)
     >>> model = NaiveBayes.train(sc.parallelize(data))
     >>> model.predict(array([0.0, 1.0]))
-    0
+    0.0
     >>> model.predict(array([1.0, 0.0]))
-    1
+    1.0
     """
 
-    def __init__(self, pi, theta):
+    def __init__(self, labels, pi, theta):
+        self.labels = labels
         self.pi = pi
         self.theta = theta
 
     def predict(self, x):
         """Return the most likely class for a data vector x"""
-        return numpy.argmax(self.pi + dot(x, self.theta))
+        return self.labels[numpy.argmax(self.pi + dot(x, self.theta))]
 
 class NaiveBayes(object):
     @classmethod
@@ -122,7 +123,8 @@ class NaiveBayes(object):
         ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, 
lambda_)
         return NaiveBayesModel(
             _deserialize_double_vector(ans[0]),
-            _deserialize_double_matrix(ans[1]))
+            _deserialize_double_vector(ans[1]),
+            _deserialize_double_matrix(ans[2]))
 
 
 def _test():

Reply via email to