Repository: mahout Updated Branches: refs/heads/master bbb90c21d -> c943c37e0
NOJIRA minor row similarity cleanup and better partitioning for faster cooccurrence Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/c943c37e Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/c943c37e Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/c943c37e Branch: refs/heads/master Commit: c943c37e0cc9695700774031c43d4421ba210c3b Parents: bbb90c2 Author: pferrel <[email protected]> Authored: Sat Jul 18 17:17:04 2015 -0700 Committer: pferrel <[email protected]> Committed: Sat Jul 18 17:17:04 2015 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 ++ .../mahout/drivers/MahoutOptionParser.scala | 4 +-- .../mahout/math/cf/SimilarityAnalysis.scala | 9 +++++++ .../mahout/drivers/ItemSimilarityDriver.scala | 2 +- .../mahout/drivers/RowSimilarityDriver.scala | 12 ++++----- .../drivers/TextDelimitedReaderWriter.scala | 26 +++++++++++--------- .../drivers/RowSimilarityDriverSuite.scala | 11 +++++---- 7 files changed, 41 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/c943c37e/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index c097121..997fc5b 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,8 @@ Mahout Change Log Release 0.11.0 - unreleased + NOJIRA: Clean up CLI help for spark-rowsimilarity and fixed test that intermittently failed (pferrel) + MAHOUT-1685: Move Mahout shell to Spark 1.3+ (dlyubimov, apalumbo) MAHOUT-1653: Spark 1.3 (pferrel, apalumbo) http://git-wip-us.apache.org/repos/asf/mahout/blob/c943c37e/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala 
b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala index 3b5affd..d3723a2 100644 --- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala +++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala @@ -151,9 +151,9 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A } - def parseIndexedDatasetFormatOptions() = { + def parseIndexedDatasetFormatOptions(notice: String = "\nOutput text file schema options:") = { opts = opts ++ MahoutOptionParser.TextDelimitedIndexedDatasetOptions - note("\nOutput text file schema options:") + note(notice) opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) => options + ("rowKeyDelim" -> x) } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"") http://git-wip-us.apache.org/repos/asf/mahout/blob/c943c37e/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala index fd91c16..4632468 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala @@ -59,6 +59,9 @@ object SimilarityAnalysis extends Serializable { implicit val distributedContext = drmARaw.context + // backend allowed to optimize partitioning + drmARaw.par(auto = true) + // Apply selective downsampling, pin resulting matrix val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions) @@ -79,6 +82,9 @@ object SimilarityAnalysis extends Serializable { // Now look at cross cooccurrences for (drmBRaw <- drmBs) { + // backend allowed to optimize partitioning + drmBRaw.par(auto = true) + // Down-sample and pin other interaction matrix val drmB = 
sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint() @@ -153,6 +159,9 @@ object SimilarityAnalysis extends Serializable { implicit val distributedContext = drmARaw.context + // backend allowed to optimize partitioning + drmARaw.par(auto = true) + // Apply selective downsampling, pin resulting matrix val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions) http://git-wip-us.apache.org/repos/asf/mahout/blob/c943c37e/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala index d7f2787..f87624b 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala @@ -59,7 +59,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver { override def main(args: Array[String]): Unit = { parser = new MahoutSparkOptionParser(programName = "spark-itemsimilarity") { - head("spark-itemsimilarity", "Mahout 0.10.0") + head("spark-itemsimilarity", "Mahout") //Input output options, non-driver specific parseIOOptions(numInputs = 2) http://git-wip-us.apache.org/repos/asf/mahout/blob/c943c37e/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala index cfa8f99..817c6ff 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala @@ -54,7 +54,7 @@ object RowSimilarityDriver extends MahoutSparkDriver { override def main(args: Array[String]): Unit = { 
parser = new MahoutSparkOptionParser(programName = "spark-rowsimilarity") { - head("spark-rowsimilarity", "Mahout 0.10.0") + head("spark-rowsimilarity", "Mahout") //Input output options, non-driver specific parseIOOptions() @@ -67,14 +67,14 @@ object RowSimilarityDriver extends MahoutSparkDriver { options + ("maxObservations" -> x) } text ("Max number of observations to consider per row (optional). Default: " + RowSimilarityOptions("maxObservations")) validate { x => - if (x > 0) success else failure("Option --maxObservations must be > 0") + if (x > 0) success else failure("Option --maxObservations must be > 0") } opt[Int]('m', "maxSimilaritiesPerRow") action { (x, options) => options + ("maxSimilaritiesPerRow" -> x) } text ("Limit the number of similarities per item to this number (optional). Default: " + RowSimilarityOptions("maxSimilaritiesPerRow")) validate { x => - if (x > 0) success else failure("Option --maxSimilaritiesPerRow must be > 0") + if (x > 0) success else failure("Option --maxSimilaritiesPerRow must be > 0") } // --threshold not implemented in SimilarityAnalysis.rowSimilarity @@ -85,7 +85,7 @@ object RowSimilarityDriver extends MahoutSparkDriver { note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.") //Drm output schema--not driver specific, drm specific - parseIndexedDatasetFormatOptions() + parseIndexedDatasetFormatOptions("\nInput and Output text file schema options (same for both):") //How to search for input parseFileDiscoveryOptions() @@ -126,7 +126,7 @@ object RowSimilarityDriver extends MahoutSparkDriver { null.asInstanceOf[IndexedDataset] } else { - val datasetA = indexedDatasetDFSRead(inFiles, readWriteSchema) + val datasetA = indexedDatasetDFSRead(src = inFiles, schema = readWriteSchema) datasetA } } @@ -141,7 +141,7 @@ object RowSimilarityDriver extends MahoutSparkDriver { parser.opts("maxSimilaritiesPerRow").asInstanceOf[Int], parser.opts("maxObservations").asInstanceOf[Int]) - 
rowSimilarityIDS.dfsWrite(parser.opts("output").asInstanceOf[String], readWriteSchema) + rowSimilarityIDS.dfsWrite(dest = parser.opts("output").asInstanceOf[String], schema = readWriteSchema) stop() } http://git-wip-us.apache.org/repos/asf/mahout/blob/c943c37e/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala index e2a2a9a..b5f76e0 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala @@ -125,6 +125,8 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ * Read in text delimited rows from all URIs in this comma delimited source String and return * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is * no strength value in the element, assume it's presence means a strength of 1. + * Note: if the input file has a strength delimiter but none is seen in rows, we assume there is none + * and give the strength as 1 in the input DRM. * @param mc context for the Spark job * @param readSchema describes the delimiters and positions of values in the text delimited file. 
* @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]] @@ -140,10 +142,10 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ val rowKeyDelim = readSchema("rowKeyDelim").asInstanceOf[String] val columnIdStrengthDelim = readSchema("columnIdStrengthDelim").asInstanceOf[String] val elementDelim = readSchema("elementDelim").asInstanceOf[String] + val omitScore = readSchema("omitScore").asInstanceOf[Boolean] require (!source.isEmpty, "No file(s) to read") - - var rows = mc.textFile(source).map { line => line.split(rowKeyDelim) } + val rows = mc.textFile(source).map { line => line.split(rowKeyDelim) } // get row and column IDs val interactions = rows.map { row => @@ -151,8 +153,6 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ } interactions.cache() - // forces into memory so only for debugging - //interactions.collect() // create separate collections of rowID and columnID tokens val rowIDs = interactions.map { case (rowID, _) => rowID }.distinct().collect() @@ -160,7 +160,10 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ // the columns are in a TD string so separate them and get unique ones val columnIDs = interactions.flatMap { case (_, columns) => columns val elements = columns.split(elementDelim) - val colIDs = elements.map( elem => elem.split(columnIdStrengthDelim)(0) ) + val colIDs = if (!omitScore) + elements.map( elem => elem.split(columnIdStrengthDelim)(0) ) + else + elements colIDs }.distinct().collect() @@ -186,13 +189,14 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ val elements = columns.split(elementDelim) val row = new RandomAccessSparseVector(ncol) for (element <- elements) { - val id = element.split(columnIdStrengthDelim)(0) + val id = if (omitScore) element else element.split(columnIdStrengthDelim)(0) val columnID = columnIDDictionary_bcast.value.getOrElse(id, -1) - val pair = element.split(columnIdStrengthDelim) - if (pair.size == 
2) // there was a strength - row.setQuick(columnID, pair(1).toDouble) - else // no strength so set DRM value to 1.0d, this ignores 'omitScore', which is a write param - row.setQuick(columnID, 1.0d) + val strength = if (omitScore) 1.0d else {// if the input says not to omit but there is no separator treat + // as omitting and return a strength of 1 + if (element.split(columnIdStrengthDelim).size == 1) 1.0d + else element.split(columnIdStrengthDelim)(1).toDouble + } + row.setQuick(columnID, strength) } rowIndex -> row } http://git-wip-us.apache.org/repos/asf/mahout/blob/c943c37e/spark/src/test/scala/org/apache/mahout/drivers/RowSimilarityDriverSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/drivers/RowSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/RowSimilarityDriverSuite.scala index 562e8c6..eccddb1 100644 --- a/spark/src/test/scala/org/apache/mahout/drivers/RowSimilarityDriverSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/drivers/RowSimilarityDriverSuite.scala @@ -74,11 +74,12 @@ class RowSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { "--master", masterUrl)) val simLines = mahoutCtx.textFile(outPath).collect - for (rowNum <- 0 to 4){ - simLines(rowNum).split("[\t ]") should contain theSameElementsAs firstFiveSimDocsTokens - } - for (rowNum <- 5 to 9){ - simLines(rowNum).split("[\t ]") should contain theSameElementsAs lastFiveSimDocsTokens + simLines.foreach { line => + val lineTokens = line.split("[\t ]") + if (lineTokens.contains("doc1") ) // docs are two flavors so if only 4 similarities it will effectively classify + lineTokens should contain theSameElementsAs firstFiveSimDocsTokens + else + lineTokens should contain theSameElementsAs lastFiveSimDocsTokens } }
