http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
 
b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index 895bd01..a90e672 100644
--- 
a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ 
b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -26,18 +26,21 @@ import org.apache.mahout.math.drm.{DistributedContext, 
CheckpointedDrm}
 import org.apache.mahout.sparkbindings._
 import scala.collection.JavaConversions._
 
-/** Extends Reader trait to supply the [[IndexedDatasetSpark]] as the type 
read and a reader function for reading text delimited files as described in the 
[[Schema]]
-  */
+/**
+ * Extends Reader trait to supply the 
[[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] as
+ * the type read and element and row reader functions for reading text 
delimited files as described in the
+ * [[org.apache.mahout.math.indexeddataset.Schema]]
+ */
 trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{
-  /** Read in text delimited elements from all URIs in the comma delimited 
source String and return
-    * the DRM of all elements updating the dictionaries for row and column 
dictionaries. If there is
-    * no strength value in the element, assume it's presence means a strength 
of 1.
-    *
-    * @param mc context for the Spark job
-    * @param readSchema describes the delimiters and positions of values in 
the text delimited file.
-    * @param source comma delimited URIs of text files to be read into the 
[[IndexedDatasetSpark]]
-    * @return
-    */
+  /**
+   * Read in text delimited elements from all URIs in the comma delimited 
source String and return
+   * the DRM of all elements updating the dictionaries for row and column 
dictionaries. If there is
+   * no strength value in the element, assume its presence means a strength 
of 1.
+   * @param mc context for the Spark job
+   * @param readSchema describes the delimiters and positions of values in the 
text delimited file.
+   * @param source comma delimited URIs of text files to be read from
+   * @return a new 
[[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]]
+   */
   protected def elementReader(
       mc: DistributedContext,
       readSchema: Schema,
@@ -116,16 +119,16 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
     }
   }
 
-  /** Read in text delimited rows from all URIs in this comma delimited source 
String and return
-    * the DRM of all elements updating the dictionaries for row and column 
dictionaries. If there is
-    * no strength value in the element, assume it's presence means a strength 
of 1.
-    *
-    * @param mc context for the Spark job
-    * @param readSchema describes the delimiters and positions of values in 
the text delimited file.
-    * @param source comma delimited URIs of text files to be read into the 
[[IndexedDatasetSpark]]
-    * @return
-    */
-  protected def drmReader(
+  /**
+   * Read in text delimited rows from all URIs in this comma delimited source 
String and return
+   * the DRM of all elements updating the dictionaries for row and column 
dictionaries. If there is
+   * no strength value in the element, assume its presence means a strength 
of 1.
+   * @param mc context for the Spark job
+   * @param readSchema describes the delimiters and positions of values in the 
text delimited file.
+   * @param source comma delimited URIs of text files to be read into the 
[[IndexedDatasetSpark]]
+   * @return a new 
[[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]]
+   */
+  protected def rowReader(
       mc: DistributedContext,
       readSchema: Schema,
       source: String,
@@ -205,33 +208,36 @@ trait TDIndexedDatasetReader extends 
Reader[IndexedDatasetSpark]{
     }
   }
 
-  // this creates a BiMap from an ID collection. The ID points to an ordinal 
int
-  // which is used internal to Mahout as the row or column ID
-  // todo: this is a non-distributed process in an otherwise distributed 
reader and the BiMap is a
-  // non-rdd based object--this will limit the size of the dataset to ones 
where the dictionaries fit
-  // in-memory, the option is to put the dictionaries in rdds and do joins to 
translate IDs
-  private def asOrderedDictionary(dictionary: BiMap[String, Int] = 
HashBiMap.create(), entries: Array[String]): BiMap[String, Int] = {
+  /**
+   * Creates a BiMap from an ID collection. The ID points to an ordinal Int, 
which is used internally by Mahout
+   * as the row or column ID.
+   * todo: this is a non-distributed process in an otherwise distributed 
reader and the BiMap is a
+   * non-rdd based object--this will limit the size of the dataset to ones 
where the dictionaries fit
+   * in-memory, the option is to put the dictionaries in rdds and do joins to 
translate IDs
+   */
+  private def asOrderedDictionary(dictionary: BiMap[String, Int] = 
HashBiMap.create(),
+      entries: Array[String]):
+    BiMap[String, Int] = {
     var index = dictionary.size() // if a dictionary is supplied then add to 
the end based on the Mahout id 'index'
     for (entry <- entries) {
       if (!dictionary.contains(entry)){
         dictionary.put(entry, index)
         index += 1
-      }// the dictionary should never contain an entry since they are supposed 
to be distinct but for some reason they do
+      }// the dictionary should never contain an entry since they are supposed 
to be distinct but for some reason
+      // they do
     }
     dictionary
   }
 }
 
