Repository: spark
Updated Branches:
  refs/heads/master cde548388 -> 86ef4cfd4


[SPARK-7262] [ML] Binary LogisticRegression with L1/L2 (elastic net) using OWLQN in new ML package

1) Handle feature scaling and the bias/intercept (addBias) internally.
2) Support L1/L2 (elastic net) regularization using the OWLQN optimizer.
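
For reference, a minimal sketch of how the new estimator is driven (the training DataFrame and its column layout are placeholders; the setters are the ones added in this patch):

    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.sql.DataFrame

    // `training` is a hypothetical DataFrame with "label" and "features" columns.
    val training: DataFrame = ???

    val lr = new LogisticRegression()
      .setMaxIter(100)          // default
      .setRegParam(0.12)        // lambda: overall regularization strength
      .setElasticNetParam(1.0)  // alpha: 1.0 = pure L1; 0.0 (default) = pure L2
      .setFitIntercept(true)
      .setTol(1e-6)

    val model = lr.fit(training)
    println(s"weights: ${model.weights}, intercept: ${model.intercept}")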

Author: DB Tsai <d...@netflix.com>

Closes #5967 from dbtsai/lor and squashes the following commits:

fa029bb [DB Tsai] made the bound smaller
0806002 [DB Tsai] better initial intercept and more test
5c31824 [DB Tsai] fix import
c387e25 [DB Tsai] Merge branch 'master' into lor
c84e931 [DB Tsai] Made MultiClassSummarizer private
f98e711 [DB Tsai] address feedback
a784321 [DB Tsai] fix style
8ec65d2 [DB Tsai] remove new line
f3f8c88 [DB Tsai] add more tests and they match R which is good. fix a bug
34705bc [DB Tsai] first commit


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86ef4cfd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86ef4cfd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86ef4cfd

Branch: refs/heads/master
Commit: 86ef4cfd436867d88bdc211f76d6ea668d474558
Parents: cde5483
Author: DB Tsai <d...@netflix.com>
Authored: Fri May 8 21:43:05 2015 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Fri May 8 21:43:05 2015 -0700

----------------------------------------------------------------------
 .../ml/classification/LogisticRegression.scala  | 436 ++++++++++++++++++-
 .../spark/ml/regression/LinearRegression.scala  |  18 +-
 .../org/apache/spark/mllib/util/MLUtils.scala   |   2 +-
 .../ml/classification/LogisticRegressionSuite.scala    | 395 ++++++++++++++++-
 .../mllib/classification/LogisticRegressionSuite.scala |  10 +-
 5 files changed, 821 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/86ef4cfd/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 550369d..647226a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -17,22 +17,31 @@
 
 package org.apache.spark.ml.classification
 
+import scala.collection.mutable
+
+import breeze.linalg.{norm => brzNorm, DenseVector => BDV}
+import breeze.optimize.{LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN}
+import breeze.optimize.{CachedDiffFunction, DiffFunction}
+
 import org.apache.spark.annotation.AlphaComponent
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
-import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
 import org.apache.spark.mllib.linalg._
+import org.apache.spark.mllib.linalg.BLAS._
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.{SparkException, Logging}
 
 /**
  * Params for logistic regression.
  */
 private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
-  with HasRegParam with HasMaxIter with HasFitIntercept with HasThreshold {
-
-  setDefault(regParam -> 0.1, maxIter -> 100, threshold -> 0.5)
-}
+  with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
+  with HasThreshold
 
 /**
  * :: AlphaComponent ::
@@ -43,45 +52,171 @@ private[classification] trait LogisticRegressionParams extends ProbabilisticClas
 @AlphaComponent
 class LogisticRegression
   extends ProbabilisticClassifier[Vector, LogisticRegression, LogisticRegressionModel]
-  with LogisticRegressionParams {
+  with LogisticRegressionParams with Logging {
 
-  /** @group setParam */
+  /**
+   * Set the regularization parameter.
+   * Default is 0.0.
+   * @group setParam
+   */
   def setRegParam(value: Double): this.type = set(regParam, value)
+  setDefault(regParam -> 0.0)
 
-  /** @group setParam */
+  /**
+   * Set the ElasticNet mixing parameter.
+   * For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.
+   * For 0 < alpha < 1, the penalty is a combination of L1 and L2.
+   * Default is 0.0 which is an L2 penalty.
+   * @group setParam
+   */
+  def setElasticNetParam(value: Double): this.type = set(elasticNetParam, value)
+  setDefault(elasticNetParam -> 0.0)
+
+  /**
+   * Set the maximal number of iterations.
+   * Default is 100.
+   * @group setParam
+   */
   def setMaxIter(value: Int): this.type = set(maxIter, value)
+  setDefault(maxIter -> 100)
+
+  /**
+   * Set the convergence tolerance of iterations.
+   * A smaller value will lead to higher accuracy at the cost of more iterations.
+   * Default is 1E-6.
+   * @group setParam
+   */
+  def setTol(value: Double): this.type = set(tol, value)
+  setDefault(tol -> 1E-6)
 
   /** @group setParam */
   def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value)
+  setDefault(fitIntercept -> true)
 
   /** @group setParam */
   def setThreshold(value: Double): this.type = set(threshold, value)
+  setDefault(threshold -> 0.5)
 
   override protected def train(dataset: DataFrame): LogisticRegressionModel = {
     // Extract columns from data.  If dataset is persisted, do not persist oldDataset.
-    val oldDataset = extractLabeledPoints(dataset)
+    val instances = extractLabeledPoints(dataset).map {
+      case LabeledPoint(label: Double, features: Vector) => (label, features)
+    }
     val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      oldDataset.persist(StorageLevel.MEMORY_AND_DISK)
+    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+
+    val (summarizer, labelSummarizer) = instances.treeAggregate(
+      (new MultivariateOnlineSummarizer, new MultiClassSummarizer))(
+        seqOp = (c, v) => (c, v) match {
+          case ((summarizer: MultivariateOnlineSummarizer, labelSummarizer: MultiClassSummarizer),
+          (label: Double, features: Vector)) =>
+            (summarizer.add(features), labelSummarizer.add(label))
+      },
+        combOp = (c1, c2) => (c1, c2) match {
+          case ((summarizer1: MultivariateOnlineSummarizer,
+          classSummarizer1: MultiClassSummarizer), (summarizer2: MultivariateOnlineSummarizer,
+          classSummarizer2: MultiClassSummarizer)) =>
+            (summarizer1.merge(summarizer2), classSummarizer1.merge(classSummarizer2))
+      })
+
+    val histogram = labelSummarizer.histogram
+    val numInvalid = labelSummarizer.countInvalid
+    val numClasses = histogram.length
+    val numFeatures = summarizer.mean.size
+
+    if (numInvalid != 0) {
+      val msg = s"Classification labels should be in {0 to ${numClasses - 1}}. " +
+        s"Found $numInvalid invalid labels."
+      logError(msg)
+      throw new SparkException(msg)
+    }
+
+    if (numClasses > 2) {
+      val msg = s"Currently, LogisticRegression with ElasticNet in ML package only supports " +
+        s"binary classification. Found $numClasses classes in the input dataset."
+      logError(msg)
+      throw new SparkException(msg)
     }
 
-    // Train model
-    val lr = new LogisticRegressionWithLBFGS()
-      .setIntercept($(fitIntercept))
-    lr.optimizer
-      .setRegParam($(regParam))
-      .setNumIterations($(maxIter))
-    val oldModel = lr.run(oldDataset)
-    val lrm = new LogisticRegressionModel(this, oldModel.weights, oldModel.intercept)
+    val featuresMean = summarizer.mean.toArray
+    val featuresStd = summarizer.variance.toArray.map(math.sqrt)
 
-    if (handlePersistence) {
-      oldDataset.unpersist()
+    val regParamL1 = $(elasticNetParam) * $(regParam)
+    val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)
+
+    val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept),
+      featuresStd, featuresMean, regParamL2)
+
+    val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
+      new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
+    } else {
+      // Remove the L1 penalization on the intercept
+      def regParamL1Fun = (index: Int) => {
+        if (index == numFeatures) 0.0 else regParamL1
+      }
+      new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
     }
-    copyValues(lrm)
+
+    val initialWeightsWithIntercept =
+      Vectors.zeros(if ($(fitIntercept)) numFeatures + 1 else numFeatures)
+
+    if ($(fitIntercept)) {
+      /**
+       * For binary logistic regression, when we initialize the weights as zeros,
+       * it will converge faster if we initialize the intercept such that
+       * it follows the distribution of the labels.
+       *
+       * {{{
+       * P(0) = 1 / (1 + \exp(b)), and
+       * P(1) = \exp(b) / (1 + \exp(b))
+       * }}}, hence
+       * {{{
+       * b = \log{P(1) / P(0)} = \log{count_1 / count_0}
+       * }}}
+       */
+      initialWeightsWithIntercept.toArray(numFeatures)
+        = math.log(histogram(1).toDouble / histogram(0).toDouble)
+    }
+
+    val states = optimizer.iterations(new CachedDiffFunction(costFun),
+      initialWeightsWithIntercept.toBreeze.toDenseVector)
+
+    var state = states.next()
+    val lossHistory = mutable.ArrayBuilder.make[Double]
+
+    while (states.hasNext) {
+      lossHistory += state.value
+      state = states.next()
+    }
+    lossHistory += state.value
+
+    // The weights are trained in the scaled space; we're converting them back to
+    // the original space.
+    val weightsWithIntercept = {
+      val rawWeights = state.x.toArray.clone()
+      var i = 0
+      // Note that the intercept in scaled space and original space is the same;
+      // as a result, no scaling is needed.
+      while (i < numFeatures) {
+        rawWeights(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 }
+        i += 1
+      }
+      Vectors.dense(rawWeights)
+    }
+
+    if (handlePersistence) instances.unpersist()
+
+    val (weights, intercept) = if ($(fitIntercept)) {
+      (Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
+        weightsWithIntercept(weightsWithIntercept.size - 1))
+    } else {
+      (weightsWithIntercept, 0.0)
+    }
+
+    new LogisticRegressionModel(this, weights.compressed, intercept)
   }
 }
 
-
 /**
  * :: AlphaComponent ::
  *
@@ -159,3 +294,258 @@ class LogisticRegressionModel private[ml] (
     if (probability(1) > getThreshold) 1 else 0
   }
 }
+
+/**
+ * MultiClassSummarizer computes the number of distinct labels and the corresponding counts,
+ * and validates the data to see if the labels used for k-class multiclass classification
+ * are in the range {0, 1, ..., k - 1} in an online fashion.
+ *
+ * Two MultiClassSummarizer objects can be merged together to produce a statistical summary
+ * of the corresponding joint dataset.
+ */
+private[classification] class MultiClassSummarizer extends Serializable {
+  private val distinctMap = new mutable.HashMap[Int, Long]
+  private var totalInvalidCnt: Long = 0L
+
+  /**
+   * Add a new label into this MultiClassSummarizer, and update the distinct map.
+   * @param label The label for this data point.
+   * @return This MultiClassSummarizer object.
+   */
+  def add(label: Double): this.type = {
+    if (label - label.toInt != 0.0 || label < 0) {
+      totalInvalidCnt += 1
+      this
+    } else {
+      val counts: Long = distinctMap.getOrElse(label.toInt, 0L)
+      distinctMap.put(label.toInt, counts + 1)
+      this
+    }
+  }
+
+  /**
+   * Merge another MultiClassSummarizer, and update the distinct map.
+   * (Note that it will merge the smaller distinct map into the larger one using in-place
+   * merging, so either `this` or `other` object will be modified and returned.)
+   *
+   * @param other The other MultiClassSummarizer to be merged.
+   * @return Merged MultiClassSummarizer object.
+   */
+  def merge(other: MultiClassSummarizer): MultiClassSummarizer = {
+    val (largeMap, smallMap) = if (this.distinctMap.size > other.distinctMap.size) {
+      (this, other)
+    } else {
+      (other, this)
+    }
+    smallMap.distinctMap.foreach {
+      case (key, value) =>
+        val counts = largeMap.distinctMap.getOrElse(key, 0L)
+        largeMap.distinctMap.put(key, counts + value)
+    }
+    largeMap.totalInvalidCnt += smallMap.totalInvalidCnt
+    largeMap
+  }
+
+  /** @return The total invalid input counts. */
+  def countInvalid: Long = totalInvalidCnt
+
+  /** @return The number of distinct labels in the input dataset. */
+  def numClasses: Int = distinctMap.keySet.max + 1
+
+  /** @return The counts of each label in the input dataset. */
+  def histogram: Array[Long] = {
+    val result = Array.ofDim[Long](numClasses)
+    var i = 0
+    while (i < result.length) {
+      result(i) = distinctMap.getOrElse(i, 0L)
+      i += 1
+    }
+    result
+  }
+}
+
+/**
+ * LogisticAggregator computes the gradient and loss for the binary logistic loss function,
+ * as used in binary classification for samples in sparse or dense vectors, in an online fashion.
+ *
+ * Note that multinomial logistic loss is not supported yet!
+ *
+ * Two LogisticAggregator objects can be merged together to produce a summary of the loss and
+ * gradient of the corresponding joint dataset.
+ *
+ * @param weights The weights/coefficients corresponding to the features.
+ * @param numClasses The number of possible outcomes for a k-class classification problem in
+ *                   Multinomial Logistic Regression.
+ * @param fitIntercept Whether to fit an intercept term.
+ * @param featuresStd The standard deviation values of the features.
+ * @param featuresMean The mean values of the features.
+ */
+private class LogisticAggregator(
+    weights: Vector,
+    numClasses: Int,
+    fitIntercept: Boolean,
+    featuresStd: Array[Double],
+    featuresMean: Array[Double]) extends Serializable {
+
+  private var totalCnt: Long = 0L
+  private var lossSum = 0.0
+
+  private val weightsArray = weights match {
+    case dv: DenseVector => dv.values
+    case _ =>
+      throw new IllegalArgumentException(
+        s"weights only supports dense vector but got type 
${weights.getClass}.")
+  }
+
+  private val dim = if (fitIntercept) weightsArray.length - 1 else weightsArray.length
+
+  private val gradientSumArray = Array.ofDim[Double](weightsArray.length)
+
+  /**
+   * Add a new training sample to this LogisticAggregator, and update the loss and gradient
+   * of the objective function.
+   *
+   * @param label The label for this data point.
+   * @param data The features for one data point in dense/sparse vector format to be added
+   *             into this aggregator.
+   * @return This LogisticAggregator object.
+   */
+  def add(label: Double, data: Vector): this.type = {
+    require(dim == data.size, s"Dimensions mismatch when adding new sample." +
+      s" Expecting $dim but got ${data.size}.")
+
+    val dataSize = data.size
+
+    val localWeightsArray = weightsArray
+    val localGradientSumArray = gradientSumArray
+
+    numClasses match {
+      case 2 =>
+        /**
+         * For Binary Logistic Regression.
+         */
+        val margin = - {
+          var sum = 0.0
+          data.foreachActive { (index, value) =>
+            if (featuresStd(index) != 0.0 && value != 0.0) {
+              sum += localWeightsArray(index) * (value / featuresStd(index))
+            }
+          }
+          sum + { if (fitIntercept) localWeightsArray(dim) else 0.0 }
+        }
+
+        val multiplier = (1.0 / (1.0 + math.exp(margin))) - label
+
+        data.foreachActive { (index, value) =>
+          if (featuresStd(index) != 0.0 && value != 0.0) {
+            localGradientSumArray(index) += multiplier * (value / featuresStd(index))
+          }
+        }
+
+        if (fitIntercept) {
+          localGradientSumArray(dim) += multiplier
+        }
+
+        if (label > 0) {
+          // The following is equivalent to log(1 + exp(margin)) but more numerically stable.
+          lossSum += MLUtils.log1pExp(margin)
+        } else {
+          lossSum += MLUtils.log1pExp(margin) - margin
+        }
+      case _ =>
+        throw new NotImplementedError("LogisticRegression with ElasticNet in ML package " +
+          "only supports binary classification for now.")
+    totalCnt += 1
+    this
+  }
+
+  /**
+   * Merge another LogisticAggregator, and update the loss and gradient
+   * of the objective function.
+   * (Note that it's in-place merging; as a result, `this` object will be modified.)
+   *
+   * @param other The other LogisticAggregator to be merged.
+   * @return This LogisticAggregator object.
+   */
+  def merge(other: LogisticAggregator): this.type = {
+    require(dim == other.dim, s"Dimensions mismatch when merging with another " +
+      s"LogisticAggregator. Expecting $dim but got ${other.dim}.")
+
+    if (other.totalCnt != 0) {
+      totalCnt += other.totalCnt
+      lossSum += other.lossSum
+
+      var i = 0
+      val localThisGradientSumArray = this.gradientSumArray
+      val localOtherGradientSumArray = other.gradientSumArray
+      while (i < localThisGradientSumArray.length) {
+        localThisGradientSumArray(i) += localOtherGradientSumArray(i)
+        i += 1
+      }
+    }
+    this
+  }
+
+  def count: Long = totalCnt
+
+  def loss: Double = lossSum / totalCnt
+
+  def gradient: Vector = {
+    val result = Vectors.dense(gradientSumArray.clone())
+    scal(1.0 / totalCnt, result)
+    result
+  }
+}
+
+/**
+ * LogisticCostFun implements Breeze's DiffFunction[T] for a multinomial logistic loss function,
+ * as used in multi-class classification (it is also used in binary logistic regression).
+ * It returns the loss and gradient with L2 regularization at a particular point (weights).
+ * It's used in Breeze's convex optimization routines.
+ */
+private class LogisticCostFun(
+    data: RDD[(Double, Vector)],
+    numClasses: Int,
+    fitIntercept: Boolean,
+    featuresStd: Array[Double],
+    featuresMean: Array[Double],
+    regParamL2: Double) extends DiffFunction[BDV[Double]] {
+
+  override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
+    val w = Vectors.fromBreeze(weights)
+
+    val logisticAggregator = data.treeAggregate(new LogisticAggregator(w, numClasses,
+      fitIntercept, featuresStd, featuresMean))(
+        seqOp = (c, v) => (c, v) match {
+          case (aggregator, (label, features)) => aggregator.add(label, features)
+        },
+        combOp = (c1, c2) => (c1, c2) match {
+          case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
+        })
+
+    // regVal is the L2 regularization term: 0.5 * regParamL2 * ||weights||^2
+    val norm = if (regParamL2 == 0.0) {
+      0.0
+    } else if (fitIntercept) {
+      brzNorm(Vectors.dense(weights.toArray.slice(0, weights.size - 1)).toBreeze, 2.0)
+    } else {
+      brzNorm(weights, 2.0)
+    }
+    val regVal = 0.5 * regParamL2 * norm * norm
+
+    val loss = logisticAggregator.loss + regVal
+    val gradient = logisticAggregator.gradient
+
+    if (fitIntercept) {
+      val wArray = w.toArray.clone()
+      wArray(wArray.length - 1) = 0.0
+      axpy(regParamL2, Vectors.dense(wArray), gradient)
+    } else {
+      axpy(regParamL2, w, gradient)
+    }
+
+    (loss, gradient.toBreeze.asInstanceOf[BDV[Double]])
+  }
+}
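
To make the optimizer wiring above concrete, here is a small standalone sketch of the two ideas it encodes: the single (regParam, elasticNetParam) pair is split into separate L1 and L2 strengths, and OWLQN's per-coordinate L1 function is used to leave the intercept (stored at index numFeatures) unpenalized. The values are illustrative:

    // Hedged sketch of the elastic-net split used in train() above.
    val regParam = 0.21         // lambda: overall regularization strength
    val elasticNetParam = 0.38  // alpha: 0.0 = pure L2, 1.0 = pure L1
    val numFeatures = 4         // the intercept lives at index numFeatures

    val regParamL1 = elasticNetParam * regParam          // handled by OWLQN itself
    val regParamL2 = (1.0 - elasticNetParam) * regParam  // folded into LogisticCostFun

    // OWLQN takes the L1 strength per index; returning 0.0 at the intercept
    // index removes the L1 penalty from the intercept only.
    val regParamL1Fun = (index: Int) =>
      if (index == numFeatures) 0.0 else regParamL1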

http://git-wip-us.apache.org/repos/asf/spark/blob/86ef4cfd/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index e63c9a3..6377923 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -106,14 +106,16 @@ class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegress
     if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
 
     val (summarizer, statCounter) = instances.treeAggregate(
-      (new MultivariateOnlineSummarizer, new StatCounter))( {
-        case ((summarizer: MultivariateOnlineSummarizer, statCounter: StatCounter),
-        (label: Double, features: Vector)) =>
-          (summarizer.add(features), statCounter.merge(label))
-      }, {
-        case ((summarizer1: MultivariateOnlineSummarizer, statCounter1: StatCounter),
-        (summarizer2: MultivariateOnlineSummarizer, statCounter2: StatCounter)) =>
-          (summarizer1.merge(summarizer2), statCounter1.merge(statCounter2))
+      (new MultivariateOnlineSummarizer, new StatCounter))(
+        seqOp = (c, v) => (c, v) match {
+          case ((summarizer: MultivariateOnlineSummarizer, statCounter: StatCounter),
+          (label: Double, features: Vector)) =>
+            (summarizer.add(features), statCounter.merge(label))
+      },
+        combOp = (c1, c2) => (c1, c2) match {
+          case ((summarizer1: MultivariateOnlineSummarizer, statCounter1: StatCounter),
+          (summarizer2: MultivariateOnlineSummarizer, statCounter2: StatCounter)) =>
+            (summarizer1.merge(summarizer2), statCounter1.merge(statCounter2))
       })
 
     val numFeatures = summarizer.mean.size
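
The hunk above only switches the two anonymous closures over to explicit seqOp/combOp named arguments; the aggregation logic is unchanged. As a self-contained illustration of the same treeAggregate pattern (with a hypothetical RDD and a simple (count, sum) accumulator standing in for the summarizer pair):

    import org.apache.spark.SparkContext

    // Hedged sketch: `sc` is an existing SparkContext; the data is made up.
    def labelStats(sc: SparkContext): Unit = {
      val labels = sc.parallelize(Seq(1.0, 0.0, 1.0, 1.0))

      // treeAggregate folds each partition with seqOp, then merges the
      // per-partition results with combOp in a tree pattern, exactly as the
      // (summarizer, statCounter) pairs are combined above.
      val (count, sum) = labels.treeAggregate((0L, 0.0))(
        seqOp = (c, v) => (c._1 + 1, c._2 + v),
        combOp = (c1, c2) => (c1._1 + c2._1, c1._2 + c2._2))

      println(s"count = $count, mean label = ${sum / count}")
    }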

http://git-wip-us.apache.org/repos/asf/spark/blob/86ef4cfd/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index 5d6ddd4..681f4c6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -331,7 +331,7 @@ object MLUtils {
    * @param x a floating-point value as input.
    * @return the result of `math.log(1 + math.exp(x))`.
    */
-  private[mllib] def log1pExp(x: Double): Double = {
+  private[spark] def log1pExp(x: Double): Double = {
     if (x > 0) {
       x + math.log1p(math.exp(-x))
     } else {
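
Making log1pExp private[spark] lets the new ml.classification code reuse the numerically stable form of log(1 + e^x): for large positive x, math.exp(x) overflows to Infinity, so the identity log(1 + e^x) = x + log(1 + e^(-x)) is used instead. A standalone restatement of the function shown above, with an illustrative check:

    // Numerically stable log(1 + exp(x)), as in MLUtils.log1pExp.
    def log1pExp(x: Double): Double = {
      if (x > 0) {
        // For x > 0, exp(-x) is in (0, 1], so it cannot overflow.
        x + math.log1p(math.exp(-x))
      } else {
        // For x <= 0, exp(x) is in (0, 1], so the direct form is safe.
        math.log1p(math.exp(x))
      }
    }

    // math.log(1 + math.exp(800)) overflows to Infinity,
    // while log1pExp(800.0) returns 800.0 as expected.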

http://git-wip-us.apache.org/repos/asf/spark/blob/86ef4cfd/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index 6dd1fdf..78cdd47 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ml.classification
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.mllib.classification.LogisticRegressionSuite.generateLogisticInput
+import org.apache.spark.mllib.classification.LogisticRegressionSuite
 import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
@@ -30,13 +30,42 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext {
 
   @transient var sqlContext: SQLContext = _
   @transient var dataset: DataFrame = _
+  @transient var binaryDataset: DataFrame = _
   private val eps: Double = 1e-5
 
   override def beforeAll(): Unit = {
     super.beforeAll()
     sqlContext = new SQLContext(sc)
-    dataset = sqlContext.createDataFrame(
-      sc.parallelize(generateLogisticInput(1.0, 1.0, nPoints = 100, seed = 42), 2))
+
+    dataset = sqlContext.createDataFrame(sc.parallelize(LogisticRegressionSuite
+      .generateLogisticInput(1.0, 1.0, nPoints = 100, seed = 42), 4))
+
+    /**
+     * Here are the instructions describing how to export the test data into CSV format
+     * so we can validate the training accuracy compared with R's glmnet package.
+     *
+     * import org.apache.spark.mllib.classification.LogisticRegressionSuite
+     * val nPoints = 10000
+     * val weights = Array(-0.57997, 0.912083, -0.371077, -0.819866, 2.688191)
+     * val xMean = Array(5.843, 3.057, 3.758, 1.199)
+     * val xVariance = Array(0.6856, 0.1899, 3.116, 0.581)
+     * val data = sc.parallelize(LogisticRegressionSuite.generateMultinomialLogisticInput(
+     *   weights, xMean, xVariance, true, nPoints, 42), 1)
+     * data.map(x => x.label + ", " + x.features(0) + ", " + x.features(1) + ", "
+     *   + x.features(2) + ", " + x.features(3)).saveAsTextFile("path")
+     */
+    binaryDataset = {
+      val nPoints = 10000
+      val weights = Array(-0.57997, 0.912083, -0.371077, -0.819866, 2.688191)
+      val xMean = Array(5.843, 3.057, 3.758, 1.199)
+      val xVariance = Array(0.6856, 0.1899, 3.116, 0.581)
+
+      val testData = LogisticRegressionSuite.generateMultinomialLogisticInput(
+        weights, xMean, xVariance, true, nPoints, 42)
+
+      sqlContext.createDataFrame(
+        sc.parallelize(testData, 4))
+    }
   }
 
   test("logistic regression: default params") {
@@ -135,4 +164,364 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext {
         assert(pred == predFromProb)
     }
   }
+
+  test("MultiClassSummarizer") {
+    val summarizer1 = (new MultiClassSummarizer)
+      .add(0.0).add(3.0).add(4.0).add(3.0).add(6.0)
+    assert(summarizer1.histogram.zip(Array[Long](1, 0, 0, 2, 1, 0, 1)).forall(x => x._1 === x._2))
+    assert(summarizer1.countInvalid === 0)
+    assert(summarizer1.numClasses === 7)
+
+    val summarizer2 = (new MultiClassSummarizer)
+      .add(1.0).add(5.0).add(3.0).add(0.0).add(4.0).add(1.0)
+    assert(summarizer2.histogram.zip(Array[Long](1, 2, 0, 1, 1, 1)).forall(x => x._1 === x._2))
+    assert(summarizer2.countInvalid === 0)
+    assert(summarizer2.numClasses === 6)
+
+    val summarizer3 = (new MultiClassSummarizer)
+      .add(0.0).add(1.3).add(5.2).add(2.5).add(2.0).add(4.0).add(4.0).add(4.0).add(1.0)
+    assert(summarizer3.histogram.zip(Array[Long](1, 1, 1, 0, 3)).forall(x => x._1 === x._2))
+    assert(summarizer3.countInvalid === 3)
+    assert(summarizer3.numClasses === 5)
+
+    val summarizer4 = (new MultiClassSummarizer)
+      .add(3.1).add(4.3).add(2.0).add(1.0).add(3.0)
+    assert(summarizer4.histogram.zip(Array[Long](0, 1, 1, 1)).forall(x => x._1 === x._2))
+    assert(summarizer4.countInvalid === 2)
+    assert(summarizer4.numClasses === 4)
+
+    // small map merges large one
+    val summarizerA = summarizer1.merge(summarizer2)
+    assert(summarizerA.hashCode() === summarizer2.hashCode())
+    assert(summarizerA.histogram.zip(Array[Long](2, 2, 0, 3, 2, 1, 1)).forall(x => x._1 === x._2))
+    assert(summarizerA.countInvalid === 0)
+    assert(summarizerA.numClasses === 7)
+
+    // large map merges small one
+    val summarizerB = summarizer3.merge(summarizer4)
+    assert(summarizerB.hashCode() === summarizer3.hashCode())
+    assert(summarizerB.histogram.zip(Array[Long](1, 2, 2, 1, 3)).forall(x => x._1 === x._2))
+    assert(summarizerB.countInvalid === 5)
+    assert(summarizerB.numClasses === 5)
+  }
+
+  test("binary logistic regression with intercept without regularization") {
+    val trainer = (new LogisticRegression).setFitIntercept(true)
+    val model = trainer.fit(binaryDataset)
+
+    /**
+     * Using the following R code to load the data and train the model using the glmnet package.
+     *
+     * > library("glmnet")
+     * > data <- read.csv("path", header=FALSE)
+     * > label = factor(data$V1)
+     * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5))
+     * > weights = coef(glmnet(features,label, family="binomial", alpha = 0, lambda = 0))
+     * > weights
+     * 5 x 1 sparse Matrix of class "dgCMatrix"
+     *                     s0
+     * (Intercept)  2.8366423
+     * data.V2     -0.5895848
+     * data.V3      0.8931147
+     * data.V4     -0.3925051
+     * data.V5     -0.7996864
+     */
+    val interceptR = 2.8366423
+    val weightsR = Array(-0.5895848, 0.8931147, -0.3925051, -0.7996864)
+
+    assert(model.intercept ~== interceptR relTol 1E-3)
+    assert(model.weights(0) ~== weightsR(0) relTol 1E-3)
+    assert(model.weights(1) ~== weightsR(1) relTol 1E-3)
+    assert(model.weights(2) ~== weightsR(2) relTol 1E-3)
+    assert(model.weights(3) ~== weightsR(3) relTol 1E-3)
+  }
+
+  test("binary logistic regression without intercept without regularization") {
+    val trainer = (new LogisticRegression).setFitIntercept(false)
+    val model = trainer.fit(binaryDataset)
+
+    /**
+     * Using the following R code to load the data and train the model using the glmnet package.
+     *
+     * > library("glmnet")
+     * > data <- read.csv("path", header=FALSE)
+     * > label = factor(data$V1)
+     * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5))
+     * > weights =
+     *     coef(glmnet(features,label, family="binomial", alpha = 0, lambda = 0, intercept=FALSE))
+     * > weights
+     * 5 x 1 sparse Matrix of class "dgCMatrix"
+     *                     s0
+     * (Intercept)   .
+     * data.V2     -0.3534996
+     * data.V3      1.2964482
+     * data.V4     -0.3571741
+     * data.V5     -0.7407946
+     */
+    val interceptR = 0.0
+    val weightsR = Array(-0.3534996, 1.2964482, -0.3571741, -0.7407946)
+
+    assert(model.intercept ~== interceptR relTol 1E-3)
+    assert(model.weights(0) ~== weightsR(0) relTol 1E-2)
+    assert(model.weights(1) ~== weightsR(1) relTol 1E-2)
+    assert(model.weights(2) ~== weightsR(2) relTol 1E-3)
+    assert(model.weights(3) ~== weightsR(3) relTol 1E-3)
+  }
+
+  test("binary logistic regression with intercept with L1 regularization") {
+    val trainer = (new LogisticRegression).setFitIntercept(true)
+      .setElasticNetParam(1.0).setRegParam(0.12)
+    val model = trainer.fit(binaryDataset)
+
+    /**
+     * Using the following R code to load the data and train the model using the glmnet package.
+     *
+     * > library("glmnet")
+     * > data <- read.csv("path", header=FALSE)
+     * > label = factor(data$V1)
+     * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5))
+     * > weights = coef(glmnet(features,label, family="binomial", alpha = 1, lambda = 0.12))
+     * > weights
+     * 5 x 1 sparse Matrix of class "dgCMatrix"
+     *                      s0
+     * (Intercept) -0.05627428
+     * data.V2       .
+     * data.V3       .
+     * data.V4     -0.04325749
+     * data.V5     -0.02481551
+     */
+    val interceptR = -0.05627428
+    val weightsR = Array(0.0, 0.0, -0.04325749, -0.02481551)
+
+    assert(model.intercept ~== interceptR relTol 1E-2)
+    assert(model.weights(0) ~== weightsR(0) relTol 1E-3)
+    assert(model.weights(1) ~== weightsR(1) relTol 1E-3)
+    assert(model.weights(2) ~== weightsR(2) relTol 1E-2)
+    assert(model.weights(3) ~== weightsR(3) relTol 2E-2)
+  }
+
+  test("binary logistic regression without intercept with L1 regularization") {
+    val trainer = (new LogisticRegression).setFitIntercept(false)
+      .setElasticNetParam(1.0).setRegParam(0.12)
+    val model = trainer.fit(binaryDataset)
+
+    /**
+     * Using the following R code to load the data and train the model using the glmnet package.
+     *
+     * > library("glmnet")
+     * > data <- read.csv("path", header=FALSE)
+     * > label = factor(data$V1)
+     * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5))
+     * > weights = coef(glmnet(features,label, family="binomial", alpha = 1, lambda = 0.12,
+     *     intercept=FALSE))
+     * > weights
+     * 5 x 1 sparse Matrix of class "dgCMatrix"
+     *                      s0
+     * (Intercept)   .
+     * data.V2       .
+     * data.V3       .
+     * data.V4     -0.05189203
+     * data.V5     -0.03891782
+     */
+    val interceptR = 0.0
+    val weightsR = Array(0.0, 0.0, -0.05189203, -0.03891782)
+
+    assert(model.intercept ~== interceptR relTol 1E-3)
+    assert(model.weights(0) ~== weightsR(0) relTol 1E-3)
+    assert(model.weights(1) ~== weightsR(1) relTol 1E-3)
+    assert(model.weights(2) ~== weightsR(2) relTol 1E-2)
+    assert(model.weights(3) ~== weightsR(3) relTol 1E-2)
+  }
+
+  test("binary logistic regression with intercept with L2 regularization") {
+    val trainer = (new LogisticRegression).setFitIntercept(true)
+      .setElasticNetParam(0.0).setRegParam(1.37)
+    val model = trainer.fit(binaryDataset)
+
+    /**
+     * Using the following R code to load the data and train the model using the glmnet package.
+     *
+     * > library("glmnet")
+     * > data <- read.csv("path", header=FALSE)
+     * > label = factor(data$V1)
+     * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5))
+     * > weights = coef(glmnet(features,label, family="binomial", alpha = 0, lambda = 1.37))
+     * > weights
+     * 5 x 1 sparse Matrix of class "dgCMatrix"
+     *                      s0
+     * (Intercept)  0.15021751
+     * data.V2     -0.07251837
+     * data.V3      0.10724191
+     * data.V4     -0.04865309
+     * data.V5     -0.10062872
+     */
+    val interceptR = 0.15021751
+    val weightsR = Array(-0.07251837, 0.10724191, -0.04865309, -0.10062872)
+
+    assert(model.intercept ~== interceptR relTol 1E-3)
+    assert(model.weights(0) ~== weightsR(0) relTol 1E-3)
+    assert(model.weights(1) ~== weightsR(1) relTol 1E-3)
+    assert(model.weights(2) ~== weightsR(2) relTol 1E-3)
+    assert(model.weights(3) ~== weightsR(3) relTol 1E-3)
+  }
+
+  test("binary logistic regression without intercept with L2 regularization") {
+    val trainer = (new LogisticRegression).setFitIntercept(false)
+      .setElasticNetParam(0.0).setRegParam(1.37)
+    val model = trainer.fit(binaryDataset)
+
+    /**
+     * Using the following R code to load the data and train the model using the glmnet package.
+     *
+     * > library("glmnet")
+     * > data <- read.csv("path", header=FALSE)
+     * > label = factor(data$V1)
+     * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5))
+     * > weights = coef(glmnet(features,label, family="binomial", alpha = 0, lambda = 1.37,
+     *     intercept=FALSE))
+     * > weights
+     * 5 x 1 sparse Matrix of class "dgCMatrix"
+     *                      s0
+     * (Intercept)   .
+     * data.V2     -0.06099165
+     * data.V3      0.12857058
+     * data.V4     -0.04708770
+     * data.V5     -0.09799775
+     */
+    val interceptR = 0.0
+    val weightsR = Array(-0.06099165, 0.12857058, -0.04708770, -0.09799775)
+
+    assert(model.intercept ~== interceptR relTol 1E-3)
+    assert(model.weights(0) ~== weightsR(0) relTol 1E-2)
+    assert(model.weights(1) ~== weightsR(1) relTol 1E-2)
+    assert(model.weights(2) ~== weightsR(2) relTol 1E-3)
+    assert(model.weights(3) ~== weightsR(3) relTol 1E-3)
+  }
+
+  test("binary logistic regression with intercept with ElasticNet 
regularization") {
+    val trainer = (new LogisticRegression).setFitIntercept(true)
+      .setElasticNetParam(0.38).setRegParam(0.21)
+    val model = trainer.fit(binaryDataset)
+
+    /**
+     * Using the following R code to load the data and train the model using the glmnet package.
+     *
+     * > library("glmnet")
+     * > data <- read.csv("path", header=FALSE)
+     * > label = factor(data$V1)
+     * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5))
+     * > weights = coef(glmnet(features,label, family="binomial", alpha = 0.38, lambda = 0.21))
+     * > weights
+     * 5 x 1 sparse Matrix of class "dgCMatrix"
+     *                      s0
+     * (Intercept)  0.57734851
+     * data.V2     -0.05310287
+     * data.V3       .
+     * data.V4     -0.08849250
+     * data.V5     -0.15458796
+     */
+    val interceptR = 0.57734851
+    val weightsR = Array(-0.05310287, 0.0, -0.08849250, -0.15458796)
+
+    assert(model.intercept ~== interceptR relTol 6E-3)
+    assert(model.weights(0) ~== weightsR(0) relTol 5E-3)
+    assert(model.weights(1) ~== weightsR(1) relTol 1E-3)
+    assert(model.weights(2) ~== weightsR(2) relTol 5E-3)
+    assert(model.weights(3) ~== weightsR(3) relTol 1E-3)
+  }
+
+  test("binary logistic regression without intercept with ElasticNet 
regularization") {
+    val trainer = (new LogisticRegression).setFitIntercept(false)
+      .setElasticNetParam(0.38).setRegParam(0.21)
+    val model = trainer.fit(binaryDataset)
+
+    /**
+     * Using the following R code to load the data and train the model using the glmnet package.
+     *
+     * > library("glmnet")
+     * > data <- read.csv("path", header=FALSE)
+     * > label = factor(data$V1)
+     * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5))
+     * > weights = coef(glmnet(features,label, family="binomial", alpha = 0.38, lambda = 0.21,
+     *     intercept=FALSE))
+     * > weights
+     * 5 x 1 sparse Matrix of class "dgCMatrix"
+     *                      s0
+     * (Intercept)   .
+     * data.V2     -0.001005743
+     * data.V3      0.072577857
+     * data.V4     -0.081203769
+     * data.V5     -0.142534158
+     */
+    val interceptR = 0.0
+    val weightsR = Array(-0.001005743, 0.072577857, -0.081203769, -0.142534158)
+
+    assert(model.intercept ~== interceptR relTol 1E-3)
+    assert(model.weights(0) ~== weightsR(0) absTol 1E-3)
+    assert(model.weights(1) ~== weightsR(1) absTol 1E-2)
+    assert(model.weights(2) ~== weightsR(2) relTol 1E-3)
+    assert(model.weights(3) ~== weightsR(3) relTol 1E-2)
+  }
+
+  test("binary logistic regression with intercept with strong L1 
regularization") {
+    val trainer = (new LogisticRegression).setFitIntercept(true)
+      .setElasticNetParam(1.0).setRegParam(6.0)
+    val model = trainer.fit(binaryDataset)
+
+    val histogram = binaryDataset.map { case Row(label: Double, features: Vector) => label }
+      .treeAggregate(new MultiClassSummarizer)(
+        seqOp = (c, v) => (c, v) match {
+          case (classSummarizer: MultiClassSummarizer, label: Double) => classSummarizer.add(label)
+        },
+        combOp = (c1, c2) => (c1, c2) match {
+          case (classSummarizer1: MultiClassSummarizer, classSummarizer2: MultiClassSummarizer) =>
+            classSummarizer1.merge(classSummarizer2)
+        }).histogram
+
+    /**
+     * For binary logistic regression with strong L1 regularization, all the weights will be zeros.
+     * As a result,
+     * {{{
+     * P(0) = 1 / (1 + \exp(b)), and
+     * P(1) = \exp(b) / (1 + \exp(b))
+     * }}}, hence
+     * {{{
+     * b = \log{P(1) / P(0)} = \log{count_1 / count_0}
+     * }}}
+     */
+    val interceptTheory = math.log(histogram(1).toDouble / histogram(0).toDouble)
+    val weightsTheory = Array(0.0, 0.0, 0.0, 0.0)
+
+    assert(model.intercept ~== interceptTheory relTol 1E-5)
+    assert(model.weights(0) ~== weightsTheory(0) absTol 1E-6)
+    assert(model.weights(1) ~== weightsTheory(1) absTol 1E-6)
+    assert(model.weights(2) ~== weightsTheory(2) absTol 1E-6)
+    assert(model.weights(3) ~== weightsTheory(3) absTol 1E-6)
+
+    /**
+     * Using the following R code to load the data and train the model using the glmnet package.
+     *
+     * > library("glmnet")
+     * > data <- read.csv("path", header=FALSE)
+     * > label = factor(data$V1)
+     * > features = as.matrix(data.frame(data$V2, data$V3, data$V4, data$V5))
+     * > weights = coef(glmnet(features,label, family="binomial", alpha = 1.0, lambda = 6.0))
+     * > weights
+     * 5 x 1 sparse Matrix of class "dgCMatrix"
+     *                      s0
+     * (Intercept) -0.2480643
+     * data.V2      0.0000000
+     * data.V3       .
+     * data.V4       .
+     * data.V5       .
+     */
+    val interceptR = -0.2480643
+    val weightsR = Array(0.0, 0.0, 0.0, 0.0)
+
+    assert(model.intercept ~== interceptR relTol 1E-5)
+    assert(model.weights(0) ~== weightsR(0) absTol 1E-6)
+    assert(model.weights(1) ~== weightsR(1) absTol 1E-6)
+    assert(model.weights(2) ~== weightsR(2) absTol 1E-6)
+    assert(model.weights(3) ~== weightsR(3) absTol 1E-6)
+  }
 }
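
The strong-L1 test above relies on a closed form: when the penalty drives every weight to zero, the model can only predict the label frequencies, so the fitted intercept must equal log(count_1 / count_0). A minimal sketch of that arithmetic with a made-up histogram (counts are illustrative, not from the test data):

    // Hypothetical label counts: 4000 zeros, 6000 ones.
    val histogram = Array(4000L, 6000L)

    // With all weights zero, P(1) = exp(b) / (1 + exp(b)), hence
    // b = log(P(1) / P(0)) = log(count_1 / count_0).
    val interceptTheory = math.log(histogram(1).toDouble / histogram(0).toDouble)

    println(f"theoretical intercept = $interceptTheory%.6f")  // log(1.5) ~= 0.405465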

http://git-wip-us.apache.org/repos/asf/spark/blob/86ef4cfd/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
index a26c528..fb0a194 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -91,21 +91,21 @@ object LogisticRegressionSuite {
       seed: Int): Seq[LabeledPoint] = {
     val rnd = new Random(seed)
 
-    val xDim = xMean.size
+    val xDim = xMean.length
     val xWithInterceptsDim = if (addIntercept) xDim + 1 else xDim
-    val nClasses = weights.size / xWithInterceptsDim + 1
+    val nClasses = weights.length / xWithInterceptsDim + 1
 
     val x = Array.fill[Vector](nPoints)(Vectors.dense(Array.fill[Double](xDim)(rnd.nextGaussian())))
 
-    x.map(vector => {
+    x.foreach { vector =>
       // This doesn't work if `vector` is a sparse vector.
       val vectorArray = vector.toArray
       var i = 0
-      while (i < vectorArray.size) {
+      while (i < vectorArray.length) {
         vectorArray(i) = vectorArray(i) * math.sqrt(xVariance(i)) + xMean(i)
         i += 1
       }
-    })
+    }
 
     val y = (0 until nPoints).map { idx =>
       val xArray = x(idx).toArray
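
The refactor above replaces a map whose result was discarded with an explicit foreach, keeping the in-place transform: each standard-normal draw is rescaled so that feature i ends up with the requested mean and variance. A tiny self-contained sketch of that transform on a single vector (the statistics are the ones used by the test generator):

    val xMean = Array(5.843, 3.057, 3.758, 1.199)
    val xVariance = Array(0.6856, 0.1899, 3.116, 0.581)

    val rnd = new scala.util.Random(42)
    val vectorArray = Array.fill(xMean.length)(rnd.nextGaussian())

    // If g ~ N(0, 1), then g * sqrt(v) + m ~ N(m, v).
    var i = 0
    while (i < vectorArray.length) {
      vectorArray(i) = vectorArray(i) * math.sqrt(xVariance(i)) + xMean(i)
      i += 1
    }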

