Repository: mahout Updated Branches: refs/heads/mahout-1541 [created] 8a4b4347d
added Sebastian's CooccurrenceAnalysis patch updated it to use current Mahout-DSL Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/107a0ba9 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/107a0ba9 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/107a0ba9 Branch: refs/heads/mahout-1541 Commit: 107a0ba9605241653a85b113661a8fa5c055529f Parents: 127bd01 Author: pferrel <[email protected]> Authored: Wed Jun 4 12:54:22 2014 -0700 Committer: pferrel <[email protected]> Committed: Wed Jun 4 12:54:22 2014 -0700 ---------------------------------------------------------------------- .gitignore | 1 + spark/src/.DS_Store | Bin 0 -> 6148 bytes spark/src/main/.DS_Store | Bin 0 -> 6148 bytes spark/src/main/scala/.DS_Store | Bin 0 -> 6148 bytes spark/src/main/scala/org/.DS_Store | Bin 0 -> 6148 bytes spark/src/main/scala/org/apache/.DS_Store | Bin 0 -> 6148 bytes .../apache/mahout/cf/CooccurrenceAnalysis.scala | 210 +++++++++++++++++++ .../mahout/cf/examples/Recommendations.scala | 169 +++++++++++++++ 8 files changed, 380 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index c47bff1..f500375 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ output-asf-email-examples/ .project .settings/ .idea/ +.DS_Store *.iml target/ examples/bin/tmp http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/spark/src/.DS_Store ---------------------------------------------------------------------- diff --git a/spark/src/.DS_Store b/spark/src/.DS_Store new file mode 100644 index 0000000..7b0d367 Binary files /dev/null and b/spark/src/.DS_Store differ http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/spark/src/main/.DS_Store 
---------------------------------------------------------------------- diff --git a/spark/src/main/.DS_Store b/spark/src/main/.DS_Store new file mode 100644 index 0000000..7ac63ad Binary files /dev/null and b/spark/src/main/.DS_Store differ http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/spark/src/main/scala/.DS_Store ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/.DS_Store b/spark/src/main/scala/.DS_Store new file mode 100644 index 0000000..e6dc460 Binary files /dev/null and b/spark/src/main/scala/.DS_Store differ http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/spark/src/main/scala/org/.DS_Store ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/.DS_Store b/spark/src/main/scala/org/.DS_Store new file mode 100644 index 0000000..d6999d3 Binary files /dev/null and b/spark/src/main/scala/org/.DS_Store differ http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/spark/src/main/scala/org/apache/.DS_Store ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/.DS_Store b/spark/src/main/scala/org/apache/.DS_Store new file mode 100644 index 0000000..4ba4fae Binary files /dev/null and b/spark/src/main/scala/org/apache/.DS_Store differ http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala new file mode 100644 index 0000000..5df329b --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.cf

import org.apache.mahout.math._
import scalabindings._
import RLikeOps._
import drm._
import RLikeDrmOps._
import org.apache.mahout.sparkbindings._

import scala.collection.JavaConversions._
import org.apache.mahout.math.stats.LogLikelihood
import collection._
import org.apache.mahout.common.RandomUtils


/**
 * Co-occurrence (A'A) and cross-co-occurrence (B'A) analysis that turns binarized, downsampled
 * interaction matrices into indicator matrices sparsified by log-likelihood-ratio scores.
 *
 * based on "Ted Dunning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
 * available at http://www.mapr.com/practical-machine-learning
 *
 * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
 * Scalable Similarity-Based Neighborhood Methods with MapReduce
 * ACM Conference on Recommender Systems 2012"
 */
object CooccurrenceAnalysis extends Serializable {