+/** Extends the Writer trait to supply the type being written and a writer function */
 trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{
-
-  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, 
score1), (_, score2)) => score1 > score2}
-
-  /** Read in text delimited elements from all URIs in this comma delimited 
source String.
-    *
-    * @param mc context for the Spark job
-    * @param writeSchema describes the delimiters and positions of values in 
the output text delimited file.
-    * @param dest directory to write text delimited version of 
[[IndexedDatasetSpark]]
-    */
+  /**
+   * Writes text delimited elements to the destination directory.
+   * @param mc context for the Spark job
+   * @param writeSchema describes the delimiters and positions of values in 
the output text delimited file.
+   * @param dest directory to write text delimited version of 
[[IndexedDatasetSpark]]
+   */
   protected def writer(
       mc: DistributedContext,
       writeSchema: Schema,
@@ -262,21 +268,20 @@ trait TDIndexedDatasetWriter extends 
Writer[IndexedDatasetSpark]{
       matrix.rdd.map { case (rowID, itemVector) =>
 
         // turn non-zeros into list for sorting
-        val itemList: 
collection.mutable.MutableList[org.apache.mahout.common.Pair[Integer, Double]] 
= new collection.mutable.MutableList[org.apache.mahout.common.Pair[Integer, 
Double]]
+        var itemList = List[(Int, Double)]()
         for (ve <- itemVector.nonZeroes) {
-          val item: org.apache.mahout.common.Pair[Integer, Double] = new 
org.apache.mahout.common.Pair[Integer, Double](ve.index, ve.get)
-          itemList += item
+          itemList = itemList :+ (ve.index, ve.get)
         }
         //sort by highest value descending(-)
-        val vector = if (sort) itemList.sortBy(-_.getSecond) else itemList
+        val vector = if (sort) itemList.sortBy { elem => -elem._2 } else 
itemList
 
         // first get the external rowID token
         if (!vector.isEmpty){
           var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim
           // for the rest of the row, construct the vector contents of 
elements (external column ID, strength value)
           for (item <- vector) {
-            line += columnIDDictionary.inverse.get(item.getFirst)
-            if (!omitScore) line += columnIdStrengthDelim + item.getSecond
+            line += columnIDDictionary.inverse.get(item._1)
+            if (!omitScore) line += columnIdStrengthDelim + item._2
             line += elementDelim
           }
           // drop the last delimiter, not needed to end the line
@@ -296,26 +301,33 @@ trait TDIndexedDatasetWriter extends 
Writer[IndexedDatasetSpark]{
 /** A combined trait that reads and writes */
 trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with 
TDIndexedDatasetWriter
 
-/** Reads text delimited files into an IndexedDataset. Classes are needed to 
supply trait params in their constructor.
-  * @param readSchema describes the delimiters and position of values in the 
text delimited file to be read.
-  * @param mc Spark context for reading files
-  * @note The source is supplied to Reader#readElementsFrom .
-  * */
+/**
+ * Reads text delimited files into an IndexedDataset. Classes can be used to 
supply trait params in their constructor.
+ * @param readSchema describes the delimiters and position of values in the 
text delimited file to be read.
+ * @param mc Spark context for reading files
+ * @note The source is supplied to Reader#readElementsFrom .
+ */
 class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
     (implicit val mc: DistributedContext) extends TDIndexedDatasetReader
 
-/** Writes  text delimited files into an IndexedDataset. Classes are needed to 
supply trait params in their constructor.
-  * @param writeSchema describes the delimiters and position of values in the 
text delimited file(s) written.
-  * @param mc Spark context for reading files
-  * @note the destination is supplied to Writer#writeTo
-  * */
-class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: 
Boolean = true)(implicit val mc: DistributedContext) extends 
TDIndexedDatasetWriter
-
-/** Reads and writes text delimited files to/from an IndexedDataset. Classes 
are needed to supply trait params in their constructor.
-  * @param readSchema describes the delimiters and position of values in the 
text delimited file(s) to be read.
-  * @param writeSchema describes the delimiters and position of values in the 
text delimited file(s) written.
-  * @param mc Spark context for reading the files, may be implicitly defined.
-  * */
+/**
+ * Writes text delimited files from an IndexedDataset. Classes can be used to 
supply trait params in their
+ * constructor.
+ * @param writeSchema describes the delimiters and position of values in the 
text delimited file(s) written.
+ * @param mc Spark context for reading files
+ * @note the destination is supplied to Writer#writeTo
+ */
+class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: 
Boolean = true)
+    (implicit val mc: DistributedContext)
+  extends TDIndexedDatasetWriter
+
+/**
+ * Reads and writes text delimited files to/from an IndexedDataset. Classes 
are needed to supply trait params in
+ * their constructor.
+ * @param readSchema describes the delimiters and position of values in the 
text delimited file(s) to be read.
+ * @param writeSchema describes the delimiters and position of values in the 
text delimited file(s) written.
+ * @param mc Spark context for reading the files, may be implicitly defined.
+ */
 class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val 
writeSchema: Schema, val sort: Boolean = true)
     (implicit val mc: DistributedContext)
   extends TDIndexedDatasetReaderWriter

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
index 3d03c1d..4f88c13 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
@@ -52,23 +52,23 @@ object TrainNBDriver extends MahoutSparkDriver {
       
 
       //How to search for input
-      parseFileDiscoveryOptions
+      parseFileDiscoveryOptions()
 
-      //Drm output schema--not driver specific, drm specific
-      parseDrmFormatOptions
+      //IndexedDataset output schema--not driver specific, IndexedDataset 
specific
+      parseIndexedDatasetFormatOptions()
 
       //Spark config options--not driver specific
-      parseSparkOptions
+      parseSparkOptions()
 
       //Jar inclusion, this option can be set when executing the driver from 
compiled code, not when from CLI
-      parseGenericOptions
+      parseGenericOptions()
 
       help("help") abbr ("h") text ("prints this usage text\n")
 
     }
     parser.parse(args, parser.opts) map { opts =>
       parser.opts = opts
-      process
+      process()
     }
   }
 
@@ -79,7 +79,7 @@ object TrainNBDriver extends MahoutSparkDriver {
     trainingSet
   }
 
-  override def process: Unit = {
+  override def process(): Unit = {
     start()
 
     val complementary = parser.opts("trainComplementary").asInstanceOf[Boolean]
@@ -91,7 +91,7 @@ object TrainNBDriver extends MahoutSparkDriver {
 
     model.dfsWrite(outputPath)
 
-    stop
+    stop()
   }
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index c0d36c6..47eb40b 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -252,10 +252,10 @@ object SparkEngine extends DistributedEngine {
   }
 
   /**
-   * reads an IndexedDatasetSpark from default text delimited files
+   * Returns an 
[[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from 
default text
+   * delimited files. Reads a vector per row.
    * @param src a comma separated list of URIs to read from
    * @param schema how the text file is formatted
-   * @return
    */
   def indexedDatasetDFSRead(src: String,
       schema: Schema = DefaultIndexedDatasetReadSchema,
@@ -263,15 +263,15 @@ object SparkEngine extends DistributedEngine {
       (implicit sc: DistributedContext):
     IndexedDatasetSpark = {
     val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
-    val ids = reader.readDRMFrom(src, existingRowIDs)
+    val ids = reader.readRowsFrom(src, existingRowIDs)
     ids
   }
 
   /**
-   * reads an IndexedDatasetSpark from default text delimited files
+   * Returns an 
[[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from 
default text
+   * delimited files. Reads an element per row.
    * @param src a comma separated list of URIs to read from
    * @param schema how the text file is formatted
-   * @return
    */
   def indexedDatasetDFSReadElements(src: String,
       schema: Schema = DefaultIndexedDatasetElementReadSchema,

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
index d3aa0a8..30b32ad 100644
--- 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
+++ 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala
@@ -26,8 +26,12 @@ import 
org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema,
 /**
  * Spark implementation of 
[[org.apache.mahout.math.indexeddataset.IndexedDataset]] providing the Spark 
specific
  * dfsWrite method
+ * @param matrix a 
[[org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark]] to wrap
+ * @param rowIDs a bidirectional map for Mahout Int IDs to/from application 
specific string IDs
+ * @param columnIDs a bidirectional map for Mahout Int IDs to/from application 
specific string IDs
  */
-class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: 
BiMap[String,Int], val columnIDs: BiMap[String,Int])
+class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: 
BiMap[String,Int],
+    val columnIDs: BiMap[String,Int])
   extends IndexedDataset {
 
   /** Secondary constructor enabling immutability */
@@ -35,14 +39,19 @@ class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], 
val rowIDs: BiMap[St
     this(id2.matrix, id2.rowIDs, id2.columnIDs)
   }
 
-  /** Factory method used to create this extending class when the interface of
-    * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] is all that is 
known. */
+  /**
+   * Factory method used to create this extending class when the interface of
+   * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] is all that is 
known.
+   */
   override def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], 
columnIDs: BiMap[String,Int]):
     IndexedDatasetSpark = {
     new IndexedDatasetSpark(matrix, rowIDs, columnIDs)
   }
 
-  /** implements the core method [[indexeddataset.IndexedDataset#dfsWrite]]*/
+  /**
+   * Implements the core method to write 
[[org.apache.mahout.math.indexeddataset.IndexedDataset]]. Override and
+   * replace the writer to change how it is written.
+   */
   override def dfsWrite(dest: String, schema: Schema = 
DefaultIndexedDatasetWriteSchema)
       (implicit sc: DistributedContext):
     Unit = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index c441716..8199708 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -223,7 +223,11 @@ package object sparkbindings {
           // During maven tests, the maven classpath also creeps in for some 
reason
           !n.matches(".*/.m2/.*")
         )
-
+    /* verify jar passed to context
+    log.info("\n\n\n")
+    mcjars.foreach(j => log.info(j))
+    log.info("\n\n\n")
+    */
     mcjars
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
 
b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
index 76c3553..ea6b40f 100644
--- 
a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
+++ 
b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriverSuite.scala
@@ -63,7 +63,7 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
     "iphone\tipad:1.7260924347106847",
     "surface")
 
-  val CrossIndicatorLines = Iterable(
+  val CrossSimilarityLines = Iterable(
     "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 
ipad:1.7260924347106847 galaxy:1.7260924347106847",
     "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 
ipad:0.6795961471815897 galaxy:0.6795961471815897",
     "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 
ipad:0.6795961471815897 galaxy:0.6795961471815897",
@@ -78,49 +78,45 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
     "iphone\tipad:1.7260924347106847",
     "surface"))
 
-  val CrossIndicatorTokens = tokenize(Iterable(
+  val CrossSimilarityTokens = tokenize(Iterable(
     "iphone\tnexus:1.7260924347106847 iphone:1.7260924347106847 
ipad:1.7260924347106847 galaxy:1.7260924347106847",
     "ipad\tnexus:0.6795961471815897 iphone:0.6795961471815897 
ipad:0.6795961471815897 galaxy:0.6795961471815897",
     "nexus\tnexus:0.6795961471815897 iphone:0.6795961471815897 
ipad:0.6795961471815897 galaxy:0.6795961471815897",
     "galaxy\tnexus:1.7260924347106847 iphone:1.7260924347106847 
ipad:1.7260924347106847 galaxy:1.7260924347106847",
     "surface\tsurface:4.498681156950466 nexus:0.6795961471815897"))
 
-  // now in MahoutSuite
-  // final val TmpDir = "tmp/" // all IO going to whatever the default HDFS 
config is pointing to
-
   /*
     //Clustered Spark and HDFS, not a good everyday build test
     ItemSimilarityDriver.main(Array(
         "--input", 
"hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt",
-        "--output", 
"hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/",
+        "--output", 
"hdfs://occam4:54310/user/pat/spark-itemsimilarity/similarityMatrices/",
         "--master", "spark://occam4:7077",
         "--filter1", "purchase",
         "--filter2", "view",
         "--inDelim", ",",
         "--itemIDColumn", "2",
         "--rowIDColumn", "0",
-        "--filterColumn", "1"
-    ))
-*/
+        "--filterColumn", "1"))
+  */
   // local multi-threaded Spark with HDFS using large dataset
   // not a good build test.
-  /*    ItemSimilarityDriver.main(Array(
+  /*
+    ItemSimilarityDriver.main(Array(
       "--input", "hdfs://occam4:54310/user/pat/xrsj/ratings_data.txt",
-      "--output", "hdfs://occam4:54310/user/pat/xrsj/indicatorMatrices/",
+      "--output", "hdfs://occam4:54310/user/pat/xrsj/similarityMatrices/",
       "--master", "local[4]",
       "--filter1", "purchase",
       "--filter2", "view",
       "--inDelim", ",",
       "--itemIDColumn", "2",
       "--rowIDColumn", "0",
-      "--filterColumn", "1"
-    ))
+      "--filterColumn", "1"))
   */
 
   test("ItemSimilarityDriver, non-full-spec CSV") {
 
     val InFile = TmpDir + "in-file.csv/" //using part files, not single file
-    val OutPath = TmpDir + "indicator-matrices/"
+    val OutPath = TmpDir + "similarity-matrices/"
 
     val lines = Array(
       "u1,purchase,iphone",
@@ -163,10 +159,10 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
 
     // todo: these comparisons rely on a sort producing the same lines, which 
could possibly
     // fail since the sort is on value and these can be the same for all items 
in a vector
-    val indicatorLines = mahoutCtx.textFile(OutPath + 
"/indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs 
SelfSimilairtyTokens
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + 
"/cross-indicator-matrix/").collect.toIterable
-    tokenize(crossIndicatorLines) should contain theSameElementsAs 
CrossIndicatorTokens
+    val similarityLines = mahoutCtx.textFile(OutPath + 
"/similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs 
SelfSimilairtyTokens
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + 
"/cross-similarity-matrix/").collect.toIterable
+    tokenize(crossSimilarityLines) should contain theSameElementsAs 
CrossSimilarityTokens
   }
 
 
@@ -174,7 +170,7 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
   test("ItemSimilarityDriver TSV ") {
 
     val InFile = TmpDir + "in-file.tsv/"
-    val OutPath = TmpDir + "indicator-matrices/"
+    val OutPath = TmpDir + "similarity-matrices/"
 
     val lines = Array(
       "u1\tpurchase\tiphone",
@@ -216,17 +212,17 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
 
     // todo: a better test would be to get sorted vectors and compare rows 
instead of tokens, this might miss
     // some error cases
-    val indicatorLines = mahoutCtx.textFile(OutPath + 
"/indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs 
SelfSimilairtyTokens
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + 
"/cross-indicator-matrix/").collect.toIterable
-    tokenize(crossIndicatorLines) should contain theSameElementsAs 
CrossIndicatorTokens
+    val similarityLines = mahoutCtx.textFile(OutPath + 
"/similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs 
SelfSimilairtyTokens
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + 
"/cross-similarity-matrix/").collect.toIterable
+    tokenize(crossSimilarityLines) should contain theSameElementsAs 
CrossSimilarityTokens
 
   }
 
   test("ItemSimilarityDriver log-ish files") {
 
     val InFile = TmpDir + "in-file.log/"
-    val OutPath = TmpDir + "indicator-matrices/"
+    val OutPath = TmpDir + "similarity-matrices/"
 
     val lines = Array(
       "2014-06-23 14:46:53.115\tu1\tpurchase\trandom text\tiphone",
@@ -267,10 +263,10 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
       "--filterColumn", "2"))
 
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + 
"/indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs 
SelfSimilairtyTokens
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + 
"/cross-indicator-matrix/").collect.toIterable
-    tokenize(crossIndicatorLines) should contain theSameElementsAs 
CrossIndicatorTokens
+    val similarityLines = mahoutCtx.textFile(OutPath + 
"/similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs 
SelfSimilairtyTokens
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + 
"/cross-similarity-matrix/").collect.toIterable
+    tokenize(crossSimilarityLines) should contain theSameElementsAs 
CrossSimilarityTokens
 
   }
 
@@ -280,7 +276,7 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
     val InFilename = "in-file.tsv"
     val InPath = InDir + InFilename
 
-    val OutPath = TmpDir + "indicator-matrices"
+    val OutPath = TmpDir + "similarity-matrices"
 
     val lines = Array(
       "0,0,1",
@@ -312,8 +308,8 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
       "--output", OutPath,
       "--master", masterUrl))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + 
