removed o.a.m.Pair, cleaned up comments and style issues, simplified the driver API, merged most of the 1.2.1 changes but stayed on Spark v1.1.1 because of a bug in v1.2.1
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/15ee1951
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/15ee1951
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/15ee1951

Branch: refs/heads/master
Commit: 15ee1951d1ba8d8ee7d24e2510af187fd984c8cf
Parents: 43bea68
Author: pferrel <[email protected]>
Authored: Mon Mar 2 13:38:05 2015 -0800
Committer: pferrel <[email protected]>
Committed: Mon Mar 2 13:38:05 2015 -0800

----------------------------------------------------------------------
 examples/bin/get-all-examples.sh                |   4 +-
 math-scala/pom.xml                              |  25 +++-
 .../apache/mahout/drivers/MahoutDriver.scala    |  11 +-
 .../mahout/drivers/MahoutOptionParser.scala     |  78 ++++++-----
 .../mahout/math/cf/SimilarityAnalysis.scala     | 104 +++++++--------
 .../org/apache/mahout/math/drm/package.scala    |   4 +-
 .../math/indexeddataset/IndexedDataset.scala    |  23 ++--
 .../math/indexeddataset/ReaderWriter.scala      |  71 ++++++++--
 .../mahout/math/indexeddataset/Schema.scala     |  82 ++++++------
 spark/pom.xml                                   |   6 +-
 spark/src/main/assembly/dependency-reduced.xml  |  44 +++++++
 spark/src/main/assembly/job.xml                 |  61 ---------
 .../apache/mahout/common/HDFSPathSearch.scala   |  17 ++-
 .../mahout/drivers/ItemSimilarityDriver.scala   |  78 ++++++-----
 .../mahout/drivers/MahoutSparkDriver.scala      | 106 ++++++++-------
 .../drivers/MahoutSparkOptionParser.scala       |  16 ++-
 .../mahout/drivers/RowSimilarityDriver.scala    |  43 +++---
 .../apache/mahout/drivers/TestNBDriver.scala    |  58 ++++----
 .../drivers/TextDelimitedReaderWriter.scala     | 132 ++++++++++---------
 .../apache/mahout/drivers/TrainNBDriver.scala   |  16 +--
 .../mahout/sparkbindings/SparkEngine.scala      |  10 +-
 .../indexeddataset/IndexedDatasetSpark.scala    |  17 ++-
 .../apache/mahout/sparkbindings/package.scala   |   6 +-
 .../drivers/ItemSimilarityDriverSuite.scala     | 117 ++++++++--------
 .../test/DistributedSparkSuite.scala            |   4 +
 25 files changed, 619 insertions(+), 514 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/examples/bin/get-all-examples.sh
----------------------------------------------------------------------
diff --git a/examples/bin/get-all-examples.sh b/examples/bin/get-all-examples.sh
index a24c7fd..4128e47 100755
--- a/examples/bin/get-all-examples.sh
+++ b/examples/bin/get-all-examples.sh
@@ -26,8 +26,8 @@ echo " Solr-recommender example: "
 echo " 1) imports text 'log files' of some delimited form for user preferences"
 echo " 2) creates the correct Mahout files and stores distionaries to translate external Id to and from Mahout Ids"
 echo " 3) it implements a prototype two actions 'cross-recommender', which takes two actions made by the same user and creates recommendations"
-echo " 4) it creates output for user->preference history CSV and and item->similar items 'indicator' matrix for use in a Solr-recommender."
-echo " To use Solr you would index the indicator matrix CSV, and use user preference history from the history CSV as a query, the result"
+echo " 4) it creates output for user->preference history CSV and and item->similar items 'similarity' matrix for use in a Solr-recommender."
+echo " To use Solr you would index the similarity matrix CSV, and use user preference history from the history CSV as a query, the result"
 echo " from Solr will be an ordered list of recommendations returning the same item Ids as were input."
echo " For further description see the README.md here https://github.com/pferrel/solr-recommender" echo " To build run 'cd solr-recommender; mvn install'" http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/pom.xml ---------------------------------------------------------------------- diff --git a/math-scala/pom.xml b/math-scala/pom.xml index 66309d6..50cea7a 100644 --- a/math-scala/pom.xml +++ b/math-scala/pom.xml @@ -175,17 +175,36 @@ <dependency> <groupId>com.github.scopt</groupId> - <artifactId>scopt_2.10</artifactId> - <version>3.2.0</version> + <artifactId>scopt_${scala.major}</artifactId> + <version>3.3.0</version> </dependency> <!-- scala stuff --> <dependency> <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + <version>${scala.version}</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + <version>${scala.version}</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> - + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-actors</artifactId> + <version>${scala.version}</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scalap</artifactId> + <version>${scala.version}</version> + </dependency> <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.major}</artifactId> http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala index 3d9d4e1..32515f1 100644 --- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala +++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala @@ -19,23 +19,24 @@ package org.apache.mahout.drivers import org.apache.mahout.math.drm.DistributedContext -/** Extended by a platform specific version of this class to create a Mahout CLI driver. - */ +/** Extended by a platform specific version of this class to create a Mahout CLI driver. 
*/ abstract class MahoutDriver { - implicit protected var mc: DistributedContext = _ implicit protected var parser: MahoutOptionParser = _ var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite + /** must be overriden to setup the DistributedContext mc*/ + protected def start() : Unit + /** Override (optionally) for special cleanup */ - protected def stop: Unit = { + protected def stop(): Unit = { if (!_useExistingContext) mc.close } /** This is where you do the work, call start first, then before exiting call stop */ - protected def process: Unit + protected def process(): Unit /** Parse command line and call process */ def main(args: Array[String]): Unit http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala index 479a8d0..3b5affd 100644 --- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala +++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala @@ -20,16 +20,17 @@ import scopt.OptionParser import scala.collection.immutable -/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to - * keep both standarized. - * @param programName Name displayed in help message, the name by which the driver is invoked. - * @note options are engine neutral by convention. See the engine specific extending class for - * to add Spark or other engine options. - * */ +/** + * Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to + * keep both standarized. + * @param programName Name displayed in help message, the name by which the driver is invoked. + * @note options are engine neutral by convention. See the engine specific extending class for + * to add Spark or other engine options. + */ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) { // build options from some stardard CLI param groups - // Note: always put the driver specific options at the last so they can override and previous options! + // Note: always put the driver specific options at the last so they can override any previous options! var opts = Map.empty[String, Any] override def showUsageOnError = true @@ -39,12 +40,14 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A note("Input, output options") opt[String]('i', "input") required() action { (x, options) => options + ("input" -> x) - } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)") + } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs" + + " (required)") if (numInputs == 2) { opt[String]("input2") abbr ("i2") action { (x, options) => options + ("input2" -> x) - } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.") + } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" " + + "(optional). 
Default: empty.") } opt[String]('o', "output") required() action { (x, options) => @@ -53,11 +56,11 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A } else { options + ("output" -> (x + "/")) } - } text ("Path for output, any local or HDFS supported URI (required)") + } text ("Path for output directory, any HDFS supported URI (required)") } - def parseGenericOptions = { + def parseGenericOptions() = { opts = opts ++ MahoutOptionParser.GenericOptions opt[Int]("randomSeed") abbr ("rs") action { (x, options) => options + ("randomSeed" -> x) @@ -65,48 +68,55 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A if (x > 0) success else failure("Option --randomSeed must be > 0") } - //output both input DRMs + //output both input IndexedDatasets opt[Unit]("writeAllDatasets") hidden() action { (_, options) => options + ("writeAllDatasets" -> true) }//Hidden option, though a user might want this. } - def parseElementInputSchemaOptions{ + def parseElementInputSchemaOptions() = { //Input text file schema--not driver specific but input data specific, elements input, - // not drms + // not rows of IndexedDatasets opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions note("\nInput text file schema options:") - opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[ ,\\t]\"") action { (x, options) => - options + ("inDelim" -> x) + opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[ ,\\t]\"") action { + (x, options) => + options + ("inDelim" -> x) } opt[String]("filter1") abbr ("f1") action { (x, options) => options + ("filter1" -> x) - } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used") + } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). " + + "Default: no filter, all data is used") opt[String]("filter2") abbr ("f2") action { (x, options) => options + ("filter2" -> x) - } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected") + } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). " + + "If not present no secondary dataset is collected") opt[Int]("rowIDColumn") abbr ("rc") action { (x, options) => options + ("rowIDColumn" -> x) - } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x => - if (x >= 0) success else failure("Option --rowIDColNum must be >= 0") + } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { + x => + if (x >= 0) success else failure("Option --rowIDColNum must be >= 0") } opt[Int]("itemIDColumn") abbr ("ic") action { (x, options) => options + ("itemIDColumn" -> x) - } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x => - if (x >= 0) success else failure("Option --itemIDColNum must be >= 0") + } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { + x => + if (x >= 0) success else failure("Option --itemIDColNum must be >= 0") } opt[Int]("filterColumn") abbr ("fc") action { (x, options) => options + ("filterColumn" -> x) - } text ("Column number (0 based Int) containing the filter string (optional). 
Default: -1 for no filter") validate { x => + } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no " + + "filter") validate { x => if (x >= -1) success else failure("Option --filterColNum must be >= -1") } - note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used") + note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or" + + " \"userID<tab>itemID<tab>any-text...\" and all rows will be used") //check for column consistency checkConfig { options: Map[String, Any] => @@ -126,7 +136,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A } - def parseFileDiscoveryOptions = { + def parseFileDiscoveryOptions() = { //File finding strategy--not driver specific opts = opts ++ MahoutOptionParser.FileDiscoveryOptions note("\nFile discovery options:") @@ -136,12 +146,13 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A opt[String]("filenamePattern") abbr ("fp") action { (x, options) => options + ("filenamePattern" -> x) - } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory") + } text ("Regex to match in determining input files (optional). Default: filename in the --input option " + + "or \"^part-.*\" if --input is a directory") } - def parseDrmFormatOptions = { - opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions + def parseIndexedDatasetFormatOptions() = { + opts = opts ++ MahoutOptionParser.TextDelimitedIndexedDatasetOptions note("\nOutput text file schema options:") opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) => options + ("rowKeyDelim" -> x) @@ -160,13 +171,16 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A } text ("Do not write the strength to the output files (optional), Default: false.") note("This option is used to output indexable data for creating a search engine recommender.") - note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"") + note("\nDefault delimiters will produce output of the form: " + + "\"itemID1<tab>itemID2:value2<space>itemID10:value10...\"") } } -/** Companion object defines default option groups for reference in any driver that needs them. - * @note not all options are platform neutral so other platforms can add default options here if desired */ +/** + * Companion object defines default option groups for reference in any driver that needs them. 
+ * @note not all options are platform neutral so other platforms can add default options here if desired + */ object MahoutOptionParser { // set up the various default option groups @@ -196,7 +210,7 @@ object MahoutOptionParser { "filter2" -> null.asInstanceOf[String], "inDelim" -> "[,\t ]") - final val TextDelimitedDRMOptions = immutable.HashMap[String, Any]( + final val TextDelimitedIndexedDatasetOptions = immutable.HashMap[String, Any]( "rowKeyDelim" -> "\t", "columnIdStrengthDelim" -> ":", "elementDelim" -> " ", http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala index e1766e8..6557ab0 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala @@ -32,7 +32,7 @@ import scala.util.Random /** - * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation", + * Based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation", * available at http://www.mapr.com/practical-machine-learning * * see also "Sebastian Schelter, Christoph Boden, Volker Markl: @@ -44,14 +44,16 @@ object SimilarityAnalysis extends Serializable { /** Compares (Int,Double) pairs by the second value */ private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2} - /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... - * and returns a list of indicator and cross-indicator matrices - * @param drmARaw Primary interaction matrix - * @param randomSeed when kept to a constant will make repeatable downsampling - * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50 - * @param maxNumInteractions max number of interactions after downsampling, default: 500 - * @return - */ + /** + * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... 
+ * and returns a list of similarity and cross-similarity matrices + * @param drmARaw Primary interaction matrix + * @param randomSeed when kept to a constant will make repeatable downsampling + * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50 + * @param maxNumInteractions max number of interactions after downsampling, default: 500 + * @return a list of [[org.apache.mahout.math.drm.DrmLike]] containing downsampled DRMs for cooccurrence and + * cross-cooccurrence + */ def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50, maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = { @@ -69,11 +71,11 @@ object SimilarityAnalysis extends Serializable { // Compute co-occurrence matrix A'A val drmAtA = drmA.t %*% drmA - // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix - val drmIndicatorsAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA, - bcastInteractionsPerItemA, crossCooccurrence = false) + // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix + val drmSimilarityAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing, + bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false) - var indicatorMatrices = List(drmIndicatorsAtA) + var similarityMatrices = List(drmSimilarityAtA) // Now look at cross-co-occurrences for (drmBRaw <- drmBs) { @@ -86,10 +88,10 @@ object SimilarityAnalysis extends Serializable { // Compute cross-co-occurrence matrix A'B val drmAtB = drmA.t %*% drmB - val drmIndicatorsAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing, + val drmSimilarityAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA, bcastInteractionsPerThingB) - indicatorMatrices = indicatorMatrices :+ drmIndicatorsAtB + similarityMatrices = similarityMatrices :+ drmSimilarityAtB drmB.uncache() } @@ -97,19 +99,21 @@ object SimilarityAnalysis extends Serializable { // Unpin downsampled interaction matrix drmA.uncache() - // Return list of indicator matrices - indicatorMatrices + // Return list of similarity matrices + similarityMatrices } - /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... - * and returns a list of indicator and cross-indicator matrices - * Somewhat easier to use method, which handles the ID dictionaries correctly - * @param indexedDatasets first in array is primary/A matrix all others are treated as secondary - * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed - * @param maxInterestingItemsPerThing max similarities per items - * @param maxNumInteractions max number of input items per item - * @return - */ + /** + * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... and returns + * a list of similarity and cross-similarity matrices. 
Somewhat easier to use method, which handles the ID + * dictionaries correctly + * @param indexedDatasets first in array is primary/A matrix all others are treated as secondary + * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed + * @param maxInterestingItemsPerThing max similarities per items + * @param maxNumInteractions max number of input items per item + * @return a list of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] containing downsampled + * IndexedDatasets for cooccurrence and cross-cooccurrence + */ def cooccurrencesIDSs(indexedDatasets: Array[IndexedDataset], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50, @@ -127,13 +131,13 @@ object SimilarityAnalysis extends Serializable { retIDSs.toList } - /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows - * @param drmARaw Primary interaction matrix - * @param randomSeed when kept to a constant will make repeatable downsampling - * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50 - * @param maxNumInteractions max number of interactions after downsampling, default: 500 - * @return - */ + /** + * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a DRM of rows and similar rows + * @param drmARaw Primary interaction matrix + * @param randomSeed when kept to a constant will make repeatable downsampling + * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50 + * @param maxNumInteractions max number of interactions after downsampling, default: 500 + */ def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50, maxNumInteractions: Int = 500): DrmLike[Int] = { @@ -152,20 +156,20 @@ object SimilarityAnalysis extends Serializable { val drmAAt = drmA %*% drmA.t // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix - val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow, bcastInteractionsPerItemA, - bcastInteractionsPerItemA, crossCooccurrence = false) + val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow, + bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false) drmSimilaritiesAAt } - /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows. - * Uses IndexedDatasets, which handle external ID dictionaries properly - * @param indexedDataset compare each row to every other - * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed - * @param maxInterestingSimilaritiesPerRow max elements returned in each row - * @param maxObservationsPerRow max number of input elements to use - * @return - */ + /** + * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows. 
+ * Uses IndexedDatasets, which handle external ID dictionaries properly + * @param indexedDataset compare each row to every other + * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed + * @param maxInterestingSimilaritiesPerRow max elements returned in each row + * @param maxObservationsPerRow max number of input elements to use + */ def rowSimilarityIDS(indexedDataset: IndexedDataset, randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50, maxObservationsPerRow: Int = 500): @@ -175,10 +179,7 @@ object SimilarityAnalysis extends Serializable { indexedDataset.create(coocMatrix, indexedDataset.rowIDs, indexedDataset.rowIDs) } - /** - * Compute loglikelihood ratio - * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details - */ + /** Compute loglikelihood ratio see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details */ def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long, numInteractionsWithAandB: Long, numInteractions: Long) = { @@ -220,7 +221,7 @@ object SimilarityAnalysis extends Serializable { val candidate = thingA -> llr - // matches legacy hadoop code and maps values to range (0..1) + // legacy hadoop code maps values to range (0..1) via // val normailizedLLR = 1.0 - (1.0 / (1.0 + llr)) // val candidate = thingA -> normailizedLLR @@ -253,7 +254,7 @@ object SimilarityAnalysis extends Serializable { * @param drmM matrix to downsample * @param seed random number generator seed, keep to a constant if repeatability is neccessary * @param maxNumInteractions number of elements in a row of the returned matrix - * @return + * @return the downsampled DRM */ def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = { @@ -269,9 +270,8 @@ object SimilarityAnalysis extends Serializable { case (keys, block) => val numInteractions: Vector = bcastNumInteractions - // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures - // don't use commons since scala's is included anyway - // val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed)) + // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of + //failures val random = new Random(MurmurHash.hash(keys(0), seed)) val downsampledBlock = block.like() http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala index 3afbecb..81f6ab1 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala @@ -123,13 +123,13 @@ package object indexeddataset { def indexedDatasetDFSRead(src: String, schema: Schema = DefaultIndexedDatasetReadSchema, existingRowIDs: BiMap[String, Int] = HashBiMap.create()) - (implicit ctx: DistributedContext): + (implicit ctx: DistributedContext): IndexedDataset = ctx.indexedDatasetDFSRead(src, schema, existingRowIDs) def indexedDatasetDFSReadElements(src: String, schema: Schema = DefaultIndexedDatasetReadSchema, existingRowIDs: BiMap[String, Int] = HashBiMap.create()) - (implicit ctx: DistributedContext): + (implicit ctx: DistributedContext): IndexedDataset = 
ctx.indexedDatasetDFSReadElements(src, schema, existingRowIDs) } http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala index c7eb2cb..f6811e2 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/IndexedDataset.scala @@ -22,15 +22,12 @@ import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm} import org.apache.mahout.math.indexeddataset /** - * Wraps a [[org.apache.mahout.math.drm.CheckpointedDrm]] object with two [[com.google.common.collect.BiMap]]s to store - * ID/label translation dictionaries. - * The purpose of this class is to wrap a DrmLike[C] with bidirectional ID mappings so - * a user specified label or ID can be stored and mapped to and from the Mahout Int ID - * used internal to Mahout core code. - * - * @todo Often no need for both or perhaps either dictionary, so save resources by allowing - * to be not created when not needed. - */ + * Wrap an [[org.apache.mahout.math.drm.DrmLike]] with bidirectional ID mappings [[com.google.common.collect.BiMap]] + * so a user specified labels/IDs can be stored and mapped to and from the Mahout Int ID used internal to Mahout + * core code. + * @todo Often no need for both or perhaps either dictionary, so save resources by allowing to be not created + * when not needed. + */ trait IndexedDataset { val matrix: CheckpointedDrm[Int] @@ -39,12 +36,13 @@ trait IndexedDataset { /** * Write a text delimited file(s) with the row and column IDs from dictionaries. - * @param dest - * @param schema + * @param dest write location, usually a directory + * @param schema params to control writing + * @param sc the [[org.apache.mahout.math.drm.DistributedContext]] used to do a distributed write */ def dfsWrite(dest: String, schema: Schema)(implicit sc: DistributedContext): Unit - /** Factory method, creates the extending class */ + /** Factory method, creates the extending class and returns a new instance */ def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]): IndexedDataset @@ -52,6 +50,7 @@ trait IndexedDataset { * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value. * No changes are made to the underlying drm. * @param n number to use for new row cardinality, should be larger than current + * @return a new IndexedDataset or extending class with new cardinality * @note should be done before any optimizer actions are performed on the matrix or you'll get unpredictable * results. 
*/ http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala index cf429f5..f7653ae 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/ReaderWriter.scala @@ -20,49 +20,98 @@ package org.apache.mahout.math.indexeddataset import com.google.common.collect.{BiMap, HashBiMap} import org.apache.mahout.math.drm.DistributedContext -/** Reader trait is abstract in the sense that the elementReader function must be defined by an extending trait, - * which also defines the type to be read. - * @tparam T type of object to read. - */ +/** + * Reader trait is abstract in the sense that the elementReader and rowReader functions must be supplied by an + * extending trait, which also defines the type to be read. + * @tparam T type of object to read. + */ trait Reader[T]{ val mc: DistributedContext val readSchema: Schema + /** + * Override in extending trait to supply T and perform a parallel read of collection elements + * @param mc a [[org.apache.mahout.math.drm.DistributedContext]] to read from + * @param readSchema map of parameters controlling formating and how the read is executed + * @param source list of comma delimited files to read from + * @param existingRowIDs [[com.google.common.collect.BiMap]] containing row IDs that have already + * been applied to this collection--used to synchronize row IDs between several + * collections + * @return a new collection of type T + */ protected def elementReader( mc: DistributedContext, readSchema: Schema, source: String, existingRowIDs: BiMap[String, Int]): T - protected def drmReader( + /** + * Override in extending trait to supply T and perform a parallel read of collection rows + * @param mc a [[org.apache.mahout.math.drm.DistributedContext]] to read from + * @param readSchema map of parameters controlling formating and how the read is executed + * @param source list of comma delimited files to read from + * @param existingRowIDs [[com.google.common.collect.BiMap]] containing row IDs that have already + * been applied to this collection--used to synchronize row IDs between several + * collections + * @return a new collection of type T + */ + protected def rowReader( mc: DistributedContext, readSchema: Schema, source: String, existingRowIDs: BiMap[String, Int]): T + /** + * Public method called to perform the element-wise read. Usually no need to override + * @param source comma delimited URIs to read from + * @param existingRowIDs a [[com.google.common.collect.BiMap]] containing previously used id mappings--used + * to synchronize all row ids is several collections + * @return a new collection of type T + */ def readElementsFrom( source: String, existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T = elementReader(mc, readSchema, source, existingRowIDs) - def readDRMFrom( + /** + * Public method called to perform the row-wise read. Usually no need to override. 
+ * @param source comma delimited URIs to read from + * @param existingRowIDs a [[com.google.common.collect.BiMap]] containing previously used id mappings--used + * to synchronize all row ids is several collections + * @return a new collection of type T + */ + def readRowsFrom( source: String, existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T = - drmReader(mc, readSchema, source, existingRowIDs) + rowReader(mc, readSchema, source, existingRowIDs) } -/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, - * which also defines the type to be written. - * @tparam T type of object to write. - */ +/** + * Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, + * which also defines the type to be written. + * @tparam T type of object to write, usually a matrix type thing. + */ trait Writer[T]{ val mc: DistributedContext val sort: Boolean val writeSchema: Schema + /** + * Override to provide writer method + * @param mc context used to do distributed write + * @param writeSchema map with params to control format and execution of the write + * @param dest root directory to write to + * @param collection usually a matrix like collection to write + * @param sort flags whether to sort the rows by value descending + */ protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit + /** + * Call this method to perform the write, usually no need to override. + * @param collection what to write + * @param dest root directory to write to + */ def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort) } http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala index 557b419..3b4a2e9 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/indexeddataset/Schema.scala @@ -19,34 +19,33 @@ package org.apache.mahout.math.indexeddataset import scala.collection.mutable.HashMap -/** Syntactic sugar for mutable.HashMap[String, Any] - * - * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}} - */ +/** + * Syntactic sugar for mutable.HashMap[String, Any] + * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}} + */ class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] { // note: this require a mutable HashMap, do we care? this ++= params - /** Constructor for copying an existing Schema - * - * @param schemaToClone return a copy of this Schema - */ + /** + * Constructor for copying an existing Schema + * @param schemaToClone return a copy of this Schema + */ def this(schemaToClone: Schema){ this() this ++= schemaToClone } } -/** These can be used to keep the text in and out fairly standard to Mahout, where an application specific - * format is not required. These apply to formatting of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] - * , which can be used to create a Mahout DRM for DSL ops. 
- */ - +// These can be used to keep the text in and out fairly standard to Mahout, where an application specific +// format is not required. These apply to formatting of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] +// which can be used to create a Mahout DRM for DSL ops. -/** Simple default Schema for typical text delimited element file input - * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID - * <comma, tab, or space>here may be other ignored text...) - */ +/** + * Simple default Schema for typical text delimited element file input + * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID + * <comma, tab, or space>here may be other ignored text...) + */ final object DefaultIndexedDatasetElementReadSchema extends Schema( "delim" -> "[,\t ]", //comma, tab or space "filter" -> "", @@ -54,46 +53,49 @@ final object DefaultIndexedDatasetElementReadSchema extends Schema( "columnIDPosition" -> 1, "filterColumn" -> -1) -/** Default Schema for text delimited drm file output - * This tells the writer to write a [[org.apache.mahout.math.indexeddataset.IndexedDataset]] of the default form: - * (rowID<tab>columnID1:score1<space>columnID2:score2...) - */ +/** + * Default Schema for text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file output with + * one row per line. + * The default form: + * (rowID<tab>columnID1:score1<space>columnID2:score2...) + */ final object DefaultIndexedDatasetWriteSchema extends Schema( "rowKeyDelim" -> "\t", "columnIdStrengthDelim" -> ":", "elementDelim" -> " ", "omitScore" -> false) -/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file input - * This tells the reader to input text lines of the form: - * (rowID<tab>columnID1:score1,columnID2:score2,...) - */ +/** + * Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file + * row-wise input. This tells the reader to input text lines of the form: + * (rowID<tab>columnID1:score1,columnID2:score2,...) + */ final object DefaultIndexedDatasetReadSchema extends Schema( "rowKeyDelim" -> "\t", "columnIdStrengthDelim" -> ":", "elementDelim" -> " ") -/** Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file where - * the score of any element is ignored, - * all non-zeros are replaced with 1. - * This tells the reader to input DRM lines of the form - * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored. - * Alternatively the format can be - * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default - * output format for [[IndexedDatasetWriteBooleanSchema]] - */ +/** + * Default Schema for reading a text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file where + * the score of any element is ignored. + * This tells the reader to input DRM lines of the form + * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored. + * Alternatively the format can be + * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. 
This is the default + * output format for [[IndexedDatasetWriteBooleanSchema]] + */ final object IndexedDatasetReadBooleanSchema extends Schema( "rowKeyDelim" -> "\t", "columnIdStrengthDelim" -> ":", "elementDelim" -> " ", "omitScore" -> true) -/** Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file write where - * the score of a element is omitted. - * The presence of a element means the score = 1, the absence means a score of 0. - * This tells the writer to output [[org.apache.mahout.math.indexeddataset.IndexedDataset]] lines of the form - * (rowID<tab>columnID1<space>columnID2...) - */ +/** + * Default Schema for typical text delimited [[org.apache.mahout.math.indexeddataset.IndexedDataset]] file output + * where the score of a element is omitted. This tells the writer to output + * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] row of the form + * (rowID<tab>columnID1<space>columnID2...) + */ final object IndexedDatasetWriteBooleanSchema extends Schema( "rowKeyDelim" -> "\t", "columnIdStrengthDelim" -> ":", http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/pom.xml ---------------------------------------------------------------------- diff --git a/spark/pom.xml b/spark/pom.xml index bcf9e30..c7069b6 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -164,14 +164,14 @@ <artifactId>maven-assembly-plugin</artifactId> <executions> <execution> - <id>job</id> + <id>dependency-reduced</id> <phase>package</phase> <goals> <goal>single</goal> </goals> <configuration> <descriptors> - <descriptor>../spark/src/main/assembly/job.xml</descriptor> + <descriptor>src/main/assembly/dependency-reduced.xml</descriptor> </descriptors> </configuration> </execution> @@ -317,11 +317,9 @@ <scope>test</scope> </dependency> - <!-- 3rd-party --> <!-- scala stuff --> - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.major}</artifactId> http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/assembly/dependency-reduced.xml ---------------------------------------------------------------------- diff --git a/spark/src/main/assembly/dependency-reduced.xml b/spark/src/main/assembly/dependency-reduced.xml new file mode 100644 index 0000000..5dcc945 --- /dev/null +++ b/spark/src/main/assembly/dependency-reduced.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 + http://maven.apache.org/xsd/assembly-1.1.0.xsd"> + <id>dependency-reduced</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <unpack>true</unpack> + <unpackOptions> + <!-- MAHOUT-1126 --> + <excludes> + <exclude>META-INF/LICENSE</exclude> + </excludes> + </unpackOptions> + <scope>runtime</scope> + <outputDirectory>/</outputDirectory> + <useTransitiveFiltering>true</useTransitiveFiltering> + <includes> + <include>com.google.guava:guava</include> + <include>com.github.scopt</include> + </includes> + </dependencySet> + </dependencySets> +</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/assembly/job.xml ---------------------------------------------------------------------- diff --git a/spark/src/main/assembly/job.xml b/spark/src/main/assembly/job.xml deleted file mode 100644 index 2bdb3ce..0000000 --- a/spark/src/main/assembly/job.xml +++ /dev/null @@ -1,61 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> -<assembly - xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 - http://maven.apache.org/xsd/assembly-1.1.0.xsd"> - <id>job</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <dependencySets> - <dependencySet> - <unpack>true</unpack> - <unpackOptions> - <!-- MAHOUT-1126 --> - <excludes> - <exclude>META-INF/LICENSE</exclude> - </excludes> - </unpackOptions> - <scope>runtime</scope> - <outputDirectory>/</outputDirectory> - <useTransitiveFiltering>true</useTransitiveFiltering> - <excludes> - <exclude>org.apache.hadoop:hadoop-core</exclude> - </excludes> - </dependencySet> - </dependencySets> - <fileSets> - <fileSet> - <directory>${basedir}/target/classes</directory> - <outputDirectory>/</outputDirectory> - <excludes> - <exclude>*.jar</exclude> - </excludes> - </fileSet> - <fileSet> - <directory>${basedir}/target/classes</directory> - <outputDirectory>/</outputDirectory> - <includes> - <include>driver.classes.default.props</include> - </includes> - </fileSet> - </fileSets> -</assembly> http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala index 42bf697..0b4130d 100644 --- a/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala +++ b/spark/src/main/scala/org/apache/mahout/common/HDFSPathSearch.scala @@ -23,8 +23,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} /** * Returns a [[java.lang.String]], which is comma delimited list of URIs discovered based on parameters * in the constructor. - * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]] - * + * The String is formatted to be input into [[org.apache.spark.SparkContext#textFile()]] * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs * @param filePattern regex that must match the entire filename to have the file returned * @param recursive true traverses the filesystem recursively, default = false @@ -35,8 +34,10 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive: val conf = new Configuration() val fs = FileSystem.get(conf) - /** Returns a string of comma delimited URIs matching the filePattern - * When pattern matching dirs are never returned, only traversed. */ + /** + * Returns a string of comma delimited URIs matching the filePattern + * When pattern matching dirs are never returned, only traversed. 
+ */ def uris: String = { if (!filePattern.isEmpty){ // have file pattern so val pathURIs = pathURI.split(",") @@ -51,8 +52,10 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive: } } - /** Find matching files in the dir, recursively call self when another directory is found - * Only files are matched, directories are traversed but never return a match */ + /** + * Find matching files in the dir, recursively call self when another directory is found + * Only files are matched, directories are traversed but never return a match + */ private def findFiles(dir: String, filePattern: String = ".*", files: String = ""): String = { val seed = fs.getFileStatus(new Path(dir)) var f: String = files @@ -71,7 +74,7 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive: f = findFiles(fileStatus.getPath.toString, filePattern, f) } } - }else{ f = dir }// was a filename not dir + } else { f = dir }// was a filename not dir f } http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala index 36ba6ef..63da80f 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala @@ -24,23 +24,19 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import scala.collection.immutable.HashMap /** - * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]. - * Reads text lines - * that contain (row id, column id, ...). The IDs are user specified strings which will be - * preserved in the - * output. The individual elements will be accumulated into a matrix like [[org.apache.mahout.math.indexeddataset.IndexedDataset]] - * and [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]] - * will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two - * matrices and calculate both the self similarity of the primary matrix and the row-wise - * similarity of the primary - * to the secondary. Returns one or two directories of text files formatted as specified in - * the options. - * The options allow flexible control of the input schema, file discovery, output schema, and control of - * algorithm parameters. - * To get help run {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple - * elements of text delimited values (userID,itemID) with or without a strengths and with a separator of tab, comma, or space, - * you can specify only the input and output file and directory--all else will default to the correct values. - * Each output line will contain the Item ID and similar items sorted by LLR strength descending. + * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]]. Reads text lines + * that contain (row id, column id, ...). The IDs are user specified strings which will be preserved in the output. 
+ * The individual elements will be accumulated into a matrix like + * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] and + * [[org.apache.mahout.math.cf.SimilarityAnalysis#cooccurrencesIDSs]] will be used to calculate row-wise + * self-similarity, or when using filters or two inputs, will generate two matrices and calculate both the + * self-similarity of the primary matrix and the row-wise similarity of the primary to the secondary. Returns one + * or two directories of text files formatted as specified in the options. The options allow flexible control of the + * input schema, file discovery, output schema, and control of algorithm parameters. To get help run + * {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple elements of text delimited + * values (userID,itemID) with or without a strengths and with a separator of tab, comma, or space, you can specify + * only the input and output file and directory--all else will default to the correct values. Each output line will + * contain the Item ID and similar items sorted by LLR strength descending. * @note To use with a Spark cluster see the --master option, if you run out of heap space check * the --sparkExecutorMemory option. Other [[org.apache.spark.SparkConf]] key value pairs can be with the -D:k=v * option. @@ -57,6 +53,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver { private var readSchema2: Schema = _ /** + * Entry point, not using Scala App trait * @param args Command line args, if empty a help message is printed. */ override def main(args: Array[String]): Unit = { @@ -74,52 +71,51 @@ object ItemSimilarityDriver extends MahoutSparkDriver { options + ("maxPrefs" -> x) } text ("Max number of preferences to consider per user (optional). Default: " + ItemSimilarityOptions("maxPrefs")) validate { x => - if (x > 0) success else failure("Option --maxPrefs must be > 0") + if (x > 0) success else failure("Option --maxPrefs must be > 0") } - /** not implemented in SimilarityAnalysis.cooccurrence - * threshold, and minPrefs - * todo: replacing the threshold with some % of the best values and/or a - * confidence measure expressed in standard deviations would be nice. - */ + // not implemented in SimilarityAnalysis.cooccurrence + // threshold, and minPrefs + // todo: replacing the threshold with some % of the best values and/or a + // confidence measure expressed in standard deviations would be nice. opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) => options + ("maxSimilaritiesPerItem" -> x) } text ("Limit the number of similarities per item to this number (optional). 
Default: " + ItemSimilarityOptions("maxSimilaritiesPerItem")) validate { x => - if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0") + if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0") } //Driver notes--driver specific note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.") //Input text format - parseElementInputSchemaOptions + parseElementInputSchemaOptions() //How to search for input - parseFileDiscoveryOptions + parseFileDiscoveryOptions() //Drm output schema--not driver specific, drm specific - parseDrmFormatOptions + parseIndexedDatasetFormatOptions() //Spark config options--not driver specific - parseSparkOptions + parseSparkOptions() //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI - parseGenericOptions + parseGenericOptions() help("help") abbr ("h") text ("prints this usage text\n") } parser.parse(args, parser.opts) map { opts => parser.opts = opts - process + process() } } override protected def start() : Unit = { - super.start + super.start() readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String], "filter" -> parser.opts("filter1").asInstanceOf[String], @@ -139,12 +135,12 @@ object ItemSimilarityDriver extends MahoutSparkDriver { "columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String], "omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean], "elementDelim" -> parser.opts("elementDelim").asInstanceOf[String]) - } + } private def readIndexedDatasets: Array[IndexedDataset] = { - val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String], - parser.opts("recursive").asInstanceOf[Boolean]).uris + val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], + parser.opts("filenamePattern").asInstanceOf[String], parser.opts("recursive").asInstanceOf[Boolean]).uris val inFiles2 = if (parser.opts("input2") == null || parser.opts("input2").asInstanceOf[String].isEmpty) "" else HDFSPathSearch(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String], parser.opts("recursive").asInstanceOf[Boolean]).uris @@ -160,8 +156,8 @@ object ItemSimilarityDriver extends MahoutSparkDriver { // The case of reading B can be a bit tricky when the exact same row IDs don't exist for A and B // Here we assume there is one row ID space for all interactions. To do this we calculate the - // row cardinality only after reading in A and B (or potentially C...) We then adjust the - // cardinality so all match, which is required for the math to work. + // row cardinality only after reading in A and B (or potentially C...) We then adjust the cardinality + // so all match, which is required for the math to work. // Note: this may leave blank rows with no representation in any DRM. Blank rows need to // be supported (and are at least on Spark) or the row cardinality adjustment will not work. val datasetB = if (!inFiles2.isEmpty) { @@ -201,19 +197,19 @@ object ItemSimilarityDriver extends MahoutSparkDriver { } } - override def process: Unit = { - start + override def process(): Unit = { + start() val indexedDatasets = readIndexedDatasets val idss = SimilarityAnalysis.cooccurrencesIDSs(indexedDatasets, parser.opts("randomSeed").asInstanceOf[Int], parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int]) // todo: allow more than one cross-similarity matrix? 
- idss(0).dfsWrite(parser.opts("output").asInstanceOf[String] + "indicator-matrix", schema = writeSchema) + idss(0).dfsWrite(parser.opts("output").asInstanceOf[String] + "similarity-matrix", schema = writeSchema) if(idss.length > 1) - idss(1).dfsWrite(parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix", schema = writeSchema) + idss(1).dfsWrite(parser.opts("output").asInstanceOf[String] + "cross-similarity-matrix", schema = writeSchema) - stop + stop() } } http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala index ab40c3a..668d70c 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala @@ -21,69 +21,81 @@ import org.apache.mahout.math.drm.DistributedContext import org.apache.spark.SparkConf import org.apache.mahout.sparkbindings._ -/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main. - * Also define a Map of options for the command line parser. The following template may help: - * {{{ - * object SomeDriver extends MahoutDriver { - * - * // define only the options specific to this driver, inherit the generic ones - * private final val SomeOptions = HashMap[String, Any]( - * "maxThings" -> 500, - * "minThings" -> 100, - * "appName" -> "SomeDriver") - * - * override def main(args: Array[String]): Unit = { - * - * val parser = new MahoutOptionParser(programName = "shortname") { - * head("somedriver", "Mahout 1.0-SNAPSHOT") - * - * // Input output options, non-driver specific - * parseIOOptions - * - * // Algorithm specific options - * // Add in the new options - * opts = opts ++ SomeOptions - * note("\nAlgorithm control options:") - * opt[Int]("maxThings") abbr ("mt") action { (x, options) => - * options + ("maxThings" -> x) ... - * } - * parser.parse(args, parser.opts) map { opts => - * parser.opts = opts - * process - * } - * } - * - * override def process: Unit = { - * start // override to change the default Kryo or SparkConf before the distributed context is created - * // do the work here - * stop - * } - * - * }}} - */ +/** + * Extend this class to create a Mahout CLI driver. Minimally you must override process and main. + * Also define a Map of options for the command line parser. The following template may help: + * {{{ + * object SomeDriver extends MahoutDriver { + * + * // define only the options specific to this driver, inherit the generic ones + * private final val SomeOptions = HashMap[String, Any]( + * "maxThings" -> 500, + * "minThings" -> 100, + * "appName" -> "SomeDriver") + * + * override def main(args: Array[String]): Unit = { + * + * val parser = new MahoutOptionParser(programName = "shortname") { + * head("somedriver", "Mahout 1.0-SNAPSHOT") + * + * // Input output options, non-driver specific + * parseIOOptions() + * + * // Algorithm specific options + * // Add in the new options + * opts = opts ++ SomeOptions + * note("\nAlgorithm control options:") + * opt[Int]("maxThings") abbr ("mt") action { (x, options) => + * options + ("maxThings" -> x) ... 
+ * } + * parser.parse(args, parser.opts) map { opts => + * parser.opts = opts + * process() + * } + * } + * + * override def process: Unit = { + * start() // override to change the default Kryo or SparkConf before the distributed context is created + * // do the work here + * stop() + * } + * + * }}} + */ abstract class MahoutSparkDriver extends MahoutDriver { - implicit protected var sparkConf = new SparkConf() + implicit var sparkConf = new SparkConf() - /** Creates a Spark context to run the job inside. - * Override to set the SparkConf values specific to the job, - * these must be set before the context is created. - * */ - protected def start() : Unit = { + /** + * Creates a Spark context to run the job inside. + * Override to set the SparkConf values specific to the job, + * these must be set before the context is created. + */ + override protected def start() : Unit = { if (!_useExistingContext) { + /* hack around SPARK-6069 Spark 1.2.1 deserialization of HashBiMap throwing ClassNotFound--doesn't seem to work + sparkConf.set("spark.files.userClassPathFirst", "true") + sparkConf.set("spark.executor.userClassPathFirst", "true") + */ + sparkConf.set("spark.kryo.referenceTracking", "false") .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "") sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) //else leave as set in Spark config - mc = mahoutSparkContext(masterUrl = parser.opts("master").asInstanceOf[String], + mc = mahoutSparkContext( + masterUrl = parser.opts("master").asInstanceOf[String], appName = parser.opts("appName").asInstanceOf[String], sparkConf = sparkConf) } } + /** + * Call this before start to use an existing context as when running multiple drivers from a scalatest suite. + * @param context An already set up context to run against + */ def useContext(context: DistributedContext): Unit = { _useExistingContext = true mc = context http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala index a46d2ee..b3a1ec2 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala @@ -18,26 +18,30 @@ package org.apache.mahout.drivers import org.apache.spark.SparkConf +/** Adds parsing of Spark specific options to the option parser */ class MahoutSparkOptionParser(programName: String) extends MahoutOptionParser(programName: String){ - def parseSparkOptions(implicit sparkConf: SparkConf) = { + def parseSparkOptions()(implicit sparkConf: SparkConf) = { opts = opts ++ MahoutOptionParser.SparkOptions opts = opts + ("appName" -> programName) note("\nSpark config options:") - opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) => - options + ("master" -> x) + opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". 
Note that you can " + + "specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) => + options + ("master" -> x) } - opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: as Spark config specifies") action { (x, options) => - options + ("sparkExecutorMem" -> x) + opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each " + + "node (optional). Default: as Spark config specifies") action { (x, options) => + options + ("sparkExecutorMem" -> x) } opt[(String, String)]("define") abbr ("D") unbounded() foreach { case (k, v) => sparkConf.set(k, v) } validate { x => if (x._2 != "") success else failure("Value <sparkConfValue> must be non-blank") - } keyValueName("<sparkConfKey>", "<sparkConfValue>") text ("Set the <sparkConfKey> to <sparkConfValue> before creating this job's Spark context (optional)") + } keyValueName("<sparkConfKey>", "<sparkConfValue>") text ("Set the <sparkConfKey> to <sparkConfValue> before " + + "creating this job's Spark context (optional)") } } http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala index 8c1bce4..3b47452 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala @@ -26,12 +26,11 @@ import scala.collection.immutable.HashMap /** * Command line interface for [[org.apache.mahout.math.cf.SimilarityAnalysis#rowSimilarityIDSs( )]]. * Reads a text delimited file containing rows of a [[org.apache.mahout.math.indexeddataset.IndexedDataset]] - * with domain specific IDS of the form - * (row id, column id: strength, ...). The IDs will be preserved in the + * with domain specific IDS of the form (row id, column id: strength, ...). The IDs will be preserved in the * output. The rows define a matrix and [[org.apache.mahout.math.cf.SimilarityAnalysis#rowSimilarityIDSs( )]] - * will be used to calculate row-wise similarity using log-likelihood - * The options allow control of the input schema, file discovery, output schema, and control of - * algorithm parameters. + * will be used to calculate row-wise similarity using log-likelihood. The options allow control of the input + * schema, file discovery, output schema, and control of algorithm parameters. + * * To get help run {{{mahout spark-rowsimilarity}}} for a full explanation of options. The default * values for formatting will read (rowID<tab>columnID1:strength1<space>columnID2:strength2....) * and write (rowID<tab>rowID1:strength1<space>rowID2:strength2....) @@ -49,6 +48,7 @@ object RowSimilarityDriver extends MahoutSparkDriver { private var readWriteSchema: Schema = _ /** + * Entry point, not using Scala App trait * @param args Command line args, if empty a help message is printed. */ override def main(args: Array[String]): Unit = { @@ -67,35 +67,34 @@ object RowSimilarityDriver extends MahoutSparkDriver { options + ("maxObservations" -> x) } text ("Max number of observations to consider per row (optional). 
Default: " + RowSimilarityOptions("maxObservations")) validate { x => - if (x > 0) success else failure("Option --maxObservations must be > 0") + if (x > 0) success else failure("Option --maxObservations must be > 0") } opt[Int]('m', "maxSimilaritiesPerRow") action { (x, options) => options + ("maxSimilaritiesPerRow" -> x) } text ("Limit the number of similarities per item to this number (optional). Default: " + RowSimilarityOptions("maxSimilaritiesPerRow")) validate { x => - if (x > 0) success else failure("Option --maxSimilaritiesPerRow must be > 0") + if (x > 0) success else failure("Option --maxSimilaritiesPerRow must be > 0") } - /** --threshold not implemented in SimilarityAnalysis.rowSimilarity - * todo: replacing the threshold with some % of the best values and/or a - * confidence measure expressed in standard deviations would be nice. - */ + // --threshold not implemented in SimilarityAnalysis.rowSimilarity + // todo: replacing the threshold with some % of the best values and/or a + // confidence measure expressed in standard deviations would be nice. //Driver notes--driver specific note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.") //Drm output schema--not driver specific, drm specific - parseDrmFormatOptions + parseIndexedDatasetFormatOptions() //How to search for input - parseFileDiscoveryOptions + parseFileDiscoveryOptions() //Spark config options--not driver specific - parseSparkOptions + parseSparkOptions() //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI - parseGenericOptions + parseGenericOptions() help("help") abbr ("h") text ("prints this usage text\n") @@ -106,9 +105,9 @@ object RowSimilarityDriver extends MahoutSparkDriver { } } - override protected def start() : Unit = { + override protected def start(): Unit = { - super.start + super.start() readWriteSchema = new Schema( "rowKeyDelim" -> parser.opts("rowKeyDelim").asInstanceOf[String], @@ -120,8 +119,8 @@ object RowSimilarityDriver extends MahoutSparkDriver { private def readIndexedDataset: IndexedDataset = { - val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String], - parser.opts("recursive").asInstanceOf[Boolean]).uris + val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], + parser.opts("filenamePattern").asInstanceOf[String], parser.opts("recursive").asInstanceOf[Boolean]).uris if (inFiles.isEmpty) { null.asInstanceOf[IndexedDataset] @@ -132,8 +131,8 @@ object RowSimilarityDriver extends MahoutSparkDriver { } } - override def process: Unit = { - start + override def process(): Unit = { + start() val indexedDataset = readIndexedDataset @@ -144,7 +143,7 @@ object RowSimilarityDriver extends MahoutSparkDriver { rowSimilarityIDS.dfsWrite(parser.opts("output").asInstanceOf[String], readWriteSchema) - stop + stop() } } http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala index 368ee89..8531a0a 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala @@ -58,56 +58,56 @@ object TestNBDriver extends MahoutSparkDriver { //How to search for input - 
parseFileDiscoveryOptions + parseFileDiscoveryOptions() - //Drm output schema--not driver specific, drm specific - parseDrmFormatOptions + //IndexedDataset output schema--not driver specific, IndexedDataset specific + parseIndexedDatasetFormatOptions() //Spark config options--not driver specific - parseSparkOptions + parseSparkOptions() //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI - parseGenericOptions + parseGenericOptions() help("help") abbr ("h") text ("prints this usage text\n") } parser.parse(args, parser.opts) map { opts => parser.opts = opts - process + process() } } -/** Read the test set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */ -private def readTestSet: DrmLike[_] = { - val inputPath = parser.opts("input").asInstanceOf[String] - val trainingSet= drm.drmDfsRead(inputPath) - trainingSet -} + /** Read the test set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */ + private def readTestSet: DrmLike[_] = { + val inputPath = parser.opts("input").asInstanceOf[String] + val trainingSet = drm.drmDfsRead(inputPath) + trainingSet + } -/** read the model from pathToModel using NBModel.DfsRead(...) */ -private def readModel: NBModel = { - val inputPath = parser.opts("pathToModel").asInstanceOf[String] - val model= NBModel.dfsRead(inputPath) - model -} + /** read the model from pathToModel using NBModel.DfsRead(...) */ + private def readModel: NBModel = { + val inputPath = parser.opts("pathToModel").asInstanceOf[String] + val model = NBModel.dfsRead(inputPath) + model + } -override def process: Unit = { - start() + override def process(): Unit = { + start() - val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean] - val outputPath = parser.opts("output").asInstanceOf[String] + val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean] + val outputPath = parser.opts("output").asInstanceOf[String] - // todo: get the -ow option in to check for a model in the path and overwrite if flagged. + // todo: get the -ow option in to check for a model in the path and overwrite if flagged. - val testSet = readTestSet - val model = readModel - val analyzer= NaiveBayes.test(model, testSet, testComplementary) + val testSet = readTestSet + val model = readModel + val analyzer = NaiveBayes.test(model, testSet, testComplementary) - println(analyzer) + println(analyzer) - stop -} + stop() + } }
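For readers following the driver cleanup above, the shape these drivers now share (a driver-specific option map, a MahoutSparkOptionParser built in main, and a process() that brackets the work with start()/stop()) is condensed below. This is only a sketch assembled from the MahoutSparkDriver scaladoc template and the drivers touched in this commit; the driver name, the maxThings option, and the body of process() are illustrative placeholders, not part of the patch.

package org.apache.mahout.drivers

import scala.collection.immutable.HashMap

object ThingSimilarityDriver extends MahoutSparkDriver {

  // Define only the options specific to this driver; the generic ones are inherited
  private final val ThingOptions = HashMap[String, Any](
    "maxThings" -> 500,
    "appName" -> "ThingSimilarityDriver")

  override def main(args: Array[String]): Unit = {
    parser = new MahoutSparkOptionParser(programName = "spark-thingsimilarity") {
      head("spark-thingsimilarity", "Mahout 1.0-SNAPSHOT")

      // Input/output options, not driver specific
      parseIOOptions()

      // Algorithm-specific options
      opts = opts ++ ThingOptions
      note("\nAlgorithm control options:")
      opt[Int]("maxThings") abbr ("mt") action { (x, options) =>
        options + ("maxThings" -> x)
      } text ("Max things to keep per row (optional). Default: " + ThingOptions("maxThings")) validate { x =>
        if (x > 0) success else failure("Option --maxThings must be > 0")
      }

      // Spark config, jar inclusion, and help, not driver specific
      parseSparkOptions()
      parseGenericOptions()
      help("help") abbr ("h") text ("prints this usage text\n")
    }
    parser.parse(args, parser.opts) map { opts =>
      parser.opts = opts
      process()
    }
  }

  override def process(): Unit = {
    start() // override start() instead to change Kryo or SparkConf settings before the context is created
    // read input, run the algorithm, write output here
    stop()
  }
}

Running such an object with --help prints the merged generic and driver-specific usage text, just as the existing drivers do.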

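One usage note that falls out of the useContext change in MahoutSparkDriver: a test or embedding application can hand the driver an existing DistributedContext before calling main, and start() will then skip creating its own, which is how the scalatest suites run several drivers against one context. The snippet below is only a sketch under that assumption; the paths are hypothetical and the spelling of the generic --input/--output flags should be checked against mahout spark-itemsimilarity --help, since those options are defined outside this diff.

import org.apache.mahout.drivers.ItemSimilarityDriver
import org.apache.mahout.math.drm.DistributedContext

object ItemSimilarityEmbeddingExample {

  // Assumes `context` was created elsewhere, e.g. by a test harness such as DistributedSparkSuite
  def run(context: DistributedContext): Unit = {
    ItemSimilarityDriver.useContext(context) // reuse the existing context; start() will not create another
    ItemSimilarityDriver.main(Array(
      "--input", "/tmp/actions.csv",    // hypothetical (userID,itemID[,strength]) text input
      "--output", "/tmp/similarities/", // driver appends similarity-matrix/ and, with a second input, cross-similarity-matrix/
      "--maxSimilaritiesPerItem", "50",
      "--maxPrefs", "500"))
  }
}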