Repository: mahout
Updated Branches:
  refs/heads/master 24cb5576f -> 9bfb76732


MAHOUT-1541, MAHOUT-1568, added option to ItemSimilarityDriver to allow output 
that is directly search engine indexable, also some default schemas for input 
and output of TDF tuples and DRMs


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/9bfb7673
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/9bfb7673
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/9bfb7673

Branch: refs/heads/master
Commit: 9bfb767323833586873272af4db446f68f357f1f
Parents: 24cb557
Author: pferrel <[email protected]>
Authored: Mon Jul 7 08:42:49 2014 -0700
Committer: pferrel <[email protected]>
Committed: Mon Jul 7 08:42:49 2014 -0700

----------------------------------------------------------------------
 .../mahout/drivers/ItemSimilarityDriver.scala   |  9 ++-
 .../org/apache/mahout/drivers/Schema.scala      | 56 ++++++++++++++++++
 .../drivers/TextDelimitedReaderWriter.scala     |  7 ++-
 .../drivers/ItemSimilarityDriverSuite.scala     | 62 +++++++++++++++++---
 4 files changed, 125 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/9bfb7673/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
index f78c590..71d36c9 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -136,7 +136,7 @@ object ItemSimilarityDriver extends MahoutDriver {
       note("\nUsing all defaults the input is expected of the form: 
\"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows 
will be used")
 
       //File finding strategy--not driver specific
-      note("\nFile input options:")
+      note("\nFile discovery options:")
       opt[Unit]('r', "recursive") action { (_, options) =>
         options.copy(recursive = true)
       } text ("Searched the -i path recursively for files that match 
--filenamePattern (optional), Default: false")
@@ -159,6 +159,11 @@ object ItemSimilarityDriver extends MahoutDriver {
         options.copy(tupleDelim = x)
       } text ("Separates vector tuple values in the values list (optional). 
Default: \",\"")
 
+      opt[Unit]("omitStrength") abbr ("os") action { (_, options) =>
+        options.copy(omitStrength = true)
+      } text ("Do not write the strength to the output files (optional), 
Default: false.")
+      note("This option is used to output indexable data for creating a search 
engine recommender.")
+
       //Spark config options--not driver specific
       note("\nSpark config options:")
       opt[String]("sparkExecutorMem") abbr ("sem") action { (x, options) =>
@@ -229,6 +234,7 @@ object ItemSimilarityDriver extends MahoutDriver {
     writeSchema = new Schema(
         "rowKeyDelim" -> options.rowKeyDelim,
         "columnIdStrengthDelim" -> options.columnIdStrengthDelim,
+        "omitScore" -> options.omitStrength,
         "tupleDelim" -> options.tupleDelim)
 
     writer = new TextDelimitedIndexedDatasetWriter(writeSchema)
@@ -315,6 +321,7 @@ object ItemSimilarityDriver extends MahoutDriver {
       rowKeyDelim: String = "\t",
       columnIdStrengthDelim: String = ":",
       tupleDelim: String = ",",
+      omitStrength: Boolean = false,
       dontAddMahoutJars: Boolean = false)
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/9bfb7673/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
index 46e1540..7735b83 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
@@ -27,4 +27,60 @@ import scala.collection.mutable.HashMap
 class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
   // note: this require a mutable HashMap, do we care?
   this ++= params
+  if (!this.contains("omitScore")) this += ("omitScore" -> false)
 }
+
+// These can be used to keep the text in and out fairly standard to Mahout, 
where an application specific format is not
+// required.
+
+/** Simple default Schema for typical text delimited tuple file input
+  * This tells the reader to input tuples of the default (rowID<comma, tab, or 
space>columnID<comma, tab, or space>etc...)
+  */
+class DefaultTupleReadSchema extends Schema(
+    "delim" -> "[,\t ]", //comma, tab or space
+    "filter" -> "",
+    "rowIDPosition" -> 0,
+    "columnIDPosition" -> 1,
+    "filterPosition" -> -1)
+
+/** Simple default Schema for typical text delimited drm file output
+  * This tells the writer to write a DRM of the default
+  * (rowID<tab>columnID1:score1,columnID2:score2,...)
+  */
+class DefaultDRMWriteSchema extends Schema(
+    "rowKeyDelim" -> "\t",
+    "columnIdStrengthDelim" -> ":",
+    "tupleDelim" -> ",")
+
+/** Simple default Schema for typical text delimited drm file output
+  * This tells the reader to input tuples of the default (rowID<comma, tab, or 
space>columnID<comma, tab, or space>etc...)
+  */
+class DefaultDRMReadSchema extends Schema(
+  "rowKeyDelim" -> "\t",
+  "columnIdStrengthDelim" -> ":",
+  "tupleDelim" -> ",")
+
+/** Simple default Schema for reading a text delimited drm file  where the 
score of any tuple is ignored,
+  * all non-zeros are replaced with 1.
+  * This tells the reader to input DRM lines of the form
+  * (rowID<tab>columnID1:score1,columnID2:score2,...) remember the score is 
ignored. Alternatively the format can be
+  * (rowID<tab>columnID1,columnID2,...) where presence indicates a score of 1. 
This is the default output format for
+  * [[org.apache.mahout.drivers.DRMWriteBooleanSchema]]
+  */
+class DRMReadBooleanSchema extends Schema(
+  "rowKeyDelim" -> "\t",
+  "columnIdStrengthDelim" -> ":",
+  "tupleDelim" -> ",",
+  "omitScore" -> true)
+
+/** Simple default Schema for typical text delimited drm file write where the 
score of a tuple is omitted.
+  * The presence of a tuple means the score = 1, the absence means a score of 
0.
+  * This tells the reader to input DRM lines of the form
+  * (rowID<tab>columnID1,columnID2,...)
+  */
+class DRMWriteBooleanSchema extends Schema(
+  "rowKeyDelim" -> "\t",
+  "columnIdStrengthDelim" -> ":",
+  "tupleDelim" -> ",",
+  "omitScore" -> true)
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/9bfb7673/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
 
b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index 119f8d3..ae78d59 100644
--- 
a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ 
b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -136,10 +136,13 @@ trait TDIndexedDatasetWriter extends 
Writer[IndexedDataset]{
       val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String]
       val columnIdStrengthDelim = 
writeSchema("columnIdStrengthDelim").asInstanceOf[String]
       val tupleDelim = writeSchema("tupleDelim").asInstanceOf[String]
+      val omitScore = writeSchema("omitScore").asInstanceOf[Boolean]
       //instance vars must be put into locally scoped vals when put into 
closures that are
       //executed but Spark
+
       assert (indexedDataset != null, {println(this.getClass.toString+": has 
no indexedDataset to write"); throw new IllegalArgumentException })
       assert (!dest.isEmpty, {println(this.getClass.toString+": has no 
destination or indextedDataset to write"); throw new IllegalArgumentException})
+
       val matrix = indexedDataset.matrix
       val rowIDDictionary = indexedDataset.rowIDs
       val columnIDDictionary = indexedDataset.columnIDs
@@ -152,7 +155,9 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
 
         // for the rest of the row, construct the vector contents of tuples 
(external column ID, strength value)
         for (item <- itemVector.nonZeroes()) {
-          line += columnIDDictionary.inverse.get(item.index) + 
columnIdStrengthDelim + item.get + tupleDelim
+          line += columnIDDictionary.inverse.get(item.index)
+          if (!omitScore) line += columnIdStrengthDelim + item.get
+          line += tupleDelim
         }
         // drop the last delimiter, not needed to end the line
         line.dropRight(1)

http://git-wip-us.apache.org/repos/asf/mahout/blob/9bfb7673/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
 
b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
index 2827317..2db830c 100644
--- 
a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
+++ 
b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
@@ -261,13 +261,13 @@ class ItemSimilarityDriverSuite extends FunSuite with 
MahoutSuite with MahoutLoc
     val OutPath = TmpDir + "indicator-matrices"
 
     val lines = Array(
-        "0,0,1",
-        "0,1,1",
-        "1,2,1",
-        "1,3,1",
-        "2,4,1",
-        "3,0,1",
-        "3,3,1")
+      "0,0,1",
+      "0,1,1",
+      "1,2,1",
+      "1,3,1",
+      "2,4,1",
+      "3,0,1",
+      "3,3,1")
 
     val Answer = Set(
       "0\t1:1.7260924347106847",
@@ -299,6 +299,54 @@ class ItemSimilarityDriverSuite extends FunSuite with 
MahoutSuite with MahoutLoc
 
   }
 
+  test ("ItemSimilarityDriver write search engine output"){
+
+    val InDir = TmpDir + "in-dir/"
+    val InFilename = "in-file.tsv"
+    val InPath = InDir + InFilename
+
+    val OutPath = TmpDir + "indicator-matrices"
+
+    val lines = Array(
+      "0,0,1",
+      "0,1,1",
+      "1,2,1",
+      "1,3,1",
+      "2,4,1",
+      "3,0,1",
+      "3,3,1")
+
+    val Answer = Set(
+      "0\t1",
+      "3\t2",
+      "1\t0",
+      "4",
+      "2\t3")
+
+    // this creates one part-0000 file in the directory
+    mahoutCtx.parallelize(lines).coalesce(1, 
shuffle=true).saveAsTextFile(InDir)
+
+    // to change from using part files to a single .tsv file we'll need to use 
HDFS
+    val fs = FileSystem.get(new Configuration())
+    //rename part-00000 to something.tsv
+    fs.rename(new Path(InDir + "part-00000"), new Path(InPath))
+
+    afterEach // clean up before running the driver, it should handle the 
Spark conf and context
+
+    // local multi-threaded Spark with default HDFS
+    ItemSimilarityDriver.main(Array(
+      "--input", InPath,
+      "--output", OutPath,
+      "--master", masterUrl,
+      "--dontAddMahoutJars",
+      "--omitStrength"))
+
+    beforeEach // restart the test context to read the output of the driver
+    val indicatorLines = 
mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toSet[String]
+    assert(indicatorLines == Answer)
+
+  }
+
   test("ItemSimilarityDriver recursive file discovery using filename 
patterns"){
     //directory structure using the following
     // tmp/data/m1.tsv

Reply via email to