  /**
   * Reversed ordering on (thing, score) pairs: a HIGHER score counts as "less". A
   * `mutable.PriorityQueue` built with it therefore keeps the lowest-scoring pair at its head,
   * which is the element evicted by the bounded top-k selection in `computeIndicators`.
   */
  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2 }

  /**
   * Computes the A'A indicator matrix and one B'A cross-indicator matrix per entry of `drmBs`.
   *
   * @param drmARaw primary interaction matrix; rows are users, columns are items
   * @param randomSeed seed mixed into the per-block RNG used for downsampling
   * @param maxInterestingItemsPerThing maximum number of non-zeros kept per indicator row
   * @param maxNumInteractions per-user / per-thing interaction cap used by downsampling
   * @param drmBs additional interaction matrices; presumably they must share A's user rows
   *              (numUsers is taken from A) — TODO confirm with callers
   * @return the A'A indicator matrix followed by one B'A indicator matrix per `drmBs` entry, in order
   */
  def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
                    maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {

    implicit val distributedContext = drmARaw.context

    // Apply selective downsampling, pin resulting matrix
    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions).checkpoint()

    // num users, which equals the maximum number of interactions per item
    val numUsers = drmA.nrow.toInt

    // Compute & broadcast the number of interactions per thing in A
    val bcastInteractionsPerItemA = drmBroadcast(drmA.colSums)

    // Co-occurrence matrix A'A, sparsified by LLR; self-co-occurrences are excluded
    val drmIndicatorsAtA = computeIndicators(drmA.t %*% drmA, numUsers, maxInterestingItemsPerThing,
      bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)

    // Cross-co-occurrences B'A, one per additional interaction matrix, in the given order
    val crossIndicators = drmBs.toList.map { drmBRaw =>
      // Down-sample and pin other interaction matrix
      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()

      // Compute & broadcast the number of interactions per thing in B
      val bcastInteractionsPerThingB = drmBroadcast(drmB.colSums)

      val drmIndicatorsBtA = computeIndicators(drmB.t %*% drmA, numUsers, maxInterestingItemsPerThing,
        bcastInteractionsPerThingB, bcastInteractionsPerItemA)

      // NOTE(review): if DRM operators are evaluated lazily (TODO confirm), drmIndicatorsBtA is not
      // computed until the caller uses it, i.e. after this uncache, so the pinning may not prevent
      // recomputation of drmB.
      drmB.uncache()

      drmIndicatorsBtA
    }

    // Unpin downsampled interaction matrix (same laziness caveat as above)
    drmA.uncache()

    drmIndicatorsAtA :: crossIndicators
  }

  /**
   * Computes the log-likelihood ratio score for the co-occurrence of two things from a 2x2
   * contingency table, delegating to Mahout's `LogLikelihood.logLikelihoodRatio`.
   * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
   *
   * @param numInteractionsWithA number of interactions with thing A
   * @param numInteractionsWithB number of interactions with thing B
   * @param numInteractionsWithAandB number of interactions with both A and B
   * @param numInteractions total number of interactions (the caller passes the number of users)
   * @return the log-likelihood ratio score
   */
  def loglikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
                         numInteractionsWithAandB: Long, numInteractions: Long): Double = {

    // k11: both, k12: A without B, k21: B without A, k22: neither
    val k11 = numInteractionsWithAandB
    val k12 = numInteractionsWithA - numInteractionsWithAandB
    val k21 = numInteractionsWithB - numInteractionsWithAandB
    val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB

    LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
  }

  /**
   * Replaces each row of a (cross-)co-occurrence matrix by its top-k entries scored by LLR.
   *
   * @param drmBtA co-occurrence counts; row keys index things of B, column indices things of A
   * @param numUsers number of users, used as the total interaction count of the LLR table
   * @param maxInterestingItemsPerThing k, the maximum number of entries kept per row; a value
   *                                    <= 0 yields empty rows
   * @param bcastNumInteractionsB broadcast per-thing interaction counts of B (indexed by row key)
   * @param bcastNumInteractionsA broadcast per-thing interaction counts of A (indexed by column)
   * @param crossCooccurrence when false (the A'A case), the diagonal entry thing-with-itself is skipped
   * @return a matrix with the same keys whose non-zeros are the retained LLR scores
   */
  def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
                        bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
                        crossCooccurrence: Boolean = true): DrmLike[Int] = {
    drmBtA.mapBlock() {
      case (keys, block) =>

        val llrBlock = block.like()
        val numInteractionsB: Vector = bcastNumInteractionsB
        val numInteractionsA: Vector = bcastNumInteractionsA

        for (index <- 0 until keys.size) {

          val thingB = keys(index)

          // Bounded queue whose head is the lowest-scoring retained candidate (see orderByScore)
          val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)

          block(index, ::).nonZeroes().foreach { elem =>
            val thingA = elem.index
            val numCooccurrences = elem.get

            // exclude co-occurrences of the item with itself
            if (crossCooccurrence || thingB != thingA) {
              val llrRatio = loglikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
                numCooccurrences.toLong, numUsers)
              val candidate = thingA -> llrRatio

              if (topItemsPerThing.size < maxInterestingItemsPerThing) {
                topItemsPerThing.enqueue(candidate)
              } else if (topItemsPerThing.nonEmpty && orderByScore.lt(candidate, topItemsPerThing.head)) {
                // Candidate beats the current lowest score: evict it. The nonEmpty check keeps
                // k <= 0 from calling head on an empty queue.
                topItemsPerThing.dequeue()
                topItemsPerThing.enqueue(candidate)
              }
            }
          }

          // Add top-k interesting items to the output matrix
          topItemsPerThing.dequeueAll.foreach { case (otherThing, llrScore) => llrBlock(index, otherThing) = llrScore }
        }

        keys -> llrBlock
    }
  }

  /**
   * Selectively downsample users and things with an anomalous amount of interactions, inspired by
   * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
   *
   * Additionally binarizes the input matrix, as we're only interested in knowing whether interactions
   * happened or not: every retained entry is written as 1.
   *
   * @param drmM raw interaction matrix; rows are users
   * @param seed seed mixed with the first row key of each block to seed that block's RNG
   * @param maxNumInteractions cap on interactions per user and per thing
   * @return the downsampled, binarized matrix with the same keys
   */
  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int): DrmLike[Int] = {

    implicit val distributedContext = drmM.context

    // Pin raw interaction matrix
    val drmI = drmM.checkpoint()

    // Broadcast vector containing the number of interactions with each thing
    val bcastNumInteractions = drmBroadcast(drmI.colSums)

    val downSampledDrmI = drmI.mapBlock() {
      case (keys, block) if keys.isEmpty =>
        // Nothing to sample; also avoids keys(0) below on an empty block
        keys -> block

      case (keys, block) =>
        val numInteractions: Vector = bcastNumInteractions

        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
        val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))

        val downsampledBlock = block.like()

        // Downsample the interaction vector of each user
        for (userIndex <- 0 until keys.size) {

          val interactionsOfUser = block(userIndex, ::)
          val numInteractionsOfUser = interactionsOfUser.sum

          val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser

          interactionsOfUser.nonZeroes().foreach { elem =>
            val numInteractionsWithThing = numInteractions(elem.index)
            val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing

            if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) {
              // We ignore the original interaction value and create a binary 0-1 matrix
              // as we only consider whether interactions happened or did not happen
              downsampledBlock(userIndex, elem.index) = 1
            }
          }
        }

        keys -> downsampledBlock
    }

    // Unpin raw interaction matrix.
    // NOTE(review): downSampledDrmI is built by mapBlock; if it is evaluated lazily (TODO confirm),
    // it is computed only after this uncache, so drmI may be recomputed from drmM.
    drmI.uncache()

    downSampledDrmI
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala b/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala new file mode 100644 index
0000000..c640e1e --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala @@ -0,0 +1,169 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.cf.examples

