Repository: spark Updated Branches: refs/heads/branch-2.2 3cad66e5e -> 3686c2e96
[SPARK-20790][MLLIB] Correctly handle negative values for implicit feedback in ALS ## What changes were proposed in this pull request? Revert the handling of negative values in ALS with implicit feedback, so that the confidence is the absolute value of the rating and the preference is 0 for negative ratings. This was the original behavior. ## How was this patch tested? This patch was tested with the existing unit tests and an added unit test to ensure that negative ratings are not ignored. mengxr Author: David Eis <[email protected]> Closes #18022 from davideis/bugfix/negative-rating. (cherry picked from commit d52f636228e833db89045bc7a0c17b72da13f138) Signed-off-by: Sean Owen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3686c2e9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3686c2e9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3686c2e9 Branch: refs/heads/branch-2.2 Commit: 3686c2e965758f471f9784b3e06223ce143b6aca Parents: 3cad66e Author: David Eis <[email protected]> Authored: Wed May 31 13:52:55 2017 +0100 Committer: Sean Owen <[email protected]> Committed: Wed May 31 13:53:05 2017 +0100 ---------------------------------------------------------------------- .../apache/spark/ml/recommendation/ALS.scala | 22 +++++---- .../spark/ml/recommendation/ALSSuite.scala | 50 +++++++++++++++++++- 2 files changed, 62 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3686c2e9/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index b2e3dba..3aa6457 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -763,11 +763,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { /** * Representing a normal equation to solve the following weighted least squares problem: * - * minimize \sum,,i,, c,,i,, (a,,i,,^T^ x - b,,i,,)^2^ + lambda * x^T^ x. + * minimize \sum,,i,, c,,i,, (a,,i,,^T^ x - d,,i,,)^2^ + lambda * x^T^ x. * * Its normal equation is given by * - * \sum,,i,, c,,i,, (a,,i,, a,,i,,^T^ x - b,,i,, a,,i,,) + lambda * x = 0. + * \sum,,i,, c,,i,, (a,,i,, a,,i,,^T^ x - d,,i,, a,,i,,) + lambda * x = 0. + * + * Distributing and letting b,,i,, = c,,i,, * d,,i,, + * + * \sum,,i,, c,,i,, a,,i,, a,,i,,^T^ x - b,,i,, a,,i,, + lambda * x = 0. */ private[recommendation] class NormalEquation(val k: Int) extends Serializable { @@ -796,7 +800,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { copyToDouble(a) blas.dspr(upper, k, c, da, 1, ata) if (b != 0.0) { - blas.daxpy(k, c * b, da, 1, atb, 1) + blas.daxpy(k, b, da, 1, atb, 1) } this } @@ -1456,15 +1460,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { val srcFactor = sortedSrcFactors(blockId)(localIndex) val rating = ratings(i) if (implicitPrefs) { - // Extension to the original paper to handle b < 0. confidence is a function of |b| - // instead so that it is never negative. c1 is confidence - 1.0. + // Extension to the original paper to handle rating < 0. confidence is a function + // of |rating| instead so that it is never negative. c1 is confidence - 1. val c1 = alpha * math.abs(rating) - // For rating <= 0, the corresponding preference is 0. So the term below is only added - // for rating > 0. Because YtY is already added, we need to adjust the scaling here. - if (rating > 0) { + // For rating <= 0, the corresponding preference is 0. So the second argument of add + // is only there for rating > 0. + if (rating > 0.0) { numExplicits += 1 - ls.add(srcFactor, (c1 + 1.0) / c1, c1) } + ls.add(srcFactor, if (rating > 0.0) 1.0 + c1 else 0.0, c1) } else { ls.add(srcFactor, rating) numExplicits += 1 http://git-wip-us.apache.org/repos/asf/spark/blob/3686c2e9/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 9d31e79..701040f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -37,6 +37,7 @@ import org.apache.spark.ml.recommendation.ALS._ import org.apache.spark.ml.recommendation.ALS.Rating import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.recommendation.MatrixFactorizationModelSuite import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted} @@ -78,7 +79,7 @@ class ALSSuite val k = 2 val ne0 = new NormalEquation(k) .add(Array(1.0f, 2.0f), 3.0) - .add(Array(4.0f, 5.0f), 6.0, 2.0) // weighted + .add(Array(4.0f, 5.0f), 12.0, 2.0) // weighted assert(ne0.k === k) assert(ne0.triK === k * (k + 1) / 2) // NumPy code that computes the expected values: @@ -348,6 +349,37 @@ class ALSSuite } /** + * Train ALS using the given training set and parameters + * @param training training dataset + * @param rank rank of the matrix factorization + * @param maxIter max number of iterations + * @param regParam regularization constant + * @param implicitPrefs whether to use implicit preference + * @param numUserBlocks number of user blocks + * @param numItemBlocks number of item blocks + * @return a trained ALSModel + */ + def trainALS( + training: RDD[Rating[Int]], + rank: Int, + maxIter: Int, + regParam: Double, + implicitPrefs: Boolean = false, + numUserBlocks: Int = 2, + numItemBlocks: Int = 3): ALSModel = { + val spark = this.spark + import spark.implicits._ + val als = new ALS() + .setRank(rank) + .setRegParam(regParam) + .setImplicitPrefs(implicitPrefs) + .setNumUserBlocks(numUserBlocks) + .setNumItemBlocks(numItemBlocks) + .setSeed(0) + als.fit(training.toDF()) + } + + /** * Test ALS using the given training/test splits and parameters. * @param training training dataset * @param test test dataset @@ -455,6 +487,22 @@ class ALSSuite targetRMSE = 0.3) } + test("implicit feedback regression") { + val trainingWithNeg = sc.parallelize(Array(Rating(0, 0, 1), Rating(1, 1, 1), Rating(0, 1, -3))) + val trainingWithZero = sc.parallelize(Array(Rating(0, 0, 1), Rating(1, 1, 1), Rating(0, 1, 0))) + val modelWithNeg = + trainALS(trainingWithNeg, rank = 1, maxIter = 5, regParam = 0.01, implicitPrefs = true) + val modelWithZero = + trainALS(trainingWithZero, rank = 1, maxIter = 5, regParam = 0.01, implicitPrefs = true) + val userFactorsNeg = modelWithNeg.userFactors + val itemFactorsNeg = modelWithNeg.itemFactors + val userFactorsZero = modelWithZero.userFactors + val itemFactorsZero = modelWithZero.itemFactors + userFactorsNeg.collect().foreach(arr => logInfo(s"implicit test " + arr.mkString(" "))) + userFactorsZero.collect().foreach(arr => logInfo(s"implicit test " + arr.mkString(" "))) + assert(userFactorsNeg.intersect(userFactorsZero).count() == 0) + assert(itemFactorsNeg.intersect(itemFactorsZero).count() == 0) + } test("using generic ID types") { val (ratings, _) = genImplicitTestData(numUsers = 20, numItems = 40, rank = 2, noiseStd = 0.01) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
