Repository: mahout
Updated Branches:
  refs/heads/master bbb90c21d -> c943c37e0


NOJIRA minor row similarity cleanup and better partitioning for faster 
cooccurrence


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/c943c37e
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/c943c37e
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/c943c37e

Branch: refs/heads/master
Commit: c943c37e0cc9695700774031c43d4421ba210c3b
Parents: bbb90c2
Author: pferrel <[email protected]>
Authored: Sat Jul 18 17:17:04 2015 -0700
Committer: pferrel <[email protected]>
Committed: Sat Jul 18 17:17:04 2015 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 ++
 .../mahout/drivers/MahoutOptionParser.scala     |  4 +--
 .../mahout/math/cf/SimilarityAnalysis.scala     |  9 +++++++
 .../mahout/drivers/ItemSimilarityDriver.scala   |  2 +-
 .../mahout/drivers/RowSimilarityDriver.scala    | 12 ++++-----
 .../drivers/TextDelimitedReaderWriter.scala     | 26 +++++++++++---------
 .../drivers/RowSimilarityDriverSuite.scala      | 11 +++++----
 7 files changed, 41 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/c943c37e/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index c097121..997fc5b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 0.11.0 - unreleased
 
+  NOJIRA: Clean up CLI help for spark-rowsimilarity and fixed test that 
intermitently failed (pferrel)
+
   MAHOUT-1685: Move Mahout shell to Spark 1.3+ (dlyubimov, apalumbo)
 
   MAHOUT-1653: Spark 1.3 (pferrel, apalumbo)