import scala.io.Source
import org.apache.mahout.math._
import scalabindings._
import RLikeOps._
import drm._
import RLikeDrmOps._
import org.apache.mahout.sparkbindings._

import org.apache.mahout.cf.CooccurrenceAnalysis._
import scala.collection.JavaConversions._

/**
 * The Epinions dataset contains ratings from users to items and a trust-network between the users.
 * We use co-occurrence analysis to compute "users who like these items also like those items" and
 * "users who trust these users like those items".
 *
 * Download and unpack the dataset files from:
 *
 * http://www.trustlet.org/datasets/downloaded_epinions/ratings_data.txt.bz2
 * http://www.trustlet.org/datasets/downloaded_epinions/trust_data.txt.bz2
 *
 * Arguments: <path-to-dataset-folder> [spark-master-url] [output-dir]. The optional arguments
 * default to the previously hard-coded cluster (spark://occam4:7077) and HDFS output directory.
 **/
object RunCrossCooccurrenceAnalysisOnEpinions {

  def main(args: Array[String]): Unit = {

    if (args.length == 0) {
      println("Usage: RunCrossCooccurrenceAnalysisOnEpinions <path-to-dataset-folder> [spark-master-url] [output-dir]")
      println("Download the dataset from http://www.trustlet.org/datasets/downloaded_epinions/ratings_data.txt.bz2 and")
      println("http://www.trustlet.org/datasets/downloaded_epinions/trust_data.txt.bz2")
      sys.exit(-1)
    }

    val datasetDir = args(0)
    val masterUrl = if (args.length > 1) args(1) else "spark://occam4:7077"
    val rawOutputDir = if (args.length > 2) args(2) else "hdfs://occam4:54310/user/pat/xrsj/"
    // Sub-directories are appended below, so make sure the base ends with a separator
    val outputDir = if (rawOutputDir.endsWith("/")) rawOutputDir else rawOutputDir + "/"

    val epinionsRatings = new SparseMatrix(49290, 139738)

    // The first line is skipped (presumably a header — TODO confirm); IDs are converted to 0-based
    RecommendationExamplesHelper.withLines(datasetDir + "/ratings_data.txt") { lines =>
      for (line <- lines.drop(1) if line.contains(' ')) {
        val tokens = line.trim.split(' ')
        val userID = tokens(0).toInt - 1
        val itemID = tokens(1).toInt - 1
        val rating = tokens(2).toDouble
        epinionsRatings(userID, itemID) = rating
      }
    }

    val epinionsTrustNetwork = new SparseMatrix(49290, 49290)

    RecommendationExamplesHelper.withLines(datasetDir + "/trust_data.txt") { lines =>
      for (line <- lines.drop(1) if line.contains(' ')) {
        val tokens = line.trim.split(' ')
        val userID = tokens(0).toInt - 1
        val trustedUserId = tokens(1).toInt - 1
        epinionsTrustNetwork(userID, trustedUserId) = 1
      }
    }

    System.setProperty("spark.kryo.referenceTracking", "false")
    System.setProperty("spark.kryoserializer.buffer.mb", "100")

    implicit val distributedContext = mahoutSparkContext(masterUrl = masterUrl, appName = "MahoutClusteredContext",
      customJars = Traversable.empty[String])

    val drmEpinionsRatings = drmParallelize(epinionsRatings, numPartitions = 2)
    val drmEpinionsTrustNetwork = drmParallelize(epinionsTrustNetwork, numPartitions = 2)

    val indicatorMatrices = cooccurrences(drmEpinionsRatings, randomSeed = 0xdeadbeef,
      maxInterestingItemsPerThing = 100, maxNumInteractions = 500, drmBs = Array(drmEpinionsTrustNetwork))

    RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(0), outputDir + "indicators-item-item/")
    RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(1), outputDir + "indicators-trust-item/")

    distributedContext.close()

    // Report the directory actually written to (previously a stale /tmp path was printed)
    println("Saved indicators to " + outputDir)
  }
}

