Repository: mahout
Updated Branches:
  refs/heads/mahout-1541 [created] 8a4b4347d


Added Sebastian's CooccurrenceAnalysis patch and updated it to use the current
Mahout DSL


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/107a0ba9
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/107a0ba9
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/107a0ba9

Branch: refs/heads/mahout-1541
Commit: 107a0ba9605241653a85b113661a8fa5c055529f
Parents: 127bd01
Author: pferrel <[email protected]>
Authored: Wed Jun 4 12:54:22 2014 -0700
Committer: pferrel <[email protected]>
Committed: Wed Jun 4 12:54:22 2014 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 spark/src/.DS_Store                             | Bin 0 -> 6148 bytes
 spark/src/main/.DS_Store                        | Bin 0 -> 6148 bytes
 spark/src/main/scala/.DS_Store                  | Bin 0 -> 6148 bytes
 spark/src/main/scala/org/.DS_Store              | Bin 0 -> 6148 bytes
 spark/src/main/scala/org/apache/.DS_Store       | Bin 0 -> 6148 bytes
 .../apache/mahout/cf/CooccurrenceAnalysis.scala | 210 +++++++++++++++++++
 .../mahout/cf/examples/Recommendations.scala    | 169 +++++++++++++++
 8 files changed, 380 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index c47bff1..f500375 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@ output-asf-email-examples/
 .project
 .settings/
 .idea/
+.DS_Store
 *.iml
 target/
 examples/bin/tmp

http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/spark/src/.DS_Store
----------------------------------------------------------------------
diff --git a/spark/src/.DS_Store b/spark/src/.DS_Store
new file mode 100644
index 0000000..7b0d367
Binary files /dev/null and b/spark/src/.DS_Store differ

http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/spark/src/main/.DS_Store
----------------------------------------------------------------------
diff --git a/spark/src/main/.DS_Store b/spark/src/main/.DS_Store
new file mode 100644
index 0000000..7ac63ad
Binary files /dev/null and b/spark/src/main/.DS_Store differ

http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/spark/src/main/scala/.DS_Store
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/.DS_Store b/spark/src/main/scala/.DS_Store
new file mode 100644
index 0000000..e6dc460
Binary files /dev/null and b/spark/src/main/scala/.DS_Store differ

http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/spark/src/main/scala/org/.DS_Store
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/.DS_Store 
b/spark/src/main/scala/org/.DS_Store
new file mode 100644
index 0000000..d6999d3
Binary files /dev/null and b/spark/src/main/scala/org/.DS_Store differ

http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/spark/src/main/scala/org/apache/.DS_Store
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/.DS_Store 
b/spark/src/main/scala/org/apache/.DS_Store
new file mode 100644
index 0000000..4ba4fae
Binary files /dev/null and b/spark/src/main/scala/org/apache/.DS_Store differ

