This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ebdf41d [SPARK-30642][ML][PYSPARK] LinearSVC blockify input vectors ebdf41d is described below commit ebdf41dd698ce138d07f63b1fa3ffbcc392e7fff Author: zhengruifeng <ruife...@foxmail.com> AuthorDate: Wed May 6 10:06:23 2020 +0800 [SPARK-30642][ML][PYSPARK] LinearSVC blockify input vectors ### What changes were proposed in this pull request? 1, add new param `blockSize`; 2, add a new class InstanceBlock; 3, **if `blockSize==1`, keep original behavior; if `blockSize>1`, stack input vectors to blocks (like ALS/MLP);** 4, if `blockSize>1`, standardize the input outside of optimization procedure; ### Why are the changes needed? 1, reduce RAM to persist training dataset; (save about 40% RAM) 2, use Level-2 BLAS routines; (4x ~ 5x faster on dataset `epsilon`) ### Does this PR introduce any user-facing change? Yes, a new param is added ### How was this patch tested? existing and added testsuites Closes #28349 from zhengruifeng/blockify_svc_II. 
Authored-by: zhengruifeng <ruife...@foxmail.com> Signed-off-by: zhengruifeng <ruife...@foxmail.com> --- .../apache/spark/serializer/KryoSerializer.scala | 1 + .../org/apache/spark/ml/linalg/Matrices.scala | 41 ++++ .../apache/spark/ml/classification/LinearSVC.scala | 222 ++++++++++++++------- .../org/apache/spark/ml/feature/Instance.scala | 91 ++++++++- .../ml/optim/aggregator/HingeAggregator.scala | 110 +++++++++- .../org/apache/spark/ml/stat/Summarizer.scala | 5 +- .../spark/ml/classification/LinearSVCSuite.scala | 15 ++ .../apache/spark/ml/feature/InstanceSuite.scala | 31 +++ .../ml/optim/aggregator/HingeAggregatorSuite.scala | 50 ++++- python/pyspark/ml/classification.py | 23 ++- 10 files changed, 498 insertions(+), 91 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index cdaab59..55ac2c4 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -502,6 +502,7 @@ private[serializer] object KryoSerializer { "org.apache.spark.ml.attribute.NumericAttribute", "org.apache.spark.ml.feature.Instance", + "org.apache.spark.ml.feature.InstanceBlock", "org.apache.spark.ml.feature.LabeledPoint", "org.apache.spark.ml.feature.OffsetInstance", "org.apache.spark.ml.linalg.DenseMatrix", diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala index 34e4366..1254ed7 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala @@ -1008,6 +1008,47 @@ object SparseMatrix { @Since("2.0.0") object Matrices { + private[ml] def fromVectors(vectors: Seq[Vector]): Matrix = { + val numRows = vectors.length + val numCols = vectors.head.size + val denseSize = 
Matrices.getDenseSize(numCols, numRows) + val nnz = vectors.iterator.map(_.numNonzeros).sum + val sparseSize = Matrices.getSparseSize(nnz, numRows + 1) + if (denseSize < sparseSize) { + val values = Array.ofDim[Double](numRows * numCols) + var offset = 0 + var j = 0 + while (j < numRows) { + vectors(j).foreachNonZero { (i, v) => + values(offset + i) = v + } + offset += numCols + j += 1 + } + new DenseMatrix(numRows, numCols, values, true) + } else { + val colIndices = MArrayBuilder.make[Int] + val values = MArrayBuilder.make[Double] + val rowPtrs = MArrayBuilder.make[Int] + var rowPtr = 0 + rowPtrs += 0 + var j = 0 + while (j < numRows) { + var nnz = 0 + vectors(j).foreachNonZero { (i, v) => + colIndices += i + values += v + nnz += 1 + } + rowPtr += nnz + rowPtrs += rowPtr + j += 1 + } + new SparseMatrix(numRows, numCols, rowPtrs.result(), + colIndices.result(), values.result(), true) + } + } + /** * Creates a column-major dense matrix. * diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index efe84f8..69c35a8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -26,21 +26,23 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging +import org.apache.spark.ml.feature._ import org.apache.spark.ml.linalg._ -import org.apache.spark.ml.optim.aggregator.HingeAggregator +import org.apache.spark.ml.optim.aggregator._ import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.stat._ import org.apache.spark.ml.util._ import org.apache.spark.ml.util.Instrumentation.instrumented +import org.apache.spark.rdd.RDD import 
org.apache.spark.sql.{Dataset, Row} import org.apache.spark.storage.StorageLevel /** Params for linear SVM Classifier. */ private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol - with HasAggregationDepth with HasThreshold { + with HasAggregationDepth with HasThreshold with HasBlockSize { /** * Param for threshold in binary classification prediction. @@ -154,31 +156,65 @@ class LinearSVC @Since("2.2.0") ( def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) setDefault(aggregationDepth -> 2) + /** + * Set block size for stacking input data in matrices. + * If blockSize == 1, then stacking will be skipped, and each vector is treated individually; + * If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines + * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV). + * Recommended size is between 10 and 1000. An appropriate choice of the block size depends + * on the sparsity and dim of input datasets, the underlying BLAS implementation (for example, + * f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads). + * Note that existing BLAS implementations are mainly optimized for dense matrices, if the + * input dataset is sparse, stacking may bring no performance gain, the worse is possible + * performance regression. + * Default is 1. 
+ * + * @group expertSetParam + */ + @Since("3.1.0") + def setBlockSize(value: Int): this.type = set(blockSize, value) + setDefault(blockSize -> 1) + @Since("2.2.0") override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra) override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr => - val handlePersistence = dataset.storageLevel == StorageLevel.NONE - - val instances = extractInstances(dataset) - if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) - instr.logPipelineStage(this) instr.logDataset(dataset) instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol, - regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth) + regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize) + + val instances = extractInstances(dataset) + .setName("training instances") - val (summarizer, labelSummarizer) = + val (summarizer, labelSummarizer) = if ($(blockSize) == 1) { + if (dataset.storageLevel == StorageLevel.NONE) { + instances.persist(StorageLevel.MEMORY_AND_DISK) + } Summarizer.getClassificationSummarizers(instances, $(aggregationDepth)) - instr.logNumExamples(summarizer.count) - instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString) - instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString) - instr.logSumOfWeights(summarizer.weightSum) + } else { + // instances will be standardized and converted to blocks, so no need to cache instances. 
+ Summarizer.getClassificationSummarizers(instances, $(aggregationDepth), + Seq("mean", "std", "count", "numNonZeros")) + } val histogram = labelSummarizer.histogram val numInvalid = labelSummarizer.countInvalid val numFeatures = summarizer.mean.size - val numFeaturesPlusIntercept = if (getFitIntercept) numFeatures + 1 else numFeatures + + instr.logNumExamples(summarizer.count) + instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString) + instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString) + instr.logSumOfWeights(summarizer.weightSum) + if ($(blockSize) > 1) { + val scale = 1.0 / summarizer.count / numFeatures + val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum + instr.logNamedValue("sparsity", sparsity.toString) + if (sparsity > 0.5) { + instr.logWarning(s"sparsity of input dataset is $sparsity, " + + s"which may hurt performance in high-level BLAS.") + } + } val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match { case Some(n: Int) => @@ -192,77 +228,113 @@ class LinearSVC @Since("2.2.0") ( instr.logNumClasses(numClasses) instr.logNumFeatures(numFeatures) - val (coefficientVector, interceptVector, objectiveHistory) = { - if (numInvalid != 0) { - val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " + - s"Found $numInvalid invalid labels." - instr.logError(msg) - throw new SparkException(msg) - } + if (numInvalid != 0) { + val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " + + s"Found $numInvalid invalid labels." 
+ instr.logError(msg) + throw new SparkException(msg) + } - val featuresStd = summarizer.std.toArray - val getFeaturesStd = (j: Int) => featuresStd(j) - val regParamL2 = $(regParam) - val bcFeaturesStd = instances.context.broadcast(featuresStd) - val regularization = if (regParamL2 != 0.0) { - val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures - Some(new L2Regularization(regParamL2, shouldApply, - if ($(standardization)) None else Some(getFeaturesStd))) - } else { - None - } + val featuresStd = summarizer.std.toArray + val getFeaturesStd = (j: Int) => featuresStd(j) + val regularization = if ($(regParam) != 0.0) { + val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures + Some(new L2Regularization($(regParam), shouldApply, + if ($(standardization)) None else Some(getFeaturesStd))) + } else None + + def regParamL1Fun = (index: Int) => 0.0 + val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) + + /* + The coefficients are trained in the scaled space; we're converting them back to + the original space. + Note that the intercept in scaled space and original space is the same; + as a result, no scaling is needed. + */ + val (rawCoefficients, objectiveHistory) = if ($(blockSize) == 1) { + trainOnRows(instances, featuresStd, regularization, optimizer) + } else { + trainOnBlocks(instances, featuresStd, regularization, optimizer) + } + if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist() - val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_) - val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization, - $(aggregationDepth)) + if (rawCoefficients == null) { + val msg = s"${optimizer.getClass.getName} failed." 
+ instr.logError(msg) + throw new SparkException(msg) + } - def regParamL1Fun = (index: Int) => 0D - val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) - val initialCoefWithIntercept = Vectors.zeros(numFeaturesPlusIntercept) + val coefficientArray = Array.tabulate(numFeatures) { i => + if (featuresStd(i) != 0.0) rawCoefficients(i) / featuresStd(i) else 0.0 + } + val intercept = if ($(fitIntercept)) rawCoefficients.last else 0.0 + copyValues(new LinearSVCModel(uid, Vectors.dense(coefficientArray), intercept)) + } - val states = optimizer.iterations(new CachedDiffFunction(costFun), - initialCoefWithIntercept.asBreeze.toDenseVector) + private def trainOnRows( + instances: RDD[Instance], + featuresStd: Array[Double], + regularization: Option[L2Regularization], + optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = { + val numFeatures = featuresStd.length + val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures + + val bcFeaturesStd = instances.context.broadcast(featuresStd) + val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_) + val costFun = new RDDLossFunction(instances, getAggregatorFunc, + regularization, $(aggregationDepth)) + + val states = optimizer.iterations(new CachedDiffFunction(costFun), + Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector) + + val arrayBuilder = mutable.ArrayBuilder.make[Double] + var state: optimizer.State = null + while (states.hasNext) { + state = states.next() + arrayBuilder += state.adjustedValue + } + bcFeaturesStd.destroy() - val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double] - var state: optimizer.State = null - while (states.hasNext) { - state = states.next() - scaledObjectiveHistory += state.adjustedValue - } + (if (state != null) state.x.toArray else null, arrayBuilder.result) + } - bcFeaturesStd.destroy() - if (state == null) { - val msg = s"${optimizer.getClass.getName} failed." 
- instr.logError(msg) - throw new SparkException(msg) - } + private def trainOnBlocks( + instances: RDD[Instance], + featuresStd: Array[Double], + regularization: Option[L2Regularization], + optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = { + val numFeatures = featuresStd.length + val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures - /* - The coefficients are trained in the scaled space; we're converting them back to - the original space. - Note that the intercept in scaled space and original space is the same; - as a result, no scaling is needed. - */ - val rawCoefficients = state.x.toArray - val coefficientArray = Array.tabulate(numFeatures) { i => - if (featuresStd(i) != 0.0) { - rawCoefficients(i) / featuresStd(i) - } else { - 0.0 - } - } + val bcFeaturesStd = instances.context.broadcast(featuresStd) - val intercept = if ($(fitIntercept)) { - rawCoefficients(numFeaturesPlusIntercept - 1) - } else { - 0.0 - } - (Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result()) + val standardized = instances.mapPartitions { iter => + val inverseStd = bcFeaturesStd.value.map { std => if (std != 0) 1.0 / std else 0.0 } + val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true) + iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) } } + val blocks = InstanceBlock.blokify(standardized, $(blockSize)) + .persist(StorageLevel.MEMORY_AND_DISK) + .setName(s"training dataset (blockSize=${$(blockSize)})") + + val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_) + val costFun = new RDDLossFunction(blocks, getAggregatorFunc, + regularization, $(aggregationDepth)) + + val states = optimizer.iterations(new CachedDiffFunction(costFun), + Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector) + + val arrayBuilder = mutable.ArrayBuilder.make[Double] + var state: optimizer.State = null + while (states.hasNext) { + state = 
states.next() + arrayBuilder += state.adjustedValue + } + blocks.unpersist() + bcFeaturesStd.destroy() - if (handlePersistence) instances.unpersist() - - copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector)) + (if (state != null) state.x.toArray else null, arrayBuilder.result) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index 11d0c46..db5f88d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -17,7 +17,8 @@ package org.apache.spark.ml.feature -import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.linalg._ +import org.apache.spark.rdd.RDD /** * Class that represents an instance of weighted data point with label and features. @@ -28,6 +29,94 @@ import org.apache.spark.ml.linalg.Vector */ private[spark] case class Instance(label: Double, weight: Double, features: Vector) + +/** + * Class that represents an block of instance. + * If all weights are 1, then an empty array is stored. 
+ */ +private[spark] case class InstanceBlock( + labels: Array[Double], + weights: Array[Double], + matrix: Matrix) { + require(labels.length == matrix.numRows) + require(matrix.isTransposed) + if (weights.nonEmpty) { + require(labels.length == weights.length) + } + + def size: Int = labels.length + + def numFeatures: Int = matrix.numCols + + def instanceIterator: Iterator[Instance] = { + if (weights.nonEmpty) { + labels.iterator.zip(weights.iterator).zip(matrix.rowIter) + .map { case ((label, weight), vec) => Instance(label, weight, vec) } + } else { + labels.iterator.zip(matrix.rowIter) + .map { case (label, vec) => Instance(label, 1.0, vec) } + } + } + + def getLabel(i: Int): Double = labels(i) + + def labelIter: Iterator[Double] = labels.iterator + + @transient lazy val getWeight: Int => Double = { + if (weights.nonEmpty) { + (i: Int) => weights(i) + } else { + (i: Int) => 1.0 + } + } + + def weightIter: Iterator[Double] = { + if (weights.nonEmpty) { + weights.iterator + } else { + Iterator.fill(size)(1.0) + } + } + + // directly get the non-zero iterator of i-th row vector without array copy or slice + @transient lazy val getNonZeroIter: Int => Iterator[(Int, Double)] = { + matrix match { + case dm: DenseMatrix => + (i: Int) => + val start = numFeatures * i + Iterator.tabulate(numFeatures)(j => + (j, dm.values(start + j)) + ).filter(_._2 != 0) + case sm: SparseMatrix => + (i: Int) => + val start = sm.colPtrs(i) + val end = sm.colPtrs(i + 1) + Iterator.tabulate(end - start)(j => + (sm.rowIndices(start + j), sm.values(start + j)) + ).filter(_._2 != 0) + } + } +} + +private[spark] object InstanceBlock { + + def fromInstances(instances: Seq[Instance]): InstanceBlock = { + val labels = instances.map(_.label).toArray + val weights = if (instances.exists(_.weight != 1)) { + instances.map(_.weight).toArray + } else { + Array.emptyDoubleArray + } + val matrix = Matrices.fromVectors(instances.map(_.features)) + new InstanceBlock(labels, weights, matrix) + } + + def 
blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = { + instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances)) + } +} + + /** * Case class that represents an instance of data point with * label, weight, offset and features. diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index b0906f1..1525bb9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -18,7 +18,7 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.broadcast.Broadcast -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg._ /** @@ -39,8 +39,8 @@ private[ml] class HingeAggregator( fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) extends DifferentiableLossAggregator[Instance, HingeAggregator] { - private val numFeatures: Int = bcFeaturesStd.value.length - private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures + private val numFeatures = bcFeaturesStd.value.length + private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures @transient private lazy val coefficientsArray = bcCoefficients.value match { case DenseVector(values) => values case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + @@ -103,3 +103,107 @@ private[ml] class HingeAggregator( } } } + + +/** + * BlockHingeAggregator computes the gradient and loss for Hinge loss function as used in + * binary classification for blocks in sparse or dense matrix in an online fashion. + * + * Two BlockHingeAggregators can be merged together to have a summary of loss and gradient of + * the corresponding joint dataset. 
+ * + * NOTE: The feature values are expected to be standardized before computation. + * + * @param bcCoefficients The coefficients corresponding to the features. + * @param fitIntercept Whether to fit an intercept term. + */ +private[ml] class BlockHingeAggregator( + fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) + extends DifferentiableLossAggregator[InstanceBlock, BlockHingeAggregator] { + + protected override val dim: Int = bcCoefficients.value.size + private val numFeatures = if (fitIntercept) dim - 1 else dim + + @transient private lazy val coefficientsArray = bcCoefficients.value match { + case DenseVector(values) => values + case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + + s" but got type ${bcCoefficients.value.getClass}.") + } + + @transient private lazy val linear = { + val linear = if (fitIntercept) coefficientsArray.take(numFeatures) else coefficientsArray + Vectors.dense(linear).toDense + } + + /** + * Add a new training instance block to this HingeAggregator, and update the loss and gradient + * of the objective function. + * + * @param block The InstanceBlock to be added. + * @return This HingeAggregator object. + */ + def add(block: InstanceBlock): this.type = { + require(block.matrix.isTransposed) + require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " + + s"instance. 
Expecting $numFeatures but got ${block.numFeatures}.") + require(block.weightIter.forall(_ >= 0), + s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0") + + if (block.weightIter.forall(_ == 0)) return this + val size = block.size + + // vec here represents dotProducts + val vec = if (fitIntercept) { + Vectors.dense(Array.fill(size)(coefficientsArray.last)).toDense + } else { + Vectors.zeros(size).toDense + } + BLAS.gemv(1.0, block.matrix, linear, 1.0, vec) + + // in-place convert dotProducts to gradient scales + // then, vec represents gradient scales + var i = 0 + var interceptGradSum = 0.0 + while (i < size) { + val weight = block.getWeight(i) + if (weight > 0) { + weightSum += weight + // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))) + // Therefore the gradient is -(2y - 1)*x + val label = block.getLabel(i) + val labelScaled = label + label - 1.0 + val loss = (1.0 - labelScaled * vec.values(i)) * weight + if (loss > 0) { + lossSum += loss + val gradScale = -labelScaled * weight + vec.values(i) = gradScale + if (fitIntercept) interceptGradSum += gradScale + } else { vec.values(i) = 0.0 } + } else { vec.values(i) = 0.0 } + i += 1 + } + + // predictions are all correct, no gradient signal + if (vec.values.forall(_ == 0)) return this + + block.matrix match { + case dm: DenseMatrix => + BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols, + vec.values, 1, 1.0, gradientSumArray, 1) + if (fitIntercept) gradientSumArray(numFeatures) += interceptGradSum + + case sm: SparseMatrix if fitIntercept => + val linearGradSumVec = Vectors.zeros(numFeatures).toDense + BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec) + BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1, + gradientSumArray, 1) + gradientSumArray(numFeatures) += interceptGradSum + + case sm: SparseMatrix if !fitIntercept => + val gradSumVec = new DenseVector(gradientSumArray) + BLAS.gemv(1.0, sm.transpose, 
vec, 1.0, gradSumVec) + } + + this + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala index 1183041..4230b49 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala @@ -225,9 +225,10 @@ object Summarizer extends Logging { /** Get classification feature and label summarizers for provided data. */ private[ml] def getClassificationSummarizers( instances: RDD[Instance], - aggregationDepth: Int = 2): (SummarizerBuffer, MultiClassSummarizer) = { + aggregationDepth: Int = 2, + requested: Seq[String] = Seq("mean", "std", "count")) = { instances.treeAggregate( - (Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))( + (Summarizer.createSummarizerBuffer(requested: _*), new MultiClassSummarizer))( seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) => (c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight)), combOp = (c1: (SummarizerBuffer, MultiClassSummarizer), diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index c2072ce..579d6b1 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -207,6 +207,21 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest { dataset.as[LabeledPoint], estimator, modelEquals, 42L) } + test("LinearSVC on blocks") { + for (dataset <- Seq(smallBinaryDataset, smallSparseBinaryDataset); + fitIntercept <- Seq(true, false)) { + val lsvc = new LinearSVC() + .setFitIntercept(fitIntercept) + .setMaxIter(5) + val model = lsvc.fit(dataset) + Seq(4, 16, 64).foreach { blockSize => + val model2 = lsvc.setBlockSize(blockSize).fit(dataset) + 
assert(model.intercept ~== model2.intercept relTol 1e-9) + assert(model.coefficients ~== model2.coefficients relTol 1e-9) + } + } + } + test("prediction on single instance") { val trainer = new LinearSVC() val model = trainer.fit(smallBinaryDataset) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala index 5a74490..d780bdf 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala @@ -42,5 +42,36 @@ class InstanceSuite extends SparkFunSuite{ val o2 = ser.deserialize[OffsetInstance](ser.serialize(o)) assert(o === o2) } + + val block1 = InstanceBlock.fromInstances(Seq(instance1)) + val block2 = InstanceBlock.fromInstances(Seq(instance1, instance2)) + Seq(block1, block2).foreach { o => + val o2 = ser.deserialize[InstanceBlock](ser.serialize(o)) + assert(o.labels === o2.labels) + assert(o.weights === o2.weights) + assert(o.matrix === o2.matrix) + } + } + + test("InstanceBlock: check correctness") { + val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)) + val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse) + val instances = Seq(instance1, instance2) + + val block = InstanceBlock.fromInstances(instances) + assert(block.size === 2) + assert(block.numFeatures === 2) + block.instanceIterator.zipWithIndex.foreach { + case (instance, i) => + assert(instance.label === instances(i).label) + assert(instance.weight === instances(i).weight) + assert(instance.features.toArray === instances(i).features.toArray) + } + Seq(0, 1).foreach { i => + val nzIter = block.getNonZeroIter(i) + val vec = Vectors.sparse(2, nzIter.toSeq) + assert(vec.toArray === instances(i).features.toArray) + } } + } diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala index 16d27a9..51a1edd 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} import org.apache.spark.ml.stat.Summarizer import org.apache.spark.ml.util.TestingUtils._ @@ -28,6 +28,7 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { @transient var instances: Array[Instance] = _ @transient var instancesConstantFeature: Array[Instance] = _ @transient var instancesConstantFeatureFiltered: Array[Instance] = _ + @transient var standardizedInstances: Array[Instance] = _ override def beforeAll(): Unit = { super.beforeAll() @@ -46,6 +47,7 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { Instance(1.0, 0.5, Vectors.dense(1.0)), Instance(2.0, 0.3, Vectors.dense(0.5)) ) + standardizedInstances = standardize(instances) } /** Get summary statistics for some data and create a new HingeAggregator. 
*/ @@ -61,6 +63,29 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients) } + private def standardize(instances: Array[Instance]): Array[Instance] = { + val (featuresSummarizer, _) = + Summarizer.getClassificationSummarizers(sc.parallelize(instances)) + val stdArray = featuresSummarizer.std.toArray + val numFeatures = stdArray.length + instances.map { case Instance(label, weight, features) => + val standardized = Array.ofDim[Double](numFeatures) + features.foreachNonZero { (i, v) => + val std = stdArray(i) + if (std != 0) standardized(i) = v / std + } + Instance(label, weight, Vectors.dense(standardized).compressed) + } + } + + /** Get summary statistics for some data and create a new BlockHingeAggregator. */ + private def getNewBlockAggregator( + coefficients: Vector, + fitIntercept: Boolean): BlockHingeAggregator = { + val bcCoefficients = spark.sparkContext.broadcast(coefficients) + new BlockHingeAggregator(fitIntercept)(bcCoefficients) + } + test("aggregator add method input size") { val coefArray = Array(1.0, 2.0) val interceptArray = Array(2.0) @@ -139,8 +164,26 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { } val gradient = Vectors.dense((gradientCoef ++ Array(gradientIntercept)).map(_ / weightSum)) - assert(loss ~== agg.loss relTol 0.01) - assert(gradient ~== agg.gradient relTol 0.01) + assert(loss ~== agg.loss relTol 1e-9) + assert(gradient ~== agg.gradient relTol 1e-9) + + Seq(1, 2, 4).foreach { blockSize => + val blocks1 = standardizedInstances + .grouped(blockSize) + .map(seq => InstanceBlock.fromInstances(seq)) + .toArray + val blocks2 = blocks1.map { block => + new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor) + } + + Seq(blocks1, blocks2).foreach { blocks => + val blockAgg = getNewBlockAggregator(Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true) + blocks.foreach(blockAgg.add) + 
assert(loss ~== blockAgg.loss relTol 1e-9) + assert(gradient ~== blockAgg.gradient relTol 1e-9) + } + } } test("check with zero standard deviation") { @@ -158,5 +201,4 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { assert(aggConstantFeatureBinary.gradient(0) === 0.0) assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0)) } - } diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 0d88aa8..318ae7a 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -241,7 +241,8 @@ class _JavaProbabilisticClassificationModel(ProbabilisticClassificationModel, class _LinearSVCParams(_ClassifierParams, HasRegParam, HasMaxIter, HasFitIntercept, HasTol, - HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold): + HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold, + HasBlockSize): """ Params for :py:class:`LinearSVC` and :py:class:`LinearSVCModel`. @@ -290,6 +291,8 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl LinearSVCModel... 
>>> model.getThreshold() 0.5 + >>> model.getBlockSize() + 1 >>> model.coefficients DenseVector([0.0, -0.2792, -0.1833]) >>> model.intercept @@ -328,18 +331,19 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2): + aggregationDepth=2, blockSize=1): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2): + aggregationDepth=2, blockSize=1): """ super(LinearSVC, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.classification.LinearSVC", self.uid) self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True, - standardization=True, threshold=0.0, aggregationDepth=2) + standardization=True, threshold=0.0, aggregationDepth=2, + blockSize=1) kwargs = self._input_kwargs self.setParams(**kwargs) @@ -348,12 +352,12 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2): + aggregationDepth=2, blockSize=1): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2): + aggregationDepth=2, blockSize=1): Sets params for Linear SVM Classifier. 
""" kwargs = self._input_kwargs @@ -418,6 +422,13 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl """ return self._set(aggregationDepth=value) + @since("3.1.0") + def setBlockSize(self, value): + """ + Sets the value of :py:attr:`blockSize`. + """ + return self._set(blockSize=value) + class LinearSVCModel(_JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable): """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org