/**
 * The movielens1M dataset contains movie ratings, we use co-occurrence analysis to compute
 * "users who like these movies also like those movies".
 *
 * Download and unpack the dataset files from:
 * http://files.grouplens.org/datasets/movielens/ml-1m.zip
 */
object RunCooccurrenceAnalysisOnMovielens1M {

  def main(args: Array[String]): Unit = {

    if (args.length == 0) {
      println("Usage: RunCooccurrenceAnalysisOnMovielens1M <path-to-dataset-folder>")
      println("Download the dataset from http://files.grouplens.org/datasets/movielens/ml-1m.zip")
      sys.exit(-1)
    }

    val datasetDir = args(0)

    System.setProperty("spark.kryo.referenceTracking", "false")
    System.setProperty("spark.kryoserializer.buffer.mb", "100")

    implicit val sc = mahoutSparkContext(masterUrl = "local", appName = "MahoutLocalContext",
      customJars = Traversable.empty[String])

    System.setProperty("mahout.math.AtA.maxInMemNCol", 4000.toString)

    val movielens = new SparseMatrix(6040, 3952)

    // Rows are "userID::itemID::rating..."; IDs are converted to 0-based
    RecommendationExamplesHelper.withLines(datasetDir + "/ratings.dat") { lines =>
      for (line <- lines) {
        val tokens = line.split("::")
        val userID = tokens(0).toInt - 1
        val itemID = tokens(1).toInt - 1
        val rating = tokens(2).toDouble
        movielens(userID, itemID) = rating
      }
    }

    val drmMovielens = drmParallelize(movielens, numPartitions = 2)

    val indicatorMatrix = cooccurrences(drmMovielens).head

    RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrix,
      "/tmp/co-occurrence-on-movielens/indicators-item-item/")

    sc.stop()

    println("Saved indicators to /tmp/co-occurrence-on-movielens/")
  }
}

object RecommendationExamplesHelper {

  /**
   * Writes every non-zero entry of an indicator matrix as a text line "rowKey<TAB>columnIndex"
   * using Spark's saveAsTextFile.
   *
   * @param indicatorMatrix indicator matrix keyed by Int row keys
   * @param path output directory passed to saveAsTextFile
   */
  def saveIndicatorMatrix(indicatorMatrix: DrmLike[Int], path: String): Unit = {
    indicatorMatrix.rdd.flatMap({ case (thingID, itemVector) =>
      // Build a String explicitly: `thingID + '\t' + elem.index` would be Int + Char + Int arithmetic
      for (elem <- itemVector.nonZeroes()) yield s"$thingID\t${elem.index}"
    })
      .saveAsTextFile(path)
  }

  /**
   * Opens the text file at `path`, passes its line iterator to `f`, and closes the file afterwards,
   * even if `f` throws. `f` must consume the iterator before returning.
   */
  def withLines[T](path: String)(f: Iterator[String] => T): T = {
    val source = Source.fromFile(path)
    try f(source.getLines()) finally source.close()
  }
}
