MAHOUT-1883 closes no PR, adds dataset filtering down to the minimum needed to do cross-occurrence
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/1f5e36f2 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/1f5e36f2 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/1f5e36f2 Branch: refs/heads/master Commit: 1f5e36f249aabc68495ec15f64f5ed6754d9f1e2 Parents: c9ee728 Author: pferrel <[email protected]> Authored: Tue Oct 11 08:10:31 2016 -0700 Committer: pferrel <[email protected]> Committed: Tue Oct 11 08:10:31 2016 -0700 ---------------------------------------------------------------------- buildtools/pom.xml | 2 +- distribution/pom.xml | 2 +- examples/pom.xml | 2 +- flink/pom.xml | 2 +- h2o/pom.xml | 2 +- hdfs/pom.xml | 2 +- integration/pom.xml | 2 +- math-scala/pom.xml | 2 +- math/pom.xml | 2 +- mr/pom.xml | 2 +- pom.xml | 2 +- spark-shell/pom.xml | 2 +- spark/pom.xml | 2 +- .../indexeddataset/IndexedDatasetSpark.scala | 38 +++++++++----------- .../mahout/cf/SimilarityAnalysisSuite.scala | 20 +++++++++++ 15 files changed, 50 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/buildtools/pom.xml ---------------------------------------------------------------------- diff --git a/buildtools/pom.xml b/buildtools/pom.xml index 136d13c..c96b3a5 100644 --- a/buildtools/pom.xml +++ b/buildtools/pom.xml @@ -29,7 +29,7 @@ <groupId>org.apache.mahout</groupId> <artifactId>mahout-buildtools</artifactId> - <version>0.12.3-SNAPSHOT</version> + <version>0.13.0-SNAPSHOT</version> <name>Mahout Build Tools</name> <packaging>jar</packaging> http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/distribution/pom.xml ---------------------------------------------------------------------- diff --git a/distribution/pom.xml b/distribution/pom.xml index 46bfedf..536c76f 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -20,7 +20,7 @@ <parent> 
<groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.12.3-SNAPSHOT</version> + <version>0.13.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <artifactId>apache-mahout-distribution</artifactId> http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index a7838b7..b3bf827 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.12.3-SNAPSHOT</version> + <version>0.13.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/flink/pom.xml ---------------------------------------------------------------------- diff --git a/flink/pom.xml b/flink/pom.xml index 8a6ae55..7857210 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.12.3-SNAPSHOT</version> + <version>0.13.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/h2o/pom.xml ---------------------------------------------------------------------- diff --git a/h2o/pom.xml b/h2o/pom.xml index 1ad7779..f5095bb 100644 --- a/h2o/pom.xml +++ b/h2o/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.12.3-SNAPSHOT</version> + <version>0.13.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/hdfs/pom.xml b/hdfs/pom.xml index 64e1b32..50fe3b7 100644 --- a/hdfs/pom.xml +++ b/hdfs/pom.xml @@ -23,7 +23,7 @@ <parent> 
<groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.12.3-SNAPSHOT</version> + <version>0.13.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/integration/pom.xml ---------------------------------------------------------------------- diff --git a/integration/pom.xml b/integration/pom.xml index ffa1adc..d9945d9 100644 --- a/integration/pom.xml +++ b/integration/pom.xml @@ -24,7 +24,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.12.3-SNAPSHOT</version> + <version>0.13.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/math-scala/pom.xml ---------------------------------------------------------------------- diff --git a/math-scala/pom.xml b/math-scala/pom.xml index f40bdc8..9eb7e80 100644 --- a/math-scala/pom.xml +++ b/math-scala/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.12.3-SNAPSHOT</version> + <version>0.13.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/math/pom.xml ---------------------------------------------------------------------- diff --git a/math/pom.xml b/math/pom.xml index 2f5752d..f0ddff1 100644 --- a/math/pom.xml +++ b/math/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.12.3-SNAPSHOT</version> + <version>0.13.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/mr/pom.xml ---------------------------------------------------------------------- diff --git a/mr/pom.xml b/mr/pom.xml index 00119c9..d79c008 100644 --- a/mr/pom.xml +++ b/mr/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> 
<artifactId>mahout</artifactId> - <version>0.12.3-SNAPSHOT</version> + <version>0.13.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0120a8a..9af14ac 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,7 @@ <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.12.3-SNAPSHOT</version> + <version>0.13.0-SNAPSHOT</version> <packaging>pom</packaging> <name>Apache Mahout</name> http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/spark-shell/pom.xml ---------------------------------------------------------------------- diff --git a/spark-shell/pom.xml b/spark-shell/pom.xml index 878e70d..732c39b 100644 --- a/spark-shell/pom.xml +++ b/spark-shell/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.12.3-SNAPSHOT</version> + <version>0.13.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/spark/pom.xml ---------------------------------------------------------------------- diff --git a/spark/pom.xml b/spark/pom.xml index 94a73b3..5fc9863 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.mahout</groupId> <artifactId>mahout</artifactId> - <version>0.12.3-SNAPSHOT</version> + <version>0.13.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala index e7111a8..0249d9b 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala @@ -30,8 +30,8 @@ import org.apache.spark.SparkContext._ /** * Spark implementation of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] providing the Spark specific * dfsWrite method - * - * @param matrix a [[org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark]] to wrap + * + * @param matrix a [[org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark]] to wrap * @param rowIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs * @param columnIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs */ @@ -54,8 +54,9 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictio new IndexedDatasetSpark(matrix, rowIDs, columnIDs) } - /** Implements the core method to write [[org.apache.mahout.math.indexeddataset.IndexedDataset]]. Override and - * replace the writer to change how it is written. + /** + * Implements the core method to write [[org.apache.mahout.math.indexeddataset.IndexedDataset]]. Override and + * replace the writer to change how it is written. */ override def dfsWrite(dest: String, schema: Schema = DefaultIndexedDatasetWriteSchema) (implicit sc: DistributedContext): @@ -65,16 +66,21 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictio } } -/** This is a companion object used to build an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] - * The most important odditiy is that it takes a BiDictionary of row-ids optionally. If provided no row with another - * id will be added to the dataset. 
This is useful for cooccurrence type calculations where all arrays must have - * the same rows and there is some record of which rows are important. +/** + * This is a companion object used to build an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] + * The most important oddity is that it takes a BiDictionary of row-ids optionally. If provided no row with another + * id will be added to the dataset. This is useful for cooccurrence type calculations where all arrays must have + * the same rows and there is some record of which rows are important. */ object IndexedDatasetSpark { def apply(elements: RDD[(String, String)], existingRowIDs: Option[BiDictionary] = None)(implicit sc: SparkContext) = { + // todo: a further optimization is to return any broadcast dictionaries so they can be passed in and + // do not get broadcast again. At present there may be duplicate broadcasts. - // broadcast the existing dictionary or create a new one, also filter by the existing dictionary or use all elements + // create separate collections of rowID and columnID tokens + // use the dictionary passed in or create one from the element ids + // broadcast the correct row id BiDictionary val (filteredElements, rowIDDictionary_bcast, rowIDDictionary) = if (existingRowIDs.isEmpty) { val newRowIDDictionary = new BiDictionary(elements.map { case (rowID, _) => rowID }.distinct().collect()) val newRowIDDictionary_bcast = sc.broadcast(newRowIDDictionary) @@ -87,20 +93,10 @@ object IndexedDatasetSpark { (elementsRDD, existingRowIDDictionary_bcast, existingRowIDs.get) } - // create separate collections of rowID and columnID tokens - // use the dictionary passed in or create one from the element ids - // val rowIDs = filteredElements.map { case (rowID, _) => rowID }.distinct().collect() + // column ids are always taken from the RDD passed in + // todo: an optimization is to pass in a dictionary of column ids if it is the same as an existing one val columnIDs = 
filteredElements.map { case (_, columnID) => columnID }.distinct().collect() - // create BiDictionary(s) for bi-directional lookup of ID by either Mahout ID or external ID - // broadcast them for access in distributed processes, so they are not recalculated in every task. - //val rowIDDictionary = BiDictionary.append(existingRowIDs, rowIDs) - //val rowIDDictionary = if (existingRowIDs.isEmpty) - // case Some(d) => d - // case None => new BiDictionary(filteredElements.map { case (rowID, _) => rowID }.distinct().collect()) - // } - //val rowIDDictionary_bcast = sc.broadcast(rowIDDictionary) - val columnIDDictionary = new BiDictionary(keys = columnIDs) val columnIDDictionary_bcast = sc.broadcast(columnIDDictionary) http://git-wip-us.apache.org/repos/asf/mahout/blob/1f5e36f2/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala index 945b443..2d74f7d 100644 --- a/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala @@ -301,6 +301,26 @@ class SimilarityAnalysisSuite extends FunSuite with MahoutSuite with Distributed assert(aID.matrix.nrow == 4) assert(bID.matrix.nrow == 4) + assert(aID.rowIDs.contains("u1") && + aID.rowIDs.contains("u2") && + aID.rowIDs.contains("u3") && + aID.rowIDs.contains("u4")) + + assert(aID.columnIDs.contains("a1") && + aID.columnIDs.contains("a2") && + aID.columnIDs.contains("a3") && + aID.columnIDs.contains("a4") && + aID.columnIDs.contains("a5")) + + assert(bID.rowIDs.contains("u1") && + bID.rowIDs.contains("u2") && + bID.rowIDs.contains("u3") && + bID.rowIDs.contains("u4")) + assert(bID.columnIDs.contains("b1") && + bID.columnIDs.contains("b2") && + bID.columnIDs.contains("b3") && + 
bID.columnIDs.contains("b4")) + assert(!bID.rowIDs.contains("u5"))// this row id should be filtered out of the drm and dictionary assert(!bID.columnIDs.contains("b25"))// this column id should be filtered out of the drm and dictionary