http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala 
b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
new file mode 100644
index 0000000..5df329b
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf
+
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import org.apache.mahout.sparkbindings._
+
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.stats.LogLikelihood
+import collection._
+// import scala.collection.parallel.mutable
+import org.apache.mahout.common.RandomUtils
+
+
+/**
+ * based on "Ted Dunning & Ellen Friedman: Practical Machine Learning, 
Innovations in Recommendation",
+ * available at http://www.mapr.com/practical-machine-learning
+ *
+ * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
+ * Scalable Similarity-Based Neighborhood Methods with MapReduce
+ * ACM Conference on Recommender Systems 2012"
+ */
+object CooccurrenceAnalysis extends Serializable {
+
+  /**
+   * Orders (index, score) pairs by their score alone, higher scores sorting first.
+   * The index component never takes part in the comparison.
+   */
+  private val orderByScore =
+    Ordering.fromLessThan[(Int, Double)] { (left, right) => left._2 > right._2 }
+
+  /**
+   * Computes the indicator matrix for the primary interaction matrix A plus one cross-indicator
+   * matrix for every additional interaction matrix B.
+   *
+   * @param drmARaw primary interaction matrix; it is down-sampled and binarized before use
+   * @param randomSeed seed handed to the down-sampling step
+   * @param maxInterestingItemsPerThing maximum number of indicators kept per row of each result
+   * @param maxNumInteractions interaction cap handed to the down-sampling step
+   * @param drmBs additional interaction matrices, each analysed against A via B'A
+   * @return the A'A indicator matrix followed by one B'A indicator matrix per entry of `drmBs`,
+   *         in the order of `drmBs`
+   */
+  def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef,
+                    maxInterestingItemsPerThing: Int = 50, maxNumInteractions: Int = 500,
+                    drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
+
+    implicit val distributedContext = drmARaw.context
+
+    // Down-sample and binarize A, and pin the result while it is reused below
+    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions).checkpoint()
+
+    // Number of users (rows of A), which bounds the number of interactions any item can have
+    val numUsers = drmA.nrow.toInt
+
+    // Interactions per thing in A, broadcast for use inside the block functions
+    val bcastInteractionsPerItemA = drmBroadcast(drmA.colSums)
+
+    // Co-occurrence matrix A'A, scored by loglikelihood and sparsified into the indicator matrix;
+    // crossCooccurrence = false drops the co-occurrence of an item with itself
+    val drmIndicatorsAtA = computeIndicators(drmA.t %*% drmA, numUsers,
+      maxInterestingItemsPerThing, bcastInteractionsPerItemA, bcastInteractionsPerItemA,
+      crossCooccurrence = false)
+
+    // One cross-co-occurrence indicator matrix (from B'A) per additional interaction matrix
+    val crossIndicators = drmBs.toList.map { drmBRaw =>
+      // Down-sample and pin the other interaction matrix
+      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
+
+      // Interactions per thing in B, broadcast for use inside the block functions
+      val bcastInteractionsPerThingB = drmBroadcast(drmB.colSums)
+
+      val drmIndicatorsBtA = computeIndicators(drmB.t %*% drmA, numUsers,
+        maxInterestingItemsPerThing, bcastInteractionsPerThingB, bcastInteractionsPerItemA)
+
+      drmB.uncache()
+
+      drmIndicatorsBtA
+    }
+
+    // Unpin downsampled interaction matrix
+    drmA.uncache()
+
+    drmIndicatorsAtA :: crossIndicators
+  }
+
+  /**
+   * Loglikelihood ratio for the co-occurrence of two events A and B, see
+   * http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details.
+   *
+   * Derives the four cells of the 2x2 contingency table from the given counts and passes them
+   * to LogLikelihood.logLikelihoodRatio.
+   *
+   * @param numInteractionsWithA number of interactions that involve A
+   * @param numInteractionsWithB number of interactions that involve B
+   * @param numInteractionsWithAandB number of interactions that involve both A and B
+   * @param numInteractions total number of interactions
+   **/
+  def loglikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+                         numInteractionsWithAandB: Long, numInteractions: Long) = {
+
+    val both = numInteractionsWithAandB
+    val onlyA = numInteractionsWithA - both
+    val onlyB = numInteractionsWithB - both
+    // Whatever involves neither A nor B: total minus A minus B, adding back the overlap
+    val neither = numInteractions - numInteractionsWithA - numInteractionsWithB + both
+
+    LogLikelihood.logLikelihoodRatio(both, onlyA, onlyB, neither)
+  }
+
+  /**
+   * Scores every non-zero entry of a (cross-)co-occurrence matrix with the loglikelihood ratio
+   * and keeps, per row, only the best-scoring entries.
+   *
+   * @param drmBtA co-occurrence counts; row keys identify things of B, column indices things of A
+   * @param numUsers passed to the loglikelihood ratio as the total number of interactions
+   * @param maxInterestingItemsPerThing maximum number of scored entries kept per row
+   * @param bcastNumInteractionsB broadcast interaction counts, looked up by row key
+   * @param bcastNumInteractionsA broadcast interaction counts, looked up by column index
+   * @param crossCooccurrence when false, entries whose row key equals their column index are
+   *                          skipped (co-occurrence of an item with itself)
+   * @return matrix with the retained loglikelihood scores in place of the co-occurrence counts
+   */
+  def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
+      bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
+      crossCooccurrence: Boolean = true) = {
+    drmBtA.mapBlock() {
+      case (keys, block) =>
+
+        val interactionsB: Vector = bcastNumInteractionsB
+        val interactionsA: Vector = bcastNumInteractionsA
+        val scoredBlock = block.like()
+
+        keys.indices.foreach { row =>
+
+          val thingB = keys(row)
+
+          // Bounded queue for the top-k selection; its head is the weakest candidate kept so far
+          val topK = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
+
+          block(row, ::).nonZeroes().foreach { elem =>
+            val thingA = elem.index
+
+            // Only plain co-occurrence (A'A) has a self pair to exclude
+            val isSelfPair = !crossCooccurrence && thingB == thingA
+
+            if (!isSelfPair) {
+              val score = loglikelihoodRatio(interactionsB(thingB).toLong,
+                interactionsA(thingA).toLong, elem.get.toLong, numUsers)
+              val candidate = (thingA, score)
+
+              if (topK.size < maxInterestingItemsPerThing) {
+                // Still room: accept unconditionally
+                topK.enqueue(candidate)
+              } else if (orderByScore.lt(candidate, topK.head)) {
+                // Full: the candidate replaces the weakest entry only if it scores higher
+                topK.dequeue()
+                topK.enqueue(candidate)
+              }
+            }
+          }
+
+          // Write the surviving entries of this row into the output block
+          topK.dequeueAll.foreach { case (otherThing, llrScore) =>
+            scoredBlock(row, otherThing) = llrScore
+          }
+        }
+
+        keys -> scoredBlock
+    }
+  }
+
+  /**
+   * Selectively downsample users and things with an anomalous amount of interactions, inspired by
+   * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
+   *
+   * Additionally binarizes the input matrix, as we are only interested in knowing whether
+   * interactions happened or not. Binarization is done before any counting, so that the
+   * per-user and per-thing interaction counts are numbers of non-zero cells and not sums of the
+   * raw cell values.
+   *
+   * @param drmM raw interaction matrix; each row is the interaction vector of one user
+   * @param seed combined with the first row key of each block to seed that block's RNG
+   * @param maxNumInteractions interaction count above which a user or thing is sampled down
+   * @return matrix in which every kept interaction is 1
+   */
+  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
+
+    implicit val distributedContext = drmM.context
+
+    // Binarize first: the sums taken below must count interactions. Summing raw values would
+    // let weighted input (e.g. ratings) inflate the counts and skew the sample rates.
+    val drmBinary = drmM.mapBlock() {
+      case (keys, block) =>
+        val binaryBlock = block.like()
+        for (userIndex <- 0 until keys.size) {
+          block(userIndex, ::).nonZeroes().foreach { elem =>
+            binaryBlock(userIndex, elem.index) = 1
+          }
+        }
+        keys -> binaryBlock
+    }
+
+    // Pin binarized interaction matrix
+    val drmI = drmBinary.checkpoint()
+
+    // Broadcast vector containing the number of interactions with each thing
+    val bcastNumInteractions = drmBroadcast(drmI.colSums)
+
+    val downSampledDrmI = drmI.mapBlock() {
+      case (keys, block) =>
+        val numInteractions: Vector = bcastNumInteractions
+
+        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable
+        // in case of failures. Lazy, so a block without rows never evaluates keys(0).
+        lazy val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+
+        val downsampledBlock = block.like()
+
+        // Downsample the interaction vector of each user
+        for (userIndex <- 0 until keys.size) {
+
+          val interactionsOfUser = block(userIndex, ::)
+          // The row is binary here, so its sum is the user's number of interactions
+          val numInteractionsOfUser = interactionsOfUser.sum
+
+          val perUserSampleRate =
+            math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser
+
+          interactionsOfUser.nonZeroes().foreach { elem =>
+            val numInteractionsWithThing = numInteractions(elem.index)
+            val perThingSampleRate =
+              math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
+
+            // Keep the interaction with the stricter of the two sample rates
+            if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) {
+              downsampledBlock(userIndex, elem.index) = 1
+            }
+          }
+        }
+
+        keys -> downsampledBlock
+    }
+
+    // Unpin binarized interaction matrix
+    drmI.uncache()
+
+    downSampledDrmI
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/107a0ba9/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala 
b/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala
new file mode 100644
index 0000000..c640e1e
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/cf/examples/Recommendations.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.examples
+
+import scala.io.Source
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import org.apache.mahout.sparkbindings._
+
+import org.apache.mahout.cf.CooccurrenceAnalysis._
+import scala.collection.JavaConversions._
+
+/**
+ * The Epinions dataset contains ratings from users to items and a trust-network between the users.
+ * We use co-occurrence analysis to compute "users who like these items, also like those items" and
+ * "users who trust these users, like those items".
+ *
+ * Download and unpack the dataset files from:
+ *
+ * http://www.trustlet.org/datasets/downloaded_epinions/ratings_data.txt.bz2
+ * http://www.trustlet.org/datasets/downloaded_epinions/trust_data.txt.bz2
+ *
+ * Arguments: <path-to-dataset-folder> [spark-master-url] [output-folder]. The two optional
+ * arguments default to the cluster and HDFS folder that used to be hard-coded here.
+ **/
+object RunCrossCooccurrenceAnalysisOnEpinions {
+
+  /** Spark master used when none is given on the command line */
+  private val defaultMasterUrl = "spark://occam4:7077"
+
+  /** Output folder used when none is given on the command line */
+  private val defaultOutputDir = "hdfs://occam4:54310/user/pat/xrsj/"
+
+  /**
+   * Passes the space-separated tokens of each line of `path` to `handle`. The first line is
+   * skipped, as are lines that contain no separator after trimming. The file is closed
+   * afterwards, also when `handle` throws.
+   */
+  private def foreachRecord(path: String)(handle: Array[String] => Unit): Unit = {
+    val source = Source.fromFile(path)
+    try {
+      source.getLines().drop(1).map(_.trim).filter(_.contains(' ')).foreach { line =>
+        handle(line.split(' '))
+      }
+    } finally {
+      source.close()
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    if (args.length == 0) {
+      println("Usage: RunCrossCooccurrenceAnalysisOnEpinions <path-to-dataset-folder> " +
+        "[spark-master-url] [output-folder]")
+      println("Download the dataset from " +
+        "http://www.trustlet.org/datasets/downloaded_epinions/ratings_data.txt.bz2 and")
+      println("http://www.trustlet.org/datasets/downloaded_epinions/trust_data.txt.bz2")
+      sys.exit(-1)
+    }
+
+    val datasetDir = args(0)
+    val masterUrl = if (args.length > 1) args(1) else defaultMasterUrl
+    // Normalized to end in exactly one slash, so sub-folder names can simply be appended
+    val outputDir = (if (args.length > 2) args(2) else defaultOutputDir).stripSuffix("/") + "/"
+
+    // IDs in the files are 1-based, matrix indices are 0-based
+    val epinionsRatings = new SparseMatrix(49290, 139738)
+    foreachRecord(datasetDir + "/ratings_data.txt") { tokens =>
+      val userID = tokens(0).toInt - 1
+      val itemID = tokens(1).toInt - 1
+      val rating = tokens(2).toDouble
+      epinionsRatings(userID, itemID) = rating
+    }
+
+    val epinionsTrustNetwork = new SparseMatrix(49290, 49290)
+    foreachRecord(datasetDir + "/trust_data.txt") { tokens =>
+      val userID = tokens(0).toInt - 1
+      val trustedUserId = tokens(1).toInt - 1
+      epinionsTrustNetwork(userID, trustedUserId) = 1
+    }
+
+    System.setProperty("spark.kryo.referenceTracking", "false")
+    System.setProperty("spark.kryoserializer.buffer.mb", "100")
+
+    implicit val distributedContext = mahoutSparkContext(masterUrl = masterUrl,
+      appName = "MahoutClusteredContext", customJars = Traversable.empty[String])
+
+    try {
+      val drmEpinionsRatings = drmParallelize(epinionsRatings, numPartitions = 2)
+      val drmEpinionsTrustNetwork = drmParallelize(epinionsTrustNetwork, numPartitions = 2)
+
+      val indicatorMatrices = cooccurrences(drmEpinionsRatings, randomSeed = 0xdeadbeef,
+        maxInterestingItemsPerThing = 100, maxNumInteractions = 500,
+        drmBs = Array(drmEpinionsTrustNetwork))
+
+      RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(0),
+        outputDir + "indicators-item-item/")
+      RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(1),
+        outputDir + "indicators-trust-item/")
+
+      // Report the folder that was actually written to
+      println("Saved indicators to " + outputDir)
+    } finally {
+      // Release the context even if the analysis fails
+      distributedContext.close()
+    }
+  }
+}
+
+
+/**
+ * The movielens1M dataset contains movie ratings, we use co-occurrence analysis to compute
+ * "users who like these movies, also like those movies"
+ *
+ * Download and unpack the dataset files from:
+ * http://files.grouplens.org/datasets/movielens/ml-1m.zip
+ */
+object RunCooccurrenceAnalysisOnMovielens1M {
+
+  def main(args: Array[String]): Unit = {
+
+    if (args.length == 0) {
+      println("Usage: RunCooccurrenceAnalysisOnMovielens1M <path-to-dataset-folder>")
+      println("Download the dataset from http://files.grouplens.org/datasets/movielens/ml-1m.zip")
+      sys.exit(-1)
+    }
+
+    val datasetDir = args(0)
+
+    System.setProperty("spark.kryo.referenceTracking", "false")
+    System.setProperty("spark.kryoserializer.buffer.mb", "100")
+
+    implicit val sc = mahoutSparkContext(masterUrl = "local", appName = "MahoutLocalContext",
+      customJars = Traversable.empty[String])
+
+    try {
+      System.setProperty("mahout.math.AtA.maxInMemNCol", 4000.toString)
+
+      val movielens = new SparseMatrix(6040, 3952)
+
+      // Each line is split on "::" into user, item and rating; IDs are shifted to 0-based indices
+      val ratingsFile = Source.fromFile(datasetDir + "/ratings.dat")
+      try {
+        for (line <- ratingsFile.getLines()) {
+          val tokens = line.split("::")
+          val userID = tokens(0).toInt - 1
+          val itemID = tokens(1).toInt - 1
+          val rating = tokens(2).toDouble
+          movielens(userID, itemID) = rating
+        }
+      } finally {
+        // Do not leak the file handle
+        ratingsFile.close()
+      }
+
+      val drmMovielens = drmParallelize(movielens, numPartitions = 2)
+
+      val indicatorMatrix = cooccurrences(drmMovielens).head
+
+      RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrix,
+        "/tmp/co-occurrence-on-movielens/indicators-item-item/")
+
+      println("Saved indicators to /tmp/co-occurrence-on-movielens/")
+    } finally {
+      // Stop the context even if loading or the analysis fails
+      sc.stop()
+    }
+  }
+}
+
+
+/** Output helper used by the example programs. */
+object RecommendationExamplesHelper {
+
+  /**
+   * Saves the non-zero cells of `indicatorMatrix` as text under `path`, one line per cell in the
+   * form "rowKey<TAB>columnIndex".
+   *
+   * @param indicatorMatrix matrix whose non-zero cells are written
+   * @param path target passed to saveAsTextFile
+   */
+  def saveIndicatorMatrix(indicatorMatrix: DrmLike[Int], path: String) = {
+    indicatorMatrix.rdd.flatMap({ case (thingID, itemVector) =>
+        // The separator has to be a String. With the Char '\t', Int + Char + Int is plain integer
+        // addition and every line would hold the single number thingID + 9 + index.
+        for (elem <- itemVector.nonZeroes()) yield { thingID.toString + "\t" + elem.index }
+      })
+      .saveAsTextFile(path)
+  }
+}

Reply via email to