"/indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs Answer
+    val similarityLines = mahoutCtx.textFile(OutPath + 
"/similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs Answer
 
   }
 
@@ -323,7 +319,7 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
     val InFilename = "in-file.tsv"
     val InPath = InDir + InFilename
 
-    val OutPath = TmpDir + "indicator-matrices"
+    val OutPath = TmpDir + "similarity-matrices"
 
     val lines = Array(
       "0,0,1",
@@ -356,8 +352,8 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
       "--master", masterUrl,
       "--omitStrength"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + 
"/indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs Answer
+    val similarityLines = mahoutCtx.textFile(OutPath + 
"/similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs Answer
 
   }
 
@@ -397,7 +393,7 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
     val InPathM2 = InDirM2 + InFilenameM2
 
     val InPathStart = TmpDir + "data/"
-    val OutPath = TmpDir + "indicator-matrices"
+    val OutPath = TmpDir + "similarity-matrices"
 
     // this creates one part-0000 file in the directory
     mahoutCtx.parallelize(M1Lines).coalesce(1, shuffle = 
true).saveAsTextFile(InDirM1)
@@ -429,10 +425,10 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
       "--filenamePattern", "m..tsv",
       "--recursive"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + 
"/indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs 
SelfSimilairtyTokens
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + 
"/cross-indicator-matrix/").collect.toIterable
-    tokenize(crossIndicatorLines) should contain theSameElementsAs 
CrossIndicatorTokens
+    val similarityLines = mahoutCtx.textFile(OutPath + 
"/similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs 
SelfSimilairtyTokens
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + 
"/cross-similarity-matrix/").collect.toIterable
+    tokenize(crossSimilarityLines) should contain theSameElementsAs 
CrossSimilarityTokens
 
   }
 