http://git-wip-us.apache.org/repos/asf/mahout/blob/c943c37e/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala 
b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
index 3b5affd..d3723a2 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -151,9 +151,9 @@ class MahoutOptionParser(programName: String) extends 
OptionParser[Map[String, A
 
   }
 
-  def parseIndexedDatasetFormatOptions() = {
+  def parseIndexedDatasetFormatOptions(notice: String = "\nOutput text file 
schema options:") = {
     opts = opts ++ MahoutOptionParser.TextDelimitedIndexedDatasetOptions
-    note("\nOutput text file schema options:")
+    note(notice)
     opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) =>
       options + ("rowKeyDelim" -> x)
     } text ("Separates the rowID key from the vector values list (optional). 
Default: \"\\t\"")

http://git-wip-us.apache.org/repos/asf/mahout/blob/c943c37e/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
index fd91c16..4632468 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
@@ -59,6 +59,9 @@ object SimilarityAnalysis extends Serializable {
 
     implicit val distributedContext = drmARaw.context
 
+    // backend allowed to optimize partitioning
+    drmARaw.par(auto = true)
+
     // Apply selective downsampling, pin resulting matrix
     val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
 
@@ -79,6 +82,9 @@ object SimilarityAnalysis extends Serializable {
 
     // Now look at cross cooccurrences
     for (drmBRaw <- drmBs) {
+      // backend allowed to optimize partitioning
+      drmBRaw.par(auto = true)
+
       // Down-sample and pin other interaction matrix
       val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, 
maxNumInteractions).checkpoint()
 
@@ -153,6 +159,9 @@ object SimilarityAnalysis extends Serializable {
 
     implicit val distributedContext = drmARaw.context
 
+    // backend allowed to optimize partitioning
+    drmARaw.par(auto = true)
+
     // Apply selective downsampling, pin resulting matrix
     val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/c943c37e/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index d7f2787..f87624b 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -59,7 +59,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver {
   override def main(args: Array[String]): Unit = {
 
     parser = new MahoutSparkOptionParser(programName = "spark-itemsimilarity") 
{
-      head("spark-itemsimilarity", "Mahout 0.10.0")
+      head("spark-itemsimilarity", "Mahout")
 
       //Input output options, non-driver specific
       parseIOOptions(numInputs = 2)

http://git-wip-us.apache.org/repos/asf/mahout/blob/c943c37e/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
index cfa8f99..817c6ff 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -54,7 +54,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
   override def main(args: Array[String]): Unit = {
 
     parser = new MahoutSparkOptionParser(programName = "spark-rowsimilarity") {
-      head("spark-rowsimilarity", "Mahout 0.10.0")
+      head("spark-rowsimilarity", "Mahout")
 
       //Input output options, non-driver specific
       parseIOOptions()
@@ -67,14 +67,14 @@ object RowSimilarityDriver extends MahoutSparkDriver {
         options + ("maxObservations" -> x)
       } text ("Max number of observations to consider per row (optional). 
Default: " +
         RowSimilarityOptions("maxObservations")) validate { x =>
-          if (x > 0) success else failure("Option --maxObservations must be > 
0")
+        if (x > 0) success else failure("Option --maxObservations must be > 0")
       }
 
       opt[Int]('m', "maxSimilaritiesPerRow") action { (x, options) =>
         options + ("maxSimilaritiesPerRow" -> x)
       } text ("Limit the number of similarities per item to this number 
(optional). Default: " +
         RowSimilarityOptions("maxSimilaritiesPerRow")) validate { x =>
-          if (x > 0) success else failure("Option --maxSimilaritiesPerRow must 
be > 0")
+        if (x > 0) success else failure("Option --maxSimilaritiesPerRow must 
be > 0")
       }
 
       // --threshold not implemented in SimilarityAnalysis.rowSimilarity
@@ -85,7 +85,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
       note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a 
similarity measure.")
 
       //Drm output schema--not driver specific, drm specific
-      parseIndexedDatasetFormatOptions()
+      parseIndexedDatasetFormatOptions("\nInput and Output text file schema 
options (same for both):")
 
       //How to search for input
       parseFileDiscoveryOptions()
@@ -126,7 +126,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
       null.asInstanceOf[IndexedDataset]
     } else {
 
-      val datasetA = indexedDatasetDFSRead(inFiles, readWriteSchema)
+      val datasetA = indexedDatasetDFSRead(src = inFiles, schema = 
readWriteSchema)
       datasetA
     }
   }
@@ -141,7 +141,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
       parser.opts("maxSimilaritiesPerRow").asInstanceOf[Int],
       parser.opts("maxObservations").asInstanceOf[Int])
 
-    rowSimilarityIDS.dfsWrite(parser.opts("output").asInstanceOf[String], 
readWriteSchema)
+    rowSimilarityIDS.dfsWrite(dest = 
parser.opts("output").asInstanceOf[String], schema = readWriteSchema)
 
     stop()
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/c943c37e/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
 
b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index e2a2a9a..b5f76e0 100644
--- 
a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ 
b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -125,6 +125,8 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
    * Read in text delimited rows from all URIs in this comma delimited source 
String and return
    * the DRM of all elements updating the dictionaries for row and column 
dictionaries. If there is
    * no strength value in the element, assume it's presence means a strength 
of 1.
+   * Note: if the input file has a strength delimiter but none is seen in 
rows, we assume there is none
+   *   and give the strength as 1 in the input DRM.
    * @param mc context for the Spark job
    * @param readSchema describes the delimiters and positions of values in the 
text delimited file.
    * @param source comma delimited URIs of text files to be read into the 
[[IndexedDatasetSpark]]
@@ -140,10 +142,10 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
       val rowKeyDelim = readSchema("rowKeyDelim").asInstanceOf[String]
       val columnIdStrengthDelim = 
readSchema("columnIdStrengthDelim").asInstanceOf[String]
       val elementDelim = readSchema("elementDelim").asInstanceOf[String]
+      val omitScore = readSchema("omitScore").asInstanceOf[Boolean]
 
       require (!source.isEmpty, "No file(s) to read")
-
-      var rows = mc.textFile(source).map { line => line.split(rowKeyDelim) }
+      val rows = mc.textFile(source).map { line => line.split(rowKeyDelim) }
 
       // get row and column IDs
       val interactions = rows.map { row =>
@@ -151,8 +153,6 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
       }
 
       interactions.cache()
-      // forces into memory so only for debugging
-      //interactions.collect()
 
       // create separate collections of rowID and columnID tokens
       val rowIDs = interactions.map { case (rowID, _) => rowID 
}.distinct().collect()
@@ -160,7 +160,10 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
       // the columns are in a TD string so separate them and get unique ones
       val columnIDs = interactions.flatMap { case (_, columns) => columns
         val elements = columns.split(elementDelim)
-        val colIDs = elements.map( elem => 
elem.split(columnIdStrengthDelim)(0) )
+        val colIDs = if (!omitScore)
+          elements.map( elem => elem.split(columnIdStrengthDelim)(0) )
+        else
+          elements
         colIDs
       }.distinct().collect()
 
@@ -186,13 +189,14 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
           val elements = columns.split(elementDelim)
           val row = new RandomAccessSparseVector(ncol)
           for (element <- elements) {
-            val id = element.split(columnIdStrengthDelim)(0)
+            val id = if (omitScore) element else 
element.split(columnIdStrengthDelim)(0)
             val columnID = columnIDDictionary_bcast.value.getOrElse(id, -1)
-            val pair = element.split(columnIdStrengthDelim)
-            if (pair.size == 2) // there was a strength
-              row.setQuick(columnID, pair(1).toDouble)
-            else // no strength so set DRM value to 1.0d, this ignores 
'omitScore', which is a write param
-              row.setQuick(columnID, 1.0d)
+            val strength = if (omitScore) 1.0d else {// if the input says not 
to omit but there is no seperator treat
+              // as omitting and return a strength of 1
+              if (element.split(columnIdStrengthDelim).size == 1) 1.0d
+              else element.split(columnIdStrengthDelim)(1).toDouble
+            }
+            row.setQuick(columnID, strength)
           }
           rowIndex -> row
         }

http://git-wip-us.apache.org/repos/asf/mahout/blob/c943c37e/spark/src/test/scala/org/apache/mahout/drivers/RowSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/test/scala/org/apache/mahout/drivers/RowSimilarityDriverSuite.scala 
b/spark/src/test/scala/org/apache/mahout/drivers/RowSimilarityDriverSuite.scala
index 562e8c6..eccddb1 100644
--- 
a/spark/src/test/scala/org/apache/mahout/drivers/RowSimilarityDriverSuite.scala
+++ 
b/spark/src/test/scala/org/apache/mahout/drivers/RowSimilarityDriverSuite.scala
@@ -74,11 +74,12 @@ class RowSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite  {
       "--master", masterUrl))
 
     val simLines = mahoutCtx.textFile(outPath).collect
-    for (rowNum <- 0 to 4){
-      simLines(rowNum).split("[\t ]") should contain theSameElementsAs 
firstFiveSimDocsTokens
-    }
-    for (rowNum <- 5 to 9){
-      simLines(rowNum).split("[\t ]") should contain theSameElementsAs 
lastFiveSimDocsTokens
+    simLines.foreach { line =>
+      val lineTokens = line.split("[\t ]")
+      if (lineTokens.contains("doc1") ) // docs are two flavors so if only 4 
similarities it will effectively classify
+        lineTokens should contain theSameElementsAs firstFiveSimDocsTokens
+      else
+        lineTokens should contain theSameElementsAs lastFiveSimDocsTokens
     }
 
   }

Reply via email to