http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala index 895bd01..a90e672 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala @@ -26,18 +26,21 @@ import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm} import org.apache.mahout.sparkbindings._ import scala.collection.JavaConversions._ -/** Extends Reader trait to supply the [[IndexedDatasetSpark]] as the type read and a reader function for reading text delimited files as described in the [[Schema]] - */ +/** + * Extends Reader trait to supply the [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] as + * the type read and a element and row reader functions for reading text delimited files as described in the + * [[org.apache.mahout.math.indexeddataset.Schema]] + */ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ - /** Read in text delimited elements from all URIs in the comma delimited source String and return - * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is - * no strength value in the element, assume it's presence means a strength of 1. - * - * @param mc context for the Spark job - * @param readSchema describes the delimiters and positions of values in the text delimited file. - * @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]] - * @return - */ + /** + * Read in text delimited elements from all URIs in the comma delimited source String and return + * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is + * no strength value in the element, assume it's presence means a strength of 1. + * @param mc context for the Spark job + * @param readSchema describes the delimiters and positions of values in the text delimited file. + * @param source comma delimited URIs of text files to be read from + * @return a new [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] + */ protected def elementReader( mc: DistributedContext, readSchema: Schema, @@ -116,16 +119,16 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ } } - /** Read in text delimited rows from all URIs in this comma delimited source String and return - * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is - * no strength value in the element, assume it's presence means a strength of 1. - * - * @param mc context for the Spark job - * @param readSchema describes the delimiters and positions of values in the text delimited file. - * @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]] - * @return - */ - protected def drmReader( + /** + * Read in text delimited rows from all URIs in this comma delimited source String and return + * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is + * no strength value in the element, assume it's presence means a strength of 1. + * @param mc context for the Spark job + * @param readSchema describes the delimiters and positions of values in the text delimited file. + * @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]] + * @return a new [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] + */ + protected def rowReader( mc: DistributedContext, readSchema: Schema, source: String, @@ -205,33 +208,36 @@ trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ } } - // this creates a BiMap from an ID collection. The ID points to an ordinal int - // which is used internal to Mahout as the row or column ID - // todo: this is a non-distributed process in an otherwise distributed reader and the BiMap is a - // non-rdd based object--this will limit the size of the dataset to ones where the dictionaries fit - // in-memory, the option is to put the dictionaries in rdds and do joins to translate IDs - private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(), entries: Array[String]): BiMap[String, Int] = { + /** + * Creates a BiMap from an ID collection. The ID points to an ordinal in which is used internal to Mahout + * as the row or column ID + * todo: this is a non-distributed process in an otherwise distributed reader and the BiMap is a + * non-rdd based object--this will limit the size of the dataset to ones where the dictionaries fit + * in-memory, the option is to put the dictionaries in rdds and do joins to translate IDs + */ + private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(), + entries: Array[String]): + BiMap[String, Int] = { var index = dictionary.size() // if a dictionary is supplied then add to the end based on the Mahout id 'index' for (entry <- entries) { if (!dictionary.contains(entry)){ dictionary.put(entry, index) index += 1 - }// the dictionary should never contain an entry since they are supposed to be distinct but for some reason they do + }// the dictionary should never contain an entry since they are supposed to be distinct but for some reason + // they do } dictionary } } +/** Extends the Writer trait to supply the type being written and supplies the writer function */ trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{ - - private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2} - - /** Read in text delimited elements from all URIs in this comma delimited source String. - * - * @param mc context for the Spark job - * @param writeSchema describes the delimiters and positions of values in the output text delimited file. - * @param dest directory to write text delimited version of [[IndexedDatasetSpark]] - */ + /** + * Read in text delimited elements from all URIs in this comma delimited source String. + * @param mc context for the Spark job + * @param writeSchema describes the delimiters and positions of values in the output text delimited file. + * @param dest directory to write text delimited version of [[IndexedDatasetSpark]] + */ protected def writer( mc: DistributedContext, writeSchema: Schema, @@ -262,21 +268,20 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{ matrix.rdd.map { case (rowID, itemVector) => // turn non-zeros into list for sorting - val itemList: collection.mutable.MutableList[org.apache.mahout.common.Pair[Integer, Double]] = new collection.mutable.MutableList[org.apache.mahout.common.Pair[Integer, Double]] + var itemList = List[(Int, Double)]() for (ve <- itemVector.nonZeroes) { - val item: org.apache.mahout.common.Pair[Integer, Double] = new org.apache.mahout.common.Pair[Integer, Double](ve.index, ve.get) - itemList += item + itemList = itemList :+ (ve.index, ve.get) } //sort by highest value descending(-) - val vector = if (sort) itemList.sortBy(-_.getSecond) else itemList + val vector = if (sort) itemList.sortBy { elem => -elem._2 } else itemList // first get the external rowID token if (!vector.isEmpty){ var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim // for the rest of the row, construct the vector contents of elements (external column ID, strength value) for (item <- vector) { - line += columnIDDictionary.inverse.get(item.getFirst) - if (!omitScore) line += columnIdStrengthDelim + item.getSecond + line += columnIDDictionary.inverse.get(item._1) + if (!omitScore) line += columnIdStrengthDelim + item._2 line += elementDelim } // drop the last delimiter, not needed to end the line @@ -296,26 +301,33 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{ /** A combined trait that reads and writes */ trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexedDatasetWriter -/** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor. - * @param readSchema describes the delimiters and position of values in the text delimited file to be read. - * @param mc Spark context for reading files - * @note The source is supplied to Reader#readElementsFrom . - * */ +/** + * Reads text delimited files into an IndexedDataset. Classes can be used to supply trait params in their constructor. + * @param readSchema describes the delimiters and position of values in the text delimited file to be read. + * @param mc Spark context for reading files + * @note The source is supplied to Reader#readElementsFrom . + */ class TextDelimitedIndexedDatasetReader(val readSchema: Schema) (implicit val mc: DistributedContext) extends TDIndexedDatasetReader -/** Writes text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor. - * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written. - * @param mc Spark context for reading files - * @note the destination is supplied to Writer#writeTo - * */ -class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter - -/** Reads and writes text delimited files to/from an IndexedDataset. Classes are needed to supply trait params in their constructor. - * @param readSchema describes the delimiters and position of values in the text delimited file(s) to be read. - * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written. - * @param mc Spark context for reading the files, may be implicitly defined. - * */ +/** + * Writes text delimited files into an IndexedDataset. Classes can be used to supply trait params in their + * constructor. + * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written. + * @param mc Spark context for reading files + * @note the destination is supplied to Writer#writeTo + */ +class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true) + (implicit val mc: DistributedContext) + extends TDIndexedDatasetWriter + +/** + * Reads and writes text delimited files to/from an IndexedDataset. Classes are needed to supply trait params in + * their constructor. + * @param readSchema describes the delimiters and position of values in the text delimited file(s) to be read. + * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written. + * @param mc Spark context for reading the files, may be implicitly defined. + */ class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeSchema: Schema, val sort: Boolean = true) (implicit val mc: DistributedContext) extends TDIndexedDatasetReaderWriter
http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala index 3d03c1d..4f88c13 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala @@ -52,23 +52,23 @@ object TrainNBDriver extends MahoutSparkDriver { //How to search for input - parseFileDiscoveryOptions + parseFileDiscoveryOptions() - //Drm output schema--not driver specific, drm specific - parseDrmFormatOptions + //IndexedDataset output schema--not driver specific, IndexedDataset specific + parseIndexedDatasetFormatOptions() //Spark config options--not driver specific - parseSparkOptions + parseSparkOptions() //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI - parseGenericOptions + parseGenericOptions() help("help") abbr ("h") text ("prints this usage text\n") } parser.parse(args, parser.opts) map { opts => parser.opts = opts - process + process() } } @@ -79,7 +79,7 @@ object TrainNBDriver extends MahoutSparkDriver { trainingSet } - override def process: Unit = { + override def process(): Unit = { start() val complementary = parser.opts("trainComplementary").asInstanceOf[Boolean] @@ -91,7 +91,7 @@ object TrainNBDriver extends MahoutSparkDriver { model.dfsWrite(outputPath) - stop + stop() } } http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala index c0d36c6..47eb40b 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala @@ -252,10 +252,10 @@ object SparkEngine extends DistributedEngine { } /** - * reads an IndexedDatasetSpark from default text delimited files + * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text + * delimited files. Reads a vector per row. * @param src a comma separated list of URIs to read from * @param schema how the text file is formatted - * @return */ def indexedDatasetDFSRead(src: String, schema: Schema = DefaultIndexedDatasetReadSchema, @@ -263,15 +263,15 @@ object SparkEngine extends DistributedEngine { (implicit sc: DistributedContext): IndexedDatasetSpark = { val reader = new TextDelimitedIndexedDatasetReader(schema)(sc) - val ids = reader.readDRMFrom(src, existingRowIDs) + val ids = reader.readRowsFrom(src, existingRowIDs) ids } /** - * reads an IndexedDatasetSpark from default text delimited files + * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text + * delimited files. Reads an element per row. * @param src a comma separated list of URIs to read from * @param schema how the text file is formatted - * @return */ def indexedDatasetDFSReadElements(src: String, schema: Schema = DefaultIndexedDatasetElementReadSchema, http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala index d3aa0a8..30b32ad 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala @@ -26,8 +26,12 @@ import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema, /** * Spark implementation of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] providing the Spark specific * dfsWrite method + * @param matrix a [[org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark]] to wrap + * @param rowIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs + * @param columnIDs a bidirectional map for Mahout Int IDs to/from application specific string IDs */ -class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int], val columnIDs: BiMap[String,Int]) +class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int], + val columnIDs: BiMap[String,Int]) extends IndexedDataset { /** Secondary constructor enabling immutability */ @@ -35,14 +39,19 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[St this(id2.matrix, id2.rowIDs, id2.columnIDs) } - /** Factory method used to create this extending class when the interface of - * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] is all that is known. */ + /** + * Factory method used to create this extending class when the interface of + * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] is all that is known. + */ override def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]): IndexedDatasetSpark = { new IndexedDatasetSpark(matrix, rowIDs, columnIDs) } - /** implements the core method [[indexeddataset.IndexedDataset#dfsWrite]]*/ + /** + * Implements the core method to write [[org.apache.mahout.math.indexeddataset.IndexedDataset]]. Override and + * replace the writer to change how it is written. + */ override def dfsWrite(dest: String, schema: Schema = DefaultIndexedDatasetWriteSchema) (implicit sc: DistributedContext): Unit = { http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala index c441716..8199708 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala @@ -223,7 +223,11 @@ package object sparkbindings { // During maven tests, the maven classpath also creeps in for some reason !n.matches(".*/.m2/.*") ) - + /* verify jar passed to context + log.info("\n\n\n") + mcjars.foreach(j => log.info(j)) + log.info("\n\n\n") + */ mcjars } http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala index 76c3553..ea6b40f 100644 --- a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala @@ -63,7 +63,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { "iphone\tipad:1.7260924347106847", "surface") - val CrossIndicatorLines = Iterable( + val CrossSimilarityLines = Iterable( "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847", "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897", "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897", @@ -78,49 +78,45 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { "iphone\tipad:1.7260924347106847", "surface")) - val CrossIndicatorTokens = tokenize(Iterable( + val CrossSimilarityTokens = tokenize(Iterable( "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847", "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897", "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 ipad:0.6795961471815897 galaxy:0.6795961471815897", "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 ipad:1.7260924347106847 galaxy:1.7260924347106847", "surface\tsurface:4.498681156950466 nexus:0.6795961471815897")) - // now in MahoutSuite - // final val TmpDir = "tmp/" // all IO going to whatever the default HDFS config is pointing to - /* //Clustered Spark and HDFS, not a good everyday build test ItemSimilarityDriver.main(Array( "--input", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt", - "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/", + "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/similarityMatrices/", "--master", "spark://occam4:7077", "--filter1", "purchase", "--filter2", "view", "--inDelim", ",", "--itemIDColumn", "2", "--rowIDColumn", "0", - "--filterColumn", "1" - )) -*/ + "--filterColumn", "1")) + */ // local multi-threaded Spark with HDFS using large dataset // not a good build test. - /* ItemSimilarityDriver.main(Array( + /* + ItemSimilarityDriver.main(Array( "--input", "hdfs://occam4:54310/user/pat/xrsj/ratings_data.txt", - "--output", "hdfs://occam4:54310/user/pat/xrsj/indicatorMatrices/", + "--output", "hdfs://occam4:54310/user/pat/xrsj/similarityMatrices/", "--master", "local[4]", "--filter1", "purchase", "--filter2", "view", "--inDelim", ",", "--itemIDColumn", "2", "--rowIDColumn", "0", - "--filterColumn", "1" - )) + "--filterColumn", "1")) */ test("ItemSimilarityDriver, non-full-spec CSV") { val InFile = TmpDir + "in-file.csv/" //using part files, not single file - val OutPath = TmpDir + "indicator-matrices/" + val OutPath = TmpDir + "similarity-matrices/" val lines = Array( "u1,purchase,iphone", @@ -163,10 +159,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { // todo: these comparisons rely on a sort producing the same lines, which could possibly // fail since the sort is on value and these can be the same for all items in a vector - val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable - tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens - val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable - tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens + val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable + tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens + val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable + tokenize(crossSimilarityLines) should contain theSameElementsAs CrossSimilarityTokens } @@ -174,7 +170,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { test("ItemSimilarityDriver TSV ") { val InFile = TmpDir + "in-file.tsv/" - val OutPath = TmpDir + "indicator-matrices/" + val OutPath = TmpDir + "similarity-matrices/" val lines = Array( "u1\tpurchase\tiphone", @@ -216,17 +212,17 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { // todo: a better test would be to get sorted vectors and compare rows instead of tokens, this might miss // some error cases - val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable - tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens - val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable - tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens + val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable + tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens + val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable + tokenize(crossSimilarityLines) should contain theSameElementsAs CrossSimilarityTokens } test("ItemSimilarityDriver log-ish files") { val InFile = TmpDir + "in-file.log/" - val OutPath = TmpDir + "indicator-matrices/" + val OutPath = TmpDir + "similarity-matrices/" val lines = Array( "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tiphone", @@ -267,10 +263,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { "--filterColumn", "2")) - val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable - tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens - val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable - tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens + val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable + tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens + val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable + tokenize(crossSimilarityLines) should contain theSameElementsAs CrossSimilarityTokens } @@ -280,7 +276,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { val InFilename = "in-file.tsv" val InPath = InDir + InFilename - val OutPath = TmpDir + "indicator-matrices" + val OutPath = TmpDir + "similarity-matrices" val lines = Array( "0,0,1", @@ -312,8 +308,8 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { "--output", OutPath, "--master", masterUrl)) - val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable - tokenize(indicatorLines) should contain theSameElementsAs Answer + val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable + tokenize(similarityLines) should contain theSameElementsAs Answer } @@ -323,7 +319,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { val InFilename = "in-file.tsv" val InPath = InDir + InFilename - val OutPath = TmpDir + "indicator-matrices" + val OutPath = TmpDir + "similarity-matrices" val lines = Array( "0,0,1", @@ -356,8 +352,8 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { "--master", masterUrl, "--omitStrength")) - val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable - tokenize(indicatorLines) should contain theSameElementsAs Answer + val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable + tokenize(similarityLines) should contain theSameElementsAs Answer } @@ -397,7 +393,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { val InPathM2 = InDirM2 + InFilenameM2 val InPathStart = TmpDir + "data/" - val OutPath = TmpDir + "indicator-matrices" + val OutPath = TmpDir + "similarity-matrices" // this creates one part-0000 file in the directory mahoutCtx.parallelize(M1Lines).coalesce(1, shuffle = true).saveAsTextFile(InDirM1) @@ -429,10 +425,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { "--filenamePattern", "m..tsv", "--recursive")) - val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable - tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens - val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable - tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens + val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable + tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens + val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable + tokenize(crossSimilarityLines) should contain theSameElementsAs CrossSimilarityTokens } @@ -440,7 +436,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file - val OutPath = TmpDir + "indicator-matrices/" + val OutPath = TmpDir + "similarity-matrices/" val lines = Array( "u1,purchase,iphone", @@ -482,10 +478,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { "--rowIDColumn", "0", "--filterColumn", "1")) - val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable - tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens - val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable - tokenize(crossIndicatorLines) should contain theSameElementsAs CrossIndicatorTokens + val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable + tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens + val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable + tokenize(crossSimilarityLines) should contain theSameElementsAs CrossSimilarityTokens } @@ -493,7 +489,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file - val OutPath = TmpDir + "indicator-matrices/" + val OutPath = TmpDir + "similarity-matrices/" val lines = Array( "u1,purchase,iphone", @@ -549,10 +545,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { "--rowIDColumn", "0", "--filterColumn", "1")) - val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable - val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable - tokenize(indicatorLines) should contain theSameElementsAs UnequalDimensionsSelfSimilarity - tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarity + val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable + val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable + tokenize(similarityLines) should contain theSameElementsAs UnequalDimensionsSelfSimilarity + tokenize(crossSimilarityLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarity } @@ -566,7 +562,7 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { */ val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file - val OutPath = TmpDir + "indicator-matrices/" + val OutPath = TmpDir + "similarity-matrices/" val lines = Array( "u1,purchase,iphone", @@ -590,7 +586,8 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { "iphone\tmobile_acc:1.7260924347106847 soap:1.7260924347106847 phones:1.7260924347106847", "surface\tmobile_acc:0.6795961471815897", "nexus\ttablets:1.7260924347106847 mobile_acc:0.6795961471815897 phones:0.6795961471815897", - "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847 mobile_acc:1.7260924347106847", + "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 phones:1.7260924347106847 " + + "mobile_acc:1.7260924347106847", "ipad\tmobile_acc:0.6795961471815897 phones:0.6795961471815897")) // this will create multiple part-xxxxx files in the InFile dir but other tests will @@ -612,10 +609,10 @@ class ItemSimilarityDriverSuite extends FunSuite with DistributedSparkSuite { "--filterColumn", "1", "--writeAllDatasets")) - val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable - val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable - tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens - tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines + val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable + val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable + tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens + tokenize(crossSimilarityLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines } @@ -673,7 +670,7 @@ removed ==> u3 0 0 1 0 */ val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file - val OutPath = TmpDir + "indicator-matrices/" + val OutPath = TmpDir + "similarity-matrices/" val lines = Array( "u1,purchase,iphone", @@ -719,10 +716,10 @@ removed ==> u3 0 0 1 0 "--filterColumn", "1", "--writeAllDatasets")) - val indicatorLines = mahoutCtx.textFile(OutPath + "/indicator-matrix/").collect.toIterable - val crossIndicatorLines = mahoutCtx.textFile(OutPath + "/cross-indicator-matrix/").collect.toIterable - tokenize(indicatorLines) should contain theSameElementsAs SelfSimilairtyTokens - tokenize(crossIndicatorLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines + val similarityLines = mahoutCtx.textFile(OutPath + "/similarity-matrix/").collect.toIterable + val crossSimilarityLines = mahoutCtx.textFile(OutPath + "/cross-similarity-matrix/").collect.toIterable + tokenize(similarityLines) should contain theSameElementsAs SelfSimilairtyTokens + tokenize(crossSimilarityLines) should contain theSameElementsAs UnequalDimensionsCrossSimilarityLines } // convert into an Iterable of tokens for 'should contain theSameElementsAs Iterable' http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala index 29c8bea..f18ec70 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala @@ -57,6 +57,10 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat // initContext() } + override protected def afterAll(configMap: ConfigMap): Unit = { + super.afterAll(configMap) + resetContext() + } override protected def beforeAll(configMap: ConfigMap): Unit = { super.beforeAll(configMap)