@@ -440,7 +436,7 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
 
     val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
     val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
-    val OutPath = TmpDir + "indicator-matrices/"
+    val OutPath = TmpDir + "similarity-matrices/"
 
     val lines = Array(
       "u1,purchase,iphone",
@@ -482,10 +478,10 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
       "--rowIDColumn", "0",
       "--filterColumn", "1"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + 
"/indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs 
SelfSimilairtyTokens
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + 
"/cross-indicator-matrix/").collect.toIterable
-    tokenize(crossIndicatorLines) should contain theSameElementsAs 
CrossIndicatorTokens
+    val similarityLines = mahoutCtx.textFile(OutPath + 
"/similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs 
SelfSimilairtyTokens
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + 
"/cross-similarity-matrix/").collect.toIterable
+    tokenize(crossSimilarityLines) should contain theSameElementsAs 
CrossSimilarityTokens
 
   }
 
@@ -493,7 +489,7 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
 
     val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
     val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
-    val OutPath = TmpDir + "indicator-matrices/"
+    val OutPath = TmpDir + "similarity-matrices/"
 
     val lines = Array(
       "u1,purchase,iphone",
@@ -549,10 +545,10 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
       "--rowIDColumn", "0",
       "--filterColumn", "1"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + 
"/indicator-matrix/").collect.toIterable
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + 
"/cross-indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs 
UnequalDimensionsSelfSimilarity
-    tokenize(crossIndicatorLines) should contain theSameElementsAs 
UnequalDimensionsCrossSimilarity
+    val similarityLines = mahoutCtx.textFile(OutPath + 
"/similarity-matrix/").collect.toIterable
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + 
"/cross-similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs 
UnequalDimensionsSelfSimilarity
+    tokenize(crossSimilarityLines) should contain theSameElementsAs 
UnequalDimensionsCrossSimilarity
 
   }
 
