Repository: spark
Updated Branches:
  refs/heads/master 34d6c4cd1 -> 1f0a46958


[SPARK-16008][ML] Remove unnecessary serialization in logistic regression

JIRA: [SPARK-16008](https://issues.apache.org/jira/browse/SPARK-16008)

## What changes were proposed in this pull request?
`LogisticAggregator` stores references to two arrays of dimension `numFeatures`, 
which are unnecessarily serialized before the combine op. As a result, the 
shuffle write is ~3x larger than it should be (this factor will grow for 
multiclass logistic regression); in MLlib, for instance, it is 3x smaller.

This patch modifies `LogisticAggregator.add` to accept the two arrays as method 
parameters, which avoids the serialization.

## How was this patch tested?

I tested this locally and verified the serialization reduction.

![image](https://cloud.githubusercontent.com/assets/7275795/16140387/d2974bac-3404-11e6-94f9-268860c931a2.png)

Additionally, I ran some tests on a 4-node cluster (4x48 cores, 4x128 GB RAM). 
A data set of 2M rows and 10k features showed a >2x iteration speedup.

Author: sethah <seth.hendrickso...@gmail.com>

Closes #13729 from sethah/lr_improvement.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f0a4695
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f0a4695
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f0a4695

Branch: refs/heads/master
Commit: 1f0a46958ef51a01560ada23665dccde89696e12
Parents: 34d6c4c
Author: sethah <seth.hendrickso...@gmail.com>
Authored: Fri Jun 17 09:58:49 2016 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Fri Jun 17 09:58:49 2016 -0700

----------------------------------------------------------------------
 .../ml/classification/LogisticRegression.scala  | 57 ++++++++++----------
 1 file changed, 29 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1f0a4695/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 51ede15..9469acf 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -937,50 +937,47 @@ class BinaryLogisticRegressionSummary 
private[classification] (
  * Two LogisticAggregator can be merged together to have a summary of loss and 
gradient of
  * the corresponding joint dataset.
  *
- * @param coefficients The coefficients corresponding to the features.
  * @param numClasses the number of possible outcomes for k classes 
classification problem in
  *                   Multinomial Logistic Regression.
  * @param fitIntercept Whether to fit an intercept term.
- * @param featuresStd The standard deviation values of the features.
- * @param featuresMean The mean values of the features.
  */
 private class LogisticAggregator(
-    coefficients: Vector,
+    private val numFeatures: Int,
     numClasses: Int,
-    fitIntercept: Boolean,
-    featuresStd: Array[Double],
-    featuresMean: Array[Double]) extends Serializable {
+    fitIntercept: Boolean) extends Serializable {
 
   private var weightSum = 0.0
   private var lossSum = 0.0
 
-  private val coefficientsArray = coefficients match {
-    case dv: DenseVector => dv.values
-    case _ =>
-      throw new IllegalArgumentException(
-        s"coefficients only supports dense vector but got type 
${coefficients.getClass}.")
-  }
-
-  private val dim = if (fitIntercept) coefficientsArray.length - 1 else 
coefficientsArray.length
-
-  private val gradientSumArray = Array.ofDim[Double](coefficientsArray.length)
+  private val gradientSumArray =
+    Array.ofDim[Double](if (fitIntercept) numFeatures + 1 else numFeatures)
 
   /**
    * Add a new training instance to this LogisticAggregator, and update the 
loss and gradient
    * of the objective function.
    *
    * @param instance The instance of data point to be added.
+   * @param coefficients The coefficients corresponding to the features.
+   * @param featuresStd The standard deviation values of the features.
    * @return This LogisticAggregator object.
    */
-  def add(instance: Instance): this.type = {
+  def add(
+      instance: Instance,
+      coefficients: Vector,
+      featuresStd: Array[Double]): this.type = {
     instance match { case Instance(label, weight, features) =>
-      require(dim == features.size, s"Dimensions mismatch when adding new 
instance." +
-        s" Expecting $dim but got ${features.size}.")
+      require(numFeatures == features.size, s"Dimensions mismatch when adding 
new instance." +
+        s" Expecting $numFeatures but got ${features.size}.")
       require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
 
       if (weight == 0.0) return this
 
-      val localCoefficientsArray = coefficientsArray
+      val coefficientsArray = coefficients match {
+        case dv: DenseVector => dv.values
+        case _ =>
+          throw new IllegalArgumentException(
+            s"coefficients only supports dense vector but got type 
${coefficients.getClass}.")
+      }
       val localGradientSumArray = gradientSumArray
 
       numClasses match {
@@ -990,11 +987,11 @@ private class LogisticAggregator(
             var sum = 0.0
             features.foreachActive { (index, value) =>
               if (featuresStd(index) != 0.0 && value != 0.0) {
-                sum += localCoefficientsArray(index) * (value / 
featuresStd(index))
+                sum += coefficientsArray(index) * (value / featuresStd(index))
               }
             }
             sum + {
-              if (fitIntercept) localCoefficientsArray(dim) else 0.0
+              if (fitIntercept) coefficientsArray(numFeatures) else 0.0
             }
           }
 
@@ -1007,7 +1004,7 @@ private class LogisticAggregator(
           }
 
           if (fitIntercept) {
-            localGradientSumArray(dim) += multiplier
+            localGradientSumArray(numFeatures) += multiplier
           }
 
           if (label > 0) {
@@ -1034,8 +1031,8 @@ private class LogisticAggregator(
    * @return This LogisticAggregator object.
    */
   def merge(other: LogisticAggregator): this.type = {
-    require(dim == other.dim, s"Dimensions mismatch when merging with another 
" +
-      s"LeastSquaresAggregator. Expecting $dim but got ${other.dim}.")
+    require(numFeatures == other.numFeatures, s"Dimensions mismatch when 
merging with another " +
+      s"LeastSquaresAggregator. Expecting $numFeatures but got 
${other.numFeatures}.")
 
     if (other.weightSum != 0.0) {
       weightSum += other.weightSum
@@ -1086,13 +1083,17 @@ private class LogisticCostFun(
   override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
     val numFeatures = featuresStd.length
     val coeffs = Vectors.fromBreeze(coefficients)
+    val n = coeffs.size
+    val localFeaturesStd = featuresStd
+
 
     val logisticAggregator = {
-      val seqOp = (c: LogisticAggregator, instance: Instance) => 
c.add(instance)
+      val seqOp = (c: LogisticAggregator, instance: Instance) =>
+        c.add(instance, coeffs, localFeaturesStd)
       val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => 
c1.merge(c2)
 
       instances.treeAggregate(
-        new LogisticAggregator(coeffs, numClasses, fitIntercept, featuresStd, 
featuresMean)
+        new LogisticAggregator(numFeatures, numClasses, fitIntercept)
       )(seqOp, combOp)
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to