Repository: mahout
Updated Branches:
  refs/heads/master 20fdf9b9f -> 1f5e36f24
added tests for new CCO data filtering to the minimum subset needed

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/220c4749
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/220c4749
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/220c4749

Branch: refs/heads/master
Commit: 220c47493d30c0928e116af9210ab1786068ab13
Parents: b5fe4aa
Author: pferrel <p...@occamsmachete.com>
Authored: Sat Oct 1 14:02:50 2016 -0700
Committer: pferrel <p...@occamsmachete.com>
Committed: Sat Oct 1 14:02:50 2016 -0700

----------------------------------------------------------------------
 .../indexeddataset/IndexedDatasetSpark.scala    | 45 +++++++++++-----
 .../mahout/cf/SimilarityAnalysisSuite.scala     | 55 ++++++++++++++++++++
 2 files changed, 87 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mahout/blob/220c4749/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
index 727a95e..e7111a8 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
@@ -30,7 +30,8 @@ import org.apache.spark.SparkContext._
 /**
  * Spark implementation of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] providing the Spark specific
  * dfsWrite method
- * @param matrix a [[org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark]] to wrap
+ *
+ * @param matrix a [[org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark]] to wrap
  * @param rowIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs
  * @param columnIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs
  */
@@ -53,9 +54,8 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictio
     new IndexedDatasetSpark(matrix, rowIDs, columnIDs)
   }

-  /**
-   * Implements the core method to write [[org.apache.mahout.math.indexeddataset.IndexedDataset]]. Override and
-   * replace the writer to change how it is written.
+  /** Implements the core method to write [[org.apache.mahout.math.indexeddataset.IndexedDataset]]. Override and
+    * replace the writer to change how it is written.
    */
   override def dfsWrite(dest: String, schema: Schema = DefaultIndexedDatasetWriteSchema)
     (implicit sc: DistributedContext):
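The doc comment in the hunk above invites overriding dfsWrite to swap in a different writer. A minimal sketch of what such an override could look like, assuming the return type elided by the hunk is Unit; CsvWriter is hypothetical, the constructor arguments and method signature come from the diff, and the import paths are assumptions based on the packages referenced elsewhere in this commit:

  import org.apache.mahout.math.drm.{CheckpointedDrm, DistributedContext}
  import org.apache.mahout.math.indexeddataset.{BiDictionary, DefaultIndexedDatasetWriteSchema, Schema}
  import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark

  // Hypothetical writer: any class that can serialize the DRM plus both dictionaries would do.
  class CsvWriter(schema: Schema) {
    def writeTo(dataset: IndexedDatasetSpark, dest: String): Unit =
      println(s"would write ${dataset.matrix.nrow} x ${dataset.matrix.ncol} dataset to $dest")
  }

  // Subclass that keeps the parent's signature but routes writes through CsvWriter.
  class CsvIndexedDatasetSpark(m: CheckpointedDrm[Int], rIDs: BiDictionary, cIDs: BiDictionary)
    extends IndexedDatasetSpark(m, rIDs, cIDs) {

    override def dfsWrite(dest: String, schema: Schema = DefaultIndexedDatasetWriteSchema)
                         (implicit sc: DistributedContext): Unit =
      new CsvWriter(schema).writeTo(this, dest)
  }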
@@ -65,31 +65,50 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictio
   }
 }

+/** This is a companion object used to build an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]].
+  * The most important oddity is that it optionally takes a BiDictionary of row IDs. If one is provided, no row with
+  * any other ID will be added to the dataset. This is useful for cooccurrence-type calculations, where all matrices
+  * must have the same rows and there is some record of which rows are important.
+  */
 object IndexedDatasetSpark {

   def apply(elements: RDD[(String, String)], existingRowIDs: Option[BiDictionary] = None)(implicit sc: SparkContext) = {
+    // broadcast the existing dictionary or create a new one; also filter by the existing dictionary or use all elements
+    val (filteredElements, rowIDDictionary_bcast, rowIDDictionary) = if (existingRowIDs.isEmpty) {
+      val newRowIDDictionary = new BiDictionary(elements.map { case (rowID, _) => rowID }.distinct().collect())
+      val newRowIDDictionary_bcast = sc.broadcast(newRowIDDictionary)
+      (elements, newRowIDDictionary_bcast, newRowIDDictionary)
+    } else {
+      val existingRowIDDictionary_bcast = sc.broadcast(existingRowIDs.get)
+      val elementsRDD = elements.filter { case (rowID, _) =>
+        existingRowIDDictionary_bcast.value.contains(rowID)
+      }
+      (elementsRDD, existingRowIDDictionary_bcast, existingRowIDs.get)
+    }
+
     // create separate collections of rowID and columnID tokens
-    val rowIDs = elements.map { case (rowID, _) => rowID }.distinct().collect()
-    val columnIDs = elements.map { case (_, columnID) => columnID }.distinct().collect()
+    // use the dictionary passed in or create one from the element ids
+    // val rowIDs = filteredElements.map { case (rowID, _) => rowID }.distinct().collect()
+    val columnIDs = filteredElements.map { case (_, columnID) => columnID }.distinct().collect()

     // create BiDictionary(s) for bi-directional lookup of ID by either Mahout ID or external ID
     // broadcast them for access in distributed processes, so they are not recalculated in every task.
     //val rowIDDictionary = BiDictionary.append(existingRowIDs, rowIDs)
-    val rowIDDictionary = existingRowIDs match {
-      case Some(d) => d.merge(rowIDs)
-      case None => new BiDictionary(rowIDs)
-    }
-    val rowIDDictionary_bcast = sc.broadcast(rowIDDictionary)
+    //val rowIDDictionary = if (existingRowIDs.isEmpty)
+    //  case Some(d) => d
+    //  case None => new BiDictionary(filteredElements.map { case (rowID, _) => rowID }.distinct().collect())
+    // }
+    //val rowIDDictionary_bcast = sc.broadcast(rowIDDictionary)

     val columnIDDictionary = new BiDictionary(keys = columnIDs)
     val columnIDDictionary_bcast = sc.broadcast(columnIDDictionary)

     val ncol = columnIDDictionary.size
-    val nrow = rowIDDictionary.size
+    //val nrow = rowIDDictionary.size

     val indexedInteractions =
-      elements.map { case (rowID, columnID) =>
+      filteredElements.map { case (rowID, columnID) =>
         val rowIndex = rowIDDictionary_bcast.value.getOrElse(rowID, -1)
         val columnIndex = columnIDDictionary_bcast.value.getOrElse(columnID, -1)
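The filtering added above is what lets cross-occurrence inputs be trimmed to a shared row set before the A'A / A'B style calculations. A minimal usage sketch of the new optional argument, mirroring the test added below; the pair data here is illustrative only:

  import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
  import org.apache.spark.SparkContext

  def buildConformingDatasets(sc: SparkContext): Unit = {
    // Primary action pairs; their (user, item) rows define the row dictionary.
    val pairRDDA = sc.parallelize(Seq(("u1", "a1"), ("u2", "a3"), ("u3", "a5")))
    // Secondary action pairs; "u5" is a user the primary action never saw.
    val pairRDDB = sc.parallelize(Seq(("u1", "b2"), ("u3", "b2"), ("u5", "b1")))

    val aID = IndexedDatasetSpark(pairRDDA)(sc)
    // Passing aID.rowIDs filters B down to rows A knows about: "u5" is dropped,
    // along with any column id that only ever occurred in a dropped row.
    val bID = IndexedDatasetSpark(pairRDDB, Some(aID.rowIDs))(sc)

    assert(aID.matrix.nrow == bID.matrix.nrow) // the two DRMs now share one row space
  }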
http://git-wip-us.apache.org/repos/asf/mahout/blob/220c4749/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
index 63e0df7..945b443 100644
--- a/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala
@@ -21,9 +21,13 @@ import org.apache.mahout.math.cf.{DownsamplableCrossOccurrenceDataset, Similarit
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.indexeddataset.BiDictionary
 import org.apache.mahout.math.scalabindings.{MatrixOps, _}
+import org.apache.mahout.sparkbindings.SparkDistributedContext
+import org.apache.mahout.sparkbindings.dc2sc
 import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
 import org.apache.mahout.test.MahoutSuite
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
 import org.scalatest.FunSuite

 /* values
@@ -251,6 +255,57 @@ class SimilarityAnalysisSuite extends FunSuite with MahoutSuite with Distributed
     n should be < 1E-10
   }

+  test("Cross-occurrence two IndexedDatasets different row ranks"){
+
+    val sc = dc2sc(mahoutCtx)
+
+    /*
+    val a = dense(
+      "u1"(1, 1, 0, 0, 0),
+      "u2"(0, 0, 1, 1, 0),
+      "u3"(0, 0, 0, 0, 1),
+      "u4"(1, 0, 0, 1, 0))
+
+    val b = dense(
+      "u1"(0, 1, 1, 0),
+      "u2"(1, 1, 1, 0),
+      "u3"(0, 0, 1, 0),
+      "u4"(1, 1, 0, 1),
+      "u5"(1, 1, 1, 1))
+    */
+    val pairsA = Seq(
+      ("u1","a1"), ("u1","a2"),
+      ("u2","a3"), ("u2","a4"),
+      ("u3","a5"),
+      ("u4","a1"), ("u4","a4"))
+
+    val pairsB = Seq(
+      ("u1","b2"), ("u1","b3"),
+      ("u2","b1"), ("u2","b2"), ("u2","b3"),
+      ("u3","b2"),
+      ("u4","b1"), ("u4","b2"), ("u4","b4"),
+      ("u5","b1"), ("u5","b25"))
+
+    val pairRDDA = sc.parallelize(pairsA, 4)
+    val pairRDDB = sc.parallelize(pairsB, 4)
+
+    val aID = IndexedDatasetSpark(pairRDDA)(sc)
+    val bID = IndexedDatasetSpark(pairRDDB, Some(aID.rowIDs))(sc)
+
+    assert(aID.rowIDs.size == 4)
+    assert(bID.rowIDs.size == 4)
+
+    assert(aID.matrix.nrow == 4)
+    assert(bID.matrix.nrow == 4)
+
+    assert(!bID.rowIDs.contains("u5")) // this row id should be filtered out of the drm and dictionary
+    assert(!bID.columnIDs.contains("b25")) // this column id occurs only in a filtered row, so it is also dropped
+  }
+
   test("Cross-occurrence two IndexedDatasets LLR threshold"){
     val a = dense(
       (1, 1, 0, 0, 0),