@@ -566,7 +562,7 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
     */
     val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
     val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
-    val OutPath = TmpDir + "indicator-matrices/"
+    val OutPath = TmpDir + "similarity-matrices/"
 
     val lines = Array(
       "u1,purchase,iphone",
@@ -590,7 +586,8 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
       "iphone\tmobile_acc:1.7260924347106847 soap:1.7260924347106847 
phones:1.7260924347106847",
       "surface\tmobile_acc:0.6795961471815897",
       "nexus\ttablets:1.7260924347106847 mobile_acc:0.6795961471815897 
phones:0.6795961471815897",
-      "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 
phones:1.7260924347106847 mobile_acc:1.7260924347106847",
+      "galaxy\ttablets:5.545177444479561 soap:1.7260924347106847 
phones:1.7260924347106847 " +
+        "mobile_acc:1.7260924347106847",
       "ipad\tmobile_acc:0.6795961471815897 phones:0.6795961471815897"))
 
     // this will create multiple part-xxxxx files in the InFile dir but other 
tests will
@@ -612,10 +609,10 @@ class ItemSimilarityDriverSuite extends FunSuite with 
DistributedSparkSuite {
       "--filterColumn", "1",
       "--writeAllDatasets"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + 
"/indicator-matrix/").collect.toIterable
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + 
"/cross-indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs 
SelfSimilairtyTokens
-    tokenize(crossIndicatorLines) should contain theSameElementsAs 
UnequalDimensionsCrossSimilarityLines
+    val similarityLines = mahoutCtx.textFile(OutPath + 
"/similarity-matrix/").collect.toIterable
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + 
"/cross-similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs 
SelfSimilairtyTokens
+    tokenize(crossSimilarityLines) should contain theSameElementsAs 
UnequalDimensionsCrossSimilarityLines
 
   }
 
