Repository: mahout Updated Branches: refs/heads/master 24cb5576f -> 9bfb76732
MAHOUT-1541, MAHOUT-1568, added option to ItemSimilarityDriver to allow output that is directly search engine indexable, also some default schemas for input and output of TDF tuples and DRMs Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/9bfb7673 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/9bfb7673 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/9bfb7673 Branch: refs/heads/master Commit: 9bfb767323833586873272af4db446f68f357f1f Parents: 24cb557 Author: pferrel <[email protected]> Authored: Mon Jul 7 08:42:49 2014 -0700 Committer: pferrel <[email protected]> Committed: Mon Jul 7 08:42:49 2014 -0700 ---------------------------------------------------------------------- .../mahout/drivers/ItemSimilarityDriver.scala | 9 ++- .../org/apache/mahout/drivers/Schema.scala | 56 ++++++++++++++++++ .../drivers/TextDelimitedReaderWriter.scala | 7 ++- .../drivers/ItemSimilarityDriverSuite.scala | 62 +++++++++++++++++--- 4 files changed, 125 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/9bfb7673/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala index f78c590..71d36c9 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala @@ -136,7 +136,7 @@ object ItemSimilarityDriver extends MahoutDriver { note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used") //File finding strategy--not driver specific - note("\nFile input options:") + 
note("\nFile discovery options:") opt[Unit]('r', "recursive") action { (_, options) => options.copy(recursive = true) } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false") @@ -159,6 +159,11 @@ object ItemSimilarityDriver extends MahoutDriver { options.copy(tupleDelim = x) } text ("Separates vector tuple values in the values list (optional). Default: \",\"") + opt[Unit]("omitStrength") abbr ("os") action { (_, options) => + options.copy(omitStrength = true) + } text ("Do not write the strength to the output files (optional), Default: false.") + note("This option is used to output indexable data for creating a search engine recommender.") + //Spark config options--not driver specific note("\nSpark config options:") opt[String]("sparkExecutorMem") abbr ("sem") action { (x, options) => @@ -229,6 +234,7 @@ object ItemSimilarityDriver extends MahoutDriver { writeSchema = new Schema( "rowKeyDelim" -> options.rowKeyDelim, "columnIdStrengthDelim" -> options.columnIdStrengthDelim, + "omitScore" -> options.omitStrength, "tupleDelim" -> options.tupleDelim) writer = new TextDelimitedIndexedDatasetWriter(writeSchema) @@ -315,6 +321,7 @@ object ItemSimilarityDriver extends MahoutDriver { rowKeyDelim: String = "\t", columnIdStrengthDelim: String = ":", tupleDelim: String = ",", + omitStrength: Boolean = false, dontAddMahoutJars: Boolean = false) } http://git-wip-us.apache.org/repos/asf/mahout/blob/9bfb7673/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala index 46e1540..7735b83 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala @@ -27,4 +27,60 @@ import scala.collection.mutable.HashMap class Schema(params: Tuple2[String, Any]*) 
extends HashMap[String, Any] { // note: this require a mutable HashMap, do we care? this ++= params + if (!this.contains("omitScore")) this += ("omitScore" -> false) } + +// These can be used to keep the text in and out fairly standard to Mahout, where an application specific format is not +// required. + +/** Simple default Schema for typical text delimited tuple file input + * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID<comma, tab, or space>etc...) + */ +class DefaultTupleReadSchema extends Schema( + "delim" -> "[,\t ]", //comma, tab or space + "filter" -> "", + "rowIDPosition" -> 0, + "columnIDPosition" -> 1, + "filterPosition" -> -1) + +/** Simple default Schema for typical text delimited drm file output + * This tells the writer to write a DRM of the default + * (rowID<tab>columnID1:score1,columnID2:score2,...) + */ +class DefaultDRMWriteSchema extends Schema( + "rowKeyDelim" -> "\t", + "columnIdStrengthDelim" -> ":", + "tupleDelim" -> ",") + +/** Simple default Schema for typical text delimited drm file output + * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID<comma, tab, or space>etc...) + */ +class DefaultDRMReadSchema extends Schema( + "rowKeyDelim" -> "\t", + "columnIdStrengthDelim" -> ":", + "tupleDelim" -> ",") + +/** Simple default Schema for reading a text delimited drm file where the score of any tuple is ignored, + * all non-zeros are replaced with 1. + * This tells the reader to input DRM lines of the form + * (rowID<tab>columnID1:score1,columnID2:score2,...) remember the score is ignored. Alternatively the format can be + * (rowID<tab>columnID1,columnID2,...) where presence indicates a score of 1. 
This is the default output format for + * [[org.apache.mahout.drivers.DRMWriteBooleanSchema]] + */ +class DRMReadBooleanSchema extends Schema( + "rowKeyDelim" -> "\t", + "columnIdStrengthDelim" -> ":", + "tupleDelim" -> ",", + "omitScore" -> true) + +/** Simple default Schema for typical text delimited drm file write where the score of a tuple is omitted. + * The presence of a tuple means the score = 1, the absence means a score of 0. + * This tells the reader to input DRM lines of the form + * (rowID<tab>columnID1,columnID2,...) + */ +class DRMWriteBooleanSchema extends Schema( + "rowKeyDelim" -> "\t", + "columnIdStrengthDelim" -> ":", + "tupleDelim" -> ",", + "omitScore" -> true) + http://git-wip-us.apache.org/repos/asf/mahout/blob/9bfb7673/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala index 119f8d3..ae78d59 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala @@ -136,10 +136,13 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{ val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String] val columnIdStrengthDelim = writeSchema("columnIdStrengthDelim").asInstanceOf[String] val tupleDelim = writeSchema("tupleDelim").asInstanceOf[String] + val omitScore = writeSchema("omitScore").asInstanceOf[Boolean] //instance vars must be put into locally scoped vals when put into closures that are //executed but Spark + assert (indexedDataset != null, {println(this.getClass.toString+": has no indexedDataset to write"); throw new IllegalArgumentException }) assert (!dest.isEmpty, {println(this.getClass.toString+": has no destination or indextedDataset to write"); 
throw new IllegalArgumentException}) + val matrix = indexedDataset.matrix val rowIDDictionary = indexedDataset.rowIDs val columnIDDictionary = indexedDataset.columnIDs @@ -152,7 +155,9 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{ // for the rest of the row, construct the vector contents of tuples (external column ID, strength value) for (item <- itemVector.nonZeroes()) { - line += columnIDDictionary.inverse.get(item.index) + columnIdStrengthDelim + item.get + tupleDelim + line += columnIDDictionary.inverse.get(item.index) + if (!omitScore) line += columnIdStrengthDelim + item.get + line += tupleDelim } // drop the last delimiter, not needed to end the line line.dropRight(1) http://git-wip-us.apache.org/repos/asf/mahout/blob/9bfb7673/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala index 2827317..2db830c 100644 --- a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala @@ -261,13 +261,13 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with MahoutLoc val OutPath = TmpDir + "indicator-matrices" val lines = Array( - "0,0,1", - "0,1,1", - "1,2,1", - "1,3,1", - "2,4,1", - "3,0,1", - "3,3,1") + "0,0,1", + "0,1,1", + "1,2,1", + "1,3,1", + "2,4,1", + "3,0,1", + "3,3,1") val Answer = Set( "0\t1:1.7260924347106847", @@ -299,6 +299,54 @@ class ItemSimilarityDriverSuite extends FunSuite with MahoutSuite with MahoutLoc } + test ("ItemSimilarityDriver write search engine output"){ + + val InDir = TmpDir + "in-dir/" + val InFilename = "in-file.tsv" + val InPath = InDir + InFilename + + val OutPath = TmpDir + "indicator-matrices" + + val lines = Array( + "0,0,1", + 
"0,1,1", + "1,2,1", + "1,3,1", + "2,4,1", + "3,0,1", + "3,3,1") + + val Answer = Set( + "0\t1", + "3\t2", + "1\t0", + "4", + "2\t3") + + // this creates one part-0000 file in the directory + mahoutCtx.parallelize(lines).coalesce(1, shuffle=true).saveAsTextFile(InDir) + + // to change from using part files to a single .tsv file we'll need to use HDFS + val fs = FileSystem.get(new Configuration()) + //rename part-00000 to something.tsv + fs.rename(new Path(InDir + "part-00000"), new Path(InPath)) + + afterEach // clean up before running the driver, it should handle the Spark conf and context + + // local multi-threaded Spark with default HDFS + ItemSimilarityDriver.main(Array( + "--input", InPath, + "--output", OutPath, + "--master", masterUrl, + "--dontAddMahoutJars", + "--omitStrength")) + + beforeEach // restart the test context to read the output of the driver + val indicatorLines = mahoutCtx.textFile(OutPath+"/indicator-matrix/").collect.toSet[String] + assert(indicatorLines == Answer) + + } + test("ItemSimilarityDriver recursive file discovery using filename patterns"){ //directory structure using the following // tmp/data/m1.tsv
