Repository: mahout
Updated Branches:
  refs/heads/master 20fdf9b9f -> 1f5e36f24


added tests for new CCO data filtering to the minimum subset needed


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/220c4749
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/220c4749
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/220c4749

Branch: refs/heads/master
Commit: 220c47493d30c0928e116af9210ab1786068ab13
Parents: b5fe4aa
Author: pferrel <p...@occamsmachete.com>
Authored: Sat Oct 1 14:02:50 2016 -0700
Committer: pferrel <p...@occamsmachete.com>
Committed: Sat Oct 1 14:02:50 2016 -0700

----------------------------------------------------------------------
 .../indexeddataset/IndexedDatasetSpark.scala    | 45 +++++++++++-----
 .../mahout/cf/SimilarityAnalysisSuite.scala     | 55 ++++++++++++++++++++
 2 files changed, 87 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
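In practice the change means secondary (cross-occurrence) datasets are built against the primary dataset's row dictionary, roughly like the following sketch (illustration only; pairRDDA and pairRDDB are RDD[(String, String)] of (user, item) pairs as in the new test, and sc is a SparkContext):

    val primary   = IndexedDatasetSpark(pairRDDA)(sc)                       // defines the row dictionary
    val secondary = IndexedDatasetSpark(pairRDDB, Some(primary.rowIDs))(sc) // rows unknown to primary are dropped
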


http://git-wip-us.apache.org/repos/asf/mahout/blob/220c4749/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
index 727a95e..e7111a8 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
@@ -30,7 +30,8 @@ import org.apache.spark.SparkContext._
 /**
  * Spark implementation of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] providing the Spark specific
  * dfsWrite method
- * @param matrix a [[org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark]] to wrap
+  *
+  * @param matrix a [[org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark]] to wrap
  * @param rowIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs
  * @param columnIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs
  */
@@ -53,9 +54,8 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictio
     new IndexedDatasetSpark(matrix, rowIDs, columnIDs)
   }
 
-  /**
-   * Implements the core method to write [[org.apache.mahout.math.indexeddataset.IndexedDataset]]. Override and
-   * replace the writer to change how it is written.
+  /** Implements the core method to write [[org.apache.mahout.math.indexeddataset.IndexedDataset]]. Override and
+   *  replace the writer to change how it is written.
    */
   override def dfsWrite(dest: String, schema: Schema = DefaultIndexedDatasetWriteSchema)
       (implicit sc: DistributedContext):
@@ -65,31 +65,50 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictio
   }
 }
 
+/** Companion object used to build an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]].
+ *  The most important oddity is that it optionally takes a BiDictionary of row IDs. If one is provided, no row
+ *  with an ID outside that dictionary will be added to the dataset. This is useful for cooccurrence-type
+ *  calculations, where all matrices must share the same rows and there is some record of which rows matter.
+ */
 object IndexedDatasetSpark {
   
   def apply(elements: RDD[(String, String)], existingRowIDs: Option[BiDictionary] = None)(implicit sc: SparkContext) = {
 
+    // broadcast the existing dictionary or create a new one, also filter by the existing dictionary or use all elements
+    val (filteredElements, rowIDDictionary_bcast, rowIDDictionary) = if (existingRowIDs.isEmpty) {
+      val newRowIDDictionary = new BiDictionary(elements.map { case (rowID, _) => rowID }.distinct().collect())
+      val newRowIDDictionary_bcast = sc.broadcast(newRowIDDictionary)
+      (elements, newRowIDDictionary_bcast, newRowIDDictionary)
+    } else {
+      val existingRowIDDictionary_bcast = sc.broadcast(existingRowIDs.get)
+      val elementsRDD = elements.filter{ case (rowID, _) =>
+        existingRowIDDictionary_bcast.value.contains(rowID)
+      }
+      (elementsRDD, existingRowIDDictionary_bcast, existingRowIDs.get)
+    }
+
     // create separate collections of rowID and columnID tokens
-    val rowIDs = elements.map { case (rowID, _) => rowID }.distinct().collect()
-    val columnIDs = elements.map { case (_, columnID) => columnID }.distinct().collect()
+    // use the dictionary passed in or create one from the element ids
+    // val rowIDs = filteredElements.map { case (rowID, _) => rowID }.distinct().collect()
+    val columnIDs = filteredElements.map { case (_, columnID) => columnID }.distinct().collect()
 
     // create BiDictionary(s) for bi-directional lookup of ID by either Mahout ID or external ID
     // broadcast them for access in distributed processes, so they are not recalculated in every task.
     //val rowIDDictionary = BiDictionary.append(existingRowIDs, rowIDs)
-    val rowIDDictionary = existingRowIDs match {
-      case Some(d) => d.merge(rowIDs)
-      case None =>  new BiDictionary(rowIDs)
-    }
-    val rowIDDictionary_bcast = sc.broadcast(rowIDDictionary)
+    //val rowIDDictionary = if (existingRowIDs.isEmpty)
+    //  case Some(d) => d
+    //  case None =>  new BiDictionary(filteredElements.map { case (rowID, _) => rowID }.distinct().collect())
+    // }
+    //val rowIDDictionary_bcast = sc.broadcast(rowIDDictionary)
 
     val columnIDDictionary = new BiDictionary(keys = columnIDs)
     val columnIDDictionary_bcast = sc.broadcast(columnIDDictionary)
 
     val ncol = columnIDDictionary.size
-    val nrow = rowIDDictionary.size
+    //val nrow = rowIDDictionary.size
 
     val indexedInteractions =
-      elements.map { case (rowID, columnID) =>
+      filteredElements.map { case (rowID, columnID) =>
         val rowIndex = rowIDDictionary_bcast.value.getOrElse(rowID, -1)
         val columnIndex = columnIDDictionary_bcast.value.getOrElse(columnID, -1)
 

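For context, the new apply above boils down to a filter-by-broadcast-dictionary pattern: when an existing row dictionary is passed in, any element whose row ID is not already in that dictionary is dropped before the DRM is assembled. A minimal standalone sketch of that pattern follows; it is an illustration only, with a plain Set[String] standing in for Mahout's BiDictionary and made-up names and sample data:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: Set[String] stands in for BiDictionary; allowedRows and the sample pairs are hypothetical.
    val sc = new SparkContext(new SparkConf().setAppName("cco-filter-sketch").setMaster("local[2]"))

    val elements = sc.parallelize(Seq(("u1", "b2"), ("u4", "b1"), ("u5", "b25")))
    val allowedRows = Set("u1", "u2", "u3", "u4")       // row IDs already present in the primary dataset

    val allowedRows_bcast = sc.broadcast(allowedRows)   // broadcast once, read in every task
    val filtered = elements.filter { case (rowID, _) => allowedRows_bcast.value.contains(rowID) }

    // ("u5", "b25") is dropped, so no new row (and no dangling column) enters the dataset
    filtered.collect().foreach(println)
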
http://git-wip-us.apache.org/repos/asf/mahout/blob/220c4749/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
index 63e0df7..945b443 100644
--- a/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
@@ -21,9 +21,13 @@ import org.apache.mahout.math.cf.{DownsamplableCrossOccurrenceDataset, Similarit
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.indexeddataset.BiDictionary
 import org.apache.mahout.math.scalabindings.{MatrixOps, _}
+import org.apache.mahout.sparkbindings.SparkDistributedContext
+import org.apache.mahout.sparkbindings.dc2sc
 import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
 import org.apache.mahout.test.MahoutSuite
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
 import org.scalatest.FunSuite
 
 /* values 
@@ -251,6 +255,57 @@ class SimilarityAnalysisSuite extends FunSuite with MahoutSuite with Distributed
     n should be < 1E-10
   }
 
+  test("Cross-occurrence two IndexedDatasets different row ranks"){
+
+    val sc = dc2sc(mahoutCtx)
+
+
+    /*
+    val a = dense(
+      "u1"(1, 1, 0, 0, 0),
+      "u2"(0, 0, 1, 1, 0),
+      "u3"(0, 0, 0, 0, 1),
+      "u4"(1, 0, 0, 1, 0))
+
+    val b = dense(
+      "u1"(0, 1, 1, 0),
+      "u2"(1, 1, 1, 0),
+      "u3"(0, 0, 1, 0),
+      "u4"(1, 1, 0, 1)
+      "u5"(1, 1, 1, 1))
+*/
+    val pairsA = Seq(
+      ("u1","a1"), ("u1","a2"),
+      ("u2","a3"), ("u2","a4"),
+      ("u3","a5"),
+      ("u4","a1"), ("u4","a4"))
+
+    val pairsB = Seq(
+      ("u1","b2"), ("u1","b3"),
+      ("u2","b1"), ("u2","b2"), ("u2","b3"),
+      ("u3","b2"),
+      ("u4","b1"), ("u4","b2"), ("u4","b4"),
+      ("u5","b1"), ("u5", "b25"))
+
+
+
+    val pairRDDA = sc.parallelize(pairsA, 4)
+    val pairRDDB = sc.parallelize(pairsB, 4)
+
+    val aID = IndexedDatasetSpark(pairRDDA)(sc)
+    val bID = IndexedDatasetSpark(pairRDDB, Some(aID.rowIDs))(sc)
+
+    assert(aID.rowIDs.size == 4)
+    assert(bID.rowIDs.size == 4)
+
+    assert(aID.matrix.nrow == 4)
+    assert(bID.matrix.nrow == 4)
+
+    assert(!bID.rowIDs.contains("u5")) // this row id should be filtered out of the drm and dictionary
+    assert(!bID.columnIDs.contains("b25")) // this column id should be filtered out of the drm and dictionary
+
+  }
+
   test("Cross-occurrence two IndexedDatasets LLR threshold"){
     val a = dense(
       (1, 1, 0, 0, 0),
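
Working through the new test's data by hand: aID is built first, so its row dictionary holds u1 through u4, and when bID is built against Some(aID.rowIDs) the (u5, b1) and (u5, b25) pairs are dropped before b25 ever reaches the column dictionary. Under the assumption that columns are assigned in the order b1, b2, b3, b4 (the real index order depends on BiDictionary assignment and is not guaranteed), the matrix behind bID would conceptually look like this sketch in the suite's dense() notation:

    // Illustration only: row/column index order depends on BiDictionary assignment.
    val bFiltered = dense(
      (0, 1, 1, 0),  // u1 -> b2, b3
      (1, 1, 1, 0),  // u2 -> b1, b2, b3
      (0, 1, 0, 0),  // u3 -> b2
      (1, 1, 0, 1))  // u4 -> b1, b2, b4  (u5 and b25 filtered out)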