@@ -673,7 +670,7 @@ removed ==> u3      0             0       1           0
     */
     val InFile1 = TmpDir + "in-file1.csv/" //using part files, not single file
     val InFile2 = TmpDir + "in-file2.csv/" //using part files, not single file
-    val OutPath = TmpDir + "indicator-matrices/"
+    val OutPath = TmpDir + "similarity-matrices/"
 
     val lines = Array(
       "u1,purchase,iphone",
@@ -719,10 +716,10 @@ removed ==> u3    0             0       1           0
       "--filterColumn", "1",
       "--writeAllDatasets"))
 
-    val indicatorLines = mahoutCtx.textFile(OutPath + 
"/indicator-matrix/").collect.toIterable
-    val crossIndicatorLines = mahoutCtx.textFile(OutPath + 
"/cross-indicator-matrix/").collect.toIterable
-    tokenize(indicatorLines) should contain theSameElementsAs 
SelfSimilairtyTokens
-    tokenize(crossIndicatorLines) should contain theSameElementsAs 
UnequalDimensionsCrossSimilarityLines
+    val similarityLines = mahoutCtx.textFile(OutPath + 
"/similarity-matrix/").collect.toIterable
+    val crossSimilarityLines = mahoutCtx.textFile(OutPath + 
"/cross-similarity-matrix/").collect.toIterable
+    tokenize(similarityLines) should contain theSameElementsAs 
SelfSimilairtyTokens
+    tokenize(crossSimilarityLines) should contain theSameElementsAs 
UnequalDimensionsCrossSimilarityLines
   }
 
   // convert into an Iterable of tokens for 'should contain theSameElementsAs 
Iterable'

http://git-wip-us.apache.org/repos/asf/mahout/blob/15ee1951/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
 
b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
index 29c8bea..f18ec70 100644
--- 
a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
+++ 
b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
@@ -57,6 +57,10 @@ trait DistributedSparkSuite extends DistributedMahoutSuite 
with LoggerConfigurat
 //    initContext()
   }
 
+  override protected def afterAll(configMap: ConfigMap): Unit = {
+    super.afterAll(configMap)
+    resetContext()
+  }
 
   override protected def beforeAll(configMap: ConfigMap): Unit = {
     super.beforeAll(configMap)

Reply via email to