Repository: spark Updated Branches: refs/heads/master beed5e20a -> d52f63622
[SPARK-20790][MLLIB] Correctly handle negative values for implicit feedback in ALS ## What changes were proposed in this pull request? Revert the handling of negative values in ALS with implicit feedback, so that the confidence is the absolute value of the rating and the preference is 0 for negative ratings. This was the original behavior. ## How was this patch tested? This patch was tested with the existing unit tests and an added unit test to ensure that negative ratings are not ignored. mengxr Author: David Eis <[email protected]> Closes #18022 from davideis/bugfix/negative-rating. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d52f6362 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d52f6362 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d52f6362 Branch: refs/heads/master Commit: d52f636228e833db89045bc7a0c17b72da13f138 Parents: beed5e2 Author: David Eis <[email protected]> Authored: Wed May 31 13:52:55 2017 +0100 Committer: Sean Owen <[email protected]> Committed: Wed May 31 13:52:55 2017 +0100 ---------------------------------------------------------------------- .../apache/spark/ml/recommendation/ALS.scala | 22 +++++---- .../spark/ml/recommendation/ALSSuite.scala | 50 +++++++++++++++++++- 2 files changed, 62 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d52f6362/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 0955d3e..3d5fd17 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -763,11 +763,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { /** * Representing a normal equation to solve the following weighted least squares problem: * - * minimize \sum,,i,, c,,i,, (a,,i,,^T^ x - b,,i,,)^2^ + lambda * x^T^ x. + * minimize \sum,,i,, c,,i,, (a,,i,,^T^ x - d,,i,,)^2^ + lambda * x^T^ x. * * Its normal equation is given by * - * \sum,,i,, c,,i,, (a,,i,, a,,i,,^T^ x - b,,i,, a,,i,,) + lambda * x = 0. + * \sum,,i,, c,,i,, (a,,i,, a,,i,,^T^ x - d,,i,, a,,i,,) + lambda * x = 0. + * + * Distributing and letting b,,i,, = c,,i,, * d,,i,, + * + * \sum,,i,, c,,i,, a,,i,, a,,i,,^T^ x - b,,i,, a,,i,, + lambda * x = 0. */ private[recommendation] class NormalEquation(val k: Int) extends Serializable { @@ -796,7 +800,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { copyToDouble(a) blas.dspr(upper, k, c, da, 1, ata) if (b != 0.0) { - blas.daxpy(k, c * b, da, 1, atb, 1) + blas.daxpy(k, b, da, 1, atb, 1) } this } @@ -1624,15 +1628,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { val srcFactor = sortedSrcFactors(blockId)(localIndex) val rating = ratings(i) if (implicitPrefs) { - // Extension to the original paper to handle b < 0. confidence is a function of |b| - // instead so that it is never negative. c1 is confidence - 1.0. + // Extension to the original paper to handle rating < 0. confidence is a function + // of |rating| instead so that it is never negative. c1 is confidence - 1. val c1 = alpha * math.abs(rating) - // For rating <= 0, the corresponding preference is 0. So the term below is only added - // for rating > 0. Because YtY is already added, we need to adjust the scaling here. - if (rating > 0) { + // For rating <= 0, the corresponding preference is 0. So the second argument of add + // is only there for rating > 0. + if (rating > 0.0) { numExplicits += 1 - ls.add(srcFactor, (c1 + 1.0) / c1, c1) } + ls.add(srcFactor, if (rating > 0.0) 1.0 + c1 else 0.0, c1) } else { ls.add(srcFactor, rating) numExplicits += 1 http://git-wip-us.apache.org/repos/asf/spark/blob/d52f6362/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 9d31e79..701040f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -37,6 +37,7 @@ import org.apache.spark.ml.recommendation.ALS._ import org.apache.spark.ml.recommendation.ALS.Rating import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.recommendation.MatrixFactorizationModelSuite import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted} @@ -78,7 +79,7 @@ class ALSSuite val k = 2 val ne0 = new NormalEquation(k) .add(Array(1.0f, 2.0f), 3.0) - .add(Array(4.0f, 5.0f), 6.0, 2.0) // weighted + .add(Array(4.0f, 5.0f), 12.0, 2.0) // weighted assert(ne0.k === k) assert(ne0.triK === k * (k + 1) / 2) // NumPy code that computes the expected values: @@ -348,6 +349,37 @@ class ALSSuite } /** + * Train ALS using the given training set and parameters + * @param training training dataset + * @param rank rank of the matrix factorization + * @param maxIter max number of iterations + * @param regParam regularization constant + * @param implicitPrefs whether to use implicit preference + * @param numUserBlocks number of user blocks + * @param numItemBlocks number of item blocks + * @return a trained ALSModel + */ + def trainALS( + training: RDD[Rating[Int]], + rank: Int, + maxIter: Int, + regParam: Double, + implicitPrefs: Boolean = false, + numUserBlocks: Int = 2, + numItemBlocks: Int = 3): ALSModel = { + val spark = this.spark + import spark.implicits._ + val als = new ALS() + .setRank(rank) + .setRegParam(regParam) + .setImplicitPrefs(implicitPrefs) + .setNumUserBlocks(numUserBlocks) + .setNumItemBlocks(numItemBlocks) + .setSeed(0) + als.fit(training.toDF()) + } + + /** * Test ALS using the given training/test splits and parameters. * @param training training dataset * @param test test dataset @@ -455,6 +487,22 @@ class ALSSuite targetRMSE = 0.3) } + test("implicit feedback regression") { + val trainingWithNeg = sc.parallelize(Array(Rating(0, 0, 1), Rating(1, 1, 1), Rating(0, 1, -3))) + val trainingWithZero = sc.parallelize(Array(Rating(0, 0, 1), Rating(1, 1, 1), Rating(0, 1, 0))) + val modelWithNeg = + trainALS(trainingWithNeg, rank = 1, maxIter = 5, regParam = 0.01, implicitPrefs = true) + val modelWithZero = + trainALS(trainingWithZero, rank = 1, maxIter = 5, regParam = 0.01, implicitPrefs = true) + val userFactorsNeg = modelWithNeg.userFactors + val itemFactorsNeg = modelWithNeg.itemFactors + val userFactorsZero = modelWithZero.userFactors + val itemFactorsZero = modelWithZero.itemFactors + userFactorsNeg.collect().foreach(arr => logInfo(s"implicit test " + arr.mkString(" "))) + userFactorsZero.collect().foreach(arr => logInfo(s"implicit test " + arr.mkString(" "))) + assert(userFactorsNeg.intersect(userFactorsZero).count() == 0) + assert(itemFactorsNeg.intersect(itemFactorsZero).count() == 0) + } test("using generic ID types") { val (ratings, _) = genImplicitTestData(numUsers = 20, numItems = 40, rank = 2, noiseStd = 0.01) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
