Adding stuff for itemsimilarity driver for Spark

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/74b9921c
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/74b9921c
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/74b9921c

Branch: refs/heads/mahout-1541
Commit: 74b9921c4c9bd8903585bbd74d9e66298ea8b7a0
Parents: 107a0ba
Author: pferrel <[email protected]>
Authored: Wed Jun 4 13:09:07 2014 -0700
Committer: pferrel <[email protected]>
Committed: Wed Jun 4 13:09:07 2014 -0700

----------------------------------------------------------------------
 spark/src/.DS_Store                             | Bin 6148 -> 0 bytes
 .../apache/mahout/drivers/FileSysUtils.scala    |  34 +++
 .../apache/mahout/drivers/IndexedDataset.scala  |  54 +++++
 .../drivers/IndexedDatasetDriverTest.scala      | 164 +++++++++++++++
 .../mahout/drivers/ItemSimilarityDriver.scala   | 208 +++++++++++++++++++
 .../apache/mahout/drivers/MahoutDriver.scala    |  74 +++++++
 .../mahout/drivers/MahoutOptionParser.scala     |  24 +++
 .../apache/mahout/drivers/ReaderWriter.scala    | 188 +++++++++++++++++
 .../org/apache/mahout/drivers/Schema.scala      |  30 +++
 .../mahout/drivers/IndexedDatasetTest.scala     |  25 +++
 .../drivers/ItemSimilarityDriver$Test.scala     | 180 ++++++++++++++++
 11 files changed, 981 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/.DS_Store
----------------------------------------------------------------------
diff --git a/spark/src/.DS_Store b/spark/src/.DS_Store
deleted file mode 100644
index 7b0d367..0000000
Binary files a/spark/src/.DS_Store and /dev/null differ

http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
new file mode 100644
index 0000000..e8491f2
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+/**
+  * Returns a [[java.lang.String]]comma delimited list of URIs discovered 
based on parameters in the constructor.
+  * The String is formatted to be input into 
[[org.apache.spark.SparkContext.textFile()]]
+  *
+  * @param pathURI Where to start looking for inFiles, only HDFS and local URI 
are currently
+  *                supported
+  * @param filePattern regex that must match the entire filename to have the 
file included in the returned list
+  * @param recursive true traverses the filesystem recursively
+  */
+
+case class FileSysUtils(pathURI: String, filePattern: String = "", recursive: 
Boolean = false) {
+  // todo: There is an HDFS filestatus method that collects multiple inFiles, 
see if this is the right thing to use
+  // todo: check to see if the input is a supported URI for collection or 
recursive search but just pass through otherwise
+  def uris = {pathURI}
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
new file mode 100644
index 0000000..36bda90
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import com.google.common.collect.BiMap
+import org.apache.mahout.math.drm.DrmLike
+
+/**
+  * Wraps a [[org.apache.mahout.sparkbindings.drm.DrmLike]] object with two 
[[com.google.common.collect.BiMap]]s to store ID/label translation dictionaries.
+  * The purpose of this class is to wrap a DrmLike[C] with bidirectional ID 
mappings so
+  * a user specified label or ID can be stored and mapped to and from the 
[[scala.Int]] ordinal ID
+  * used internal to Mahout Core code.
+  *
+  * Example: For a transpose job the 
[[org.apache.mahout.drivers.IndexedDataset#matrix]]: 
[[org.apache.mahout.sparkbindings.drm.DrmLike]] is passed into the DSL code
+  * that transposes the values, then a resulting 
[[org.apache.mahout.drivers.IndexedDataset]] is created from the transposed 
DrmLike object with swapped dictionaries (since the rows and columns are 
transposed). The new
+  * [[org.apache.mahout.drivers.IndexedDataset]] is returned.
+  *
+  * @param matrix  DrmLike[Int], representing the distributed matrix storing 
the actual data.
+  * @param rowIDs BiMap[String, Int] storing a bidirectional mapping of 
external String ID to
+  *                  and from the ordinal Mahout Int ID. This one holds row 
labels
+  * @param columnIDs BiMap[String, Int] storing a bidirectional mapping of 
external String
+  *                  ID to and from the ordinal Mahout Int ID. This one holds 
column labels
+  */
+
+case class IndexedDataset(matrix: DrmLike[Int], rowIDs: BiMap[String,Int], 
columnIDs: BiMap[String,Int]) {
+}
+
+/**
+  * Companion object for the case class 
[[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary 
constructor for
+  * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used 
when you have a factory like [[org.apache.mahout.drivers.IndexedDatasetStore]]
+  * {{{
+  * val indexedDataset = IndexedDataset(indexedDatasetStore.read)
+  * }}}
+  */
+
+object IndexedDataset {
+  /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */
+  def apply(id2: IndexedDataset) = new IndexedDataset(id2.matrix,  id2.rowIDs, 
id2.columnIDs)
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/IndexedDatasetDriverTest.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDatasetDriverTest.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDatasetDriverTest.scala
new file mode 100644
index 0000000..80fc134
--- /dev/null
+++ 
b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDatasetDriverTest.scala
@@ -0,0 +1,164 @@
+package org.apache.mahout.drivers
+
+import org.apache.mahout.sparkbindings._
+import java.io.{FileWriter, BufferedWriter}
+import com.google.common.io.Closeables
+
+object ItemSimilarityDriverTest {
+
+  def main(args: Array[String]): Unit = {
+
+    System.setProperty("spark.kryo.referenceTracking", "false")
+    System.setProperty("spark.kryoserializer.buffer.mb", "100")
+
+    implicit val sc = mahoutSparkContext(masterUrl = "local", appName = 
"MahoutLocalContext",
+      customJars = Traversable.empty[String])
+
+    val exampleCsvlogStatements = Array(
+      "12569537329,user1,item1,\"view\"",
+      "12569537329,user1,item2,\"view\"",
+      "12569537329,user1,item2,\"like\"",
+      "12569537329,user1,item3,\"like\"",
+      "12569537329,user2,item2,\"view\"",
+      "12569537329,user2,item2,\"like\"",
+      "12569537329,user3,item1,\"like\"",
+      "12569537329,user3,item3,\"view\"",
+      "12569537329,user3,item3,\"like\""
+    )
+
+    val exampleTsvLogStatements = Array(
+      "12569537329\tuser1\titem1\t\"view\"",
+      "12569537329\tuser1\titem2\t\"view\"",
+      "12569537329\tuser1\titem2\t\"like\"",
+      "12569537329\tuser1\titem3\t\"like\"",
+      "12569537329\tuser2\titem2\t\"view\"",
+      "12569537329\tuser2\titem2\t\"like\"",
+      "12569537329\tuser3\titem1\t\"like\"",
+      "12569537329\tuser3\titem3\t\"view\"",
+      "12569537329\tuser3\titem3\t\"like\""
+    )
+
+
+    val csvLogStatements = Array(
+      "u1,purchase,iphone",
+      "u1,purchase,ipad",
+      "u2,purchase,nexus",
+      "u2,purchase,galaxy",
+      "u3,purchase,surface",
+      "u4,purchase,iphone",
+      "u4,purchase,galaxy",
+      "u1,view,iphone",
+      "u1,view,ipad",
+      "u1,view,nexus",
+      "u1,view,galaxy",
+      "u2,view,iphone",
+      "u2,view,ipad",
+      "u2,view,nexus",
+      "u2,view,galaxy",
+      "u3,view,surface",
+      "u3,view,nexus",
+      "u4,view,iphone",
+      "u4,view,ipad",
+      "u4,view,galaxy"
+    )
+
+    val tsvLogStatements = Array(
+      "u1\tpurchase\tiphone",
+      "u1\tpurchase\tipad",
+      "u2\tpurchase\tnexus",
+      "u2\tpurchase\tgalaxy",
+      "u3\tpurchase\tsurface",
+      "u4\tpurchase\tiphone",
+      "u4\tpurchase\tgalaxy",
+      "u1\tview\tiphone",
+      "u1\tview\tipad",
+      "u1\tview\tnexus",
+      "u1\tview\tgalaxy",
+      "u2\tview\tiphone",
+      "u2\tview\tipad",
+      "u2\tview\tnexus",
+      "u2\tview\tgalaxy",
+      "u3\tview\tsurface",
+      "u3\tview\tnexus",
+      "u4\tview\tiphone",
+      "u4\tview\tipad",
+      "u4\tview\tgalaxy"
+    )
+
+    var w: BufferedWriter = null
+    try {
+      w = new BufferedWriter(new FileWriter("tmp/cf-data.txt"))
+      w.write(csvLogStatements.mkString("\n"))
+    } finally {
+      Closeables.close(w, false)
+    }
+    /*
+        val indexedLikes = IndexedDatasetStore.readTuples(sc, 
"tmp/cf-data.txt", 2, ",", 0, 2, 1, "purchase")
+
+        val indexedViews = IndexedDatasetStore.readTuples(sc, 
"tmp/cf-data.txt", 2, ",", 0, 2, 1, "view")
+
+        val drmLikes = indexedLikes.matrix
+        val drmViews = indexedViews.matrix
+
+        // Now we could run cooccurrence analysis using the DRMs, instead 
we'll just fetch and print the matrices
+        val drmXCooccurrences = cooccurrences(drmLikes, randomSeed = 
0xdeadbeef,
+          maxInterestingItemsPerThing = 100, maxNumInteractions = 500, 
Array(drmViews))
+
+        val inCoreViews = drmViews.collect
+        val inCoreLikes = drmLikes.collect
+        val inCoreIndicator = drmXCooccurrences(0).collect
+        val inCoreXIndicator = drmXCooccurrences(1).collect
+        println("\nLIKES:")
+        println(inCoreLikes)
+        println("\nVIEWS:")
+        println(inCoreViews)
+        println("\nINDICATOR MATRIX")
+        println(inCoreIndicator)
+        println("\nCROSS INDICATOR MATRIX")
+        println(inCoreXIndicator)
+    */
+    sc.stop()
+/*
+    //Clustered Spark and HDFS
+    ItemSimilarityDriver.main(Array(
+      "--input", 
"hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt",
+      "--output", 
"hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/",
+      "--master", "spark://occam4:7077",
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1"
+    ))
+*/
+    //local multi-threaded Spark with HDFS
+/*    ItemSimilarityDriver.main(Array(
+    "--input", "hdfs://occam4:54310/user/pat/xrsj/ratings_data.txt",
+    "--output", "hdfs://occam4:54310/user/pat/xrsj/indicatorMatrices/",
+    "--master", "local[4]",
+    "--filter1", "purchase",
+    "--filter2", "view",
+    "--inDelim", ",",
+    "--itemIDPosition", "2",
+    "--rowIDPosition", "0",
+    "--filterPosition", "1"
+  ))
+*/
+
+    //local multi-threaded Spark with local FS
+    ItemSimilarityDriver.main(Array(
+      "--input", "tmp/cf-data.txt",
+      "--output", "tmp/indicatorMatrices/",
+      "--master", "local[4]",
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1"
+    ))
+
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
new file mode 100644
index 0000000..c72a3b3
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala
@@ -0,0 +1,208 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.cf.CooccurrenceAnalysis
+
+/**
+  * Command line interface for 
[[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences()]].
+  * Command line interface for 
[[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences()]]. It text lines
+  * that contain (row id, column id, ...). The IDs are user specified strings 
which will be preserved in the
+  * output. The individual tuples will be accumulated into a matrix and 
[[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences()]]
+  * will be used to calculate row-wise self-similarity, or when using filters, 
will generate two
+  * matrices and calculate both the self similarity of the primary matrix and 
the row-wise similarity of the primary
+  * to the secondary. Returns one or two directories of text files formatted 
as specified in the options.
+  * @todo Should also take two input streams and do cross similarity with not 
filter required.
+  */
+object ItemSimilarityDriver extends MahoutDriver {
+
+  private var options: Options = _
+  private var readStore1: TextDelimitedIndexedDatasetReader = _
+  private var readStore2: TextDelimitedIndexedDatasetReader = _
+  private var writeStore: TextDelimitedIndexedDatasetWriter = _
+
+/**
+  * @param args  Command line args, if empty a help message is printed.
+  */
+  override def main(args: Array[String]): Unit = {
+    val parser = new MahoutOptionParser[Options]("ItemSimilarity") {
+      head("ItemSimilarity", "Spark")
+      note("Input, output options")
+      opt[String]('i', "input") required() action { (x, options) =>
+        options.copy(input = x)
+      } text ("Path for input. It may be a filename or directory name and can 
be a local file path or an HDFS URI (required).")
+      opt[String]('o', "output") required() action { (x, options) =>
+        options.copy(output = x)
+      } text ("Output will be in sub-directories stored here so this must be a 
directory path (required).")
+      note("\nAlgorithm control options:")
+      opt[String]("master") abbr ("ma") text ("URL for the Spark Master. 
(optional). Default: 'local'") action { (x, options) =>
+        options.copy(master = x)
+      }
+      opt[Int]("maxPrefs") abbr ("mppu") action { (x, options) =>
+        options.copy(maxPrefs = x)
+      } text ("Max number of preferences to consider per user or item, users 
or items with more preferences will be sampled down (optional). Default: 500") 
validate { x =>
+        if (x > 0) success else failure("Option --maxPrefs must be > 0")
+      }
+      opt[Int]("minPrefs") abbr ("mp") action { (x, options) =>
+        options.copy(minPrefs = x)
+      } text ("Ignore users with less preferences than this (optional). 
Default: 1") validate { x =>
+        if (x > 0) success else failure("Option --minPrefs must be > 0")
+      }
+
+      opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) =>
+        options.copy(maxSimilaritiesPerItem = x)
+      } text ("Try to cap the number of similar items for each item to this 
number (optional). Default: 100") validate { x =>
+        if (x > 0) success else failure("Option --maxSimilaritiesPerItem must 
be > 0")
+      }
+      opt[Int]("randomSeed") abbr ("rs") action { (x, options) =>
+        options.copy(randomSeed = x)
+      } text ("Int to seed random number generator (optional). Default: Uses 
time to generate a seed") validate { x =>
+        if (x > 0) success else failure("Option --randomSeed must be > 0")
+      }
+      note("\nInput text file schema options:")
+      opt[String]("inDelim") abbr ("d") text ("Input delimiter character 
(optional). Default: '\\t'") action { (x, options) =>
+        options.copy(inDelim = x)
+      }
+      opt[String]("filter1") abbr ("f1") action { (x, options) =>
+        options.copy(filter1 = x)
+      } text ("String whose presence indicates a datum for the primary item 
set, can be a regex (optional). Default: no filtered is applied, all is used")
+      opt[String]("filter2") abbr ("f2") action { (x, options) =>
+        options.copy(filter2 = x)
+      } text ("String whose presence indicates a datum for the secondary item 
set, can be a regex (optional). Used in cross-cooccurrence. Default: no 
secondary filter is applied")
+      opt[Int]("rowIDPosition") abbr ("rc") action { (x, options) =>
+        options.copy(rowIDPosition = x)
+      } text ("Column number (0 based Int) containing the row ID string 
(optional). Default: 0") validate { x =>
+        if (x >= 0) success else failure("Option --rowIDColNum must be >= 0")
+      }
+      opt[Int]("itemIDPosition") abbr ("ic") action { (x, options) =>
+        options.copy(itemIDPosition = x)
+      } text ("Column number (0 based Int) containing the item ID string 
(optional). Default: 1") validate { x =>
+        if (x >= 0) success else failure("Option --itemIDColNum must be >= 0")
+      }
+      opt[Int]("filterPosition") abbr ("fc") action { (x, options) =>
+        options.copy(filterPosition = x)
+      } text ("Column number (0 based Int) containing the filter string 
(optional). Default: -1 for no filter") validate { x =>
+        if (x >= -1) success else failure("Option --filterColNum must be >= 
-1")
+      }
+      note("\nDefault input schema will accept: 'userID<tab>itemId' or 
'userID<tab>itemID<tab>any-text...' and all rows will be used")
+      note("\nFile input options:")
+      opt[Unit]('r', "recursive") action { (_, options) =>
+        options.copy(recursive = true)
+      } text ("The input path should be searched recursively for files that 
match the filename pattern from -fp (optional), Default: false")
+      opt[String]("filenamePattern") abbr ("fp") action { (x, options) =>
+        options.copy(filenamePattern = x)
+      } text ("Regex to match in determining input files (optional). Default: 
filename in the --input option or '^part-.*' if --input is a directory")
+      note("\nOutput text file schema options:")
+      opt[String]("outDelim1") abbr ("od1") action { (x, options) =>
+        options.copy(outDelim1 = x)
+      } text ("Primary output inDelim value, used to separate row IDs from the 
similar items list (optional). Default: '\\t'")
+      opt[String]("outDelim2") abbr ("od2") action { (x, options) =>
+        options.copy(outDelim2 = x)
+      } text ("Secondary output inDelim value, used to separate item IDs from 
their values in the similar items list (optional). Default: ':'")
+      opt[String]("outDelim3") abbr ("od3") action { (x, options) =>
+        options.copy(outDelim3 = x)
+      } text ("Last inDelim value, used to separate (itemID:value) tuples in 
the similar items list. (optional). Default: ','")
+      note("\nDefault delimiters will produce output of the form: 
'itemID1<tab>>itemID2:value2,itemID10:value10...'")
+      note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a 
similarity measure.\n")
+      help("help") abbr ("h") text ("prints this usage text\n")
+      checkConfig { c =>
+        if (c.filterPosition == c.itemIDPosition || c.filterPosition == 
c.rowIDPosition || c.rowIDPosition == c.itemIDPosition) failure("The row, item, 
and filter positions must be unique.") else success
+      }
+      checkConfig { c =>
+        if (c.filter1 != null && c.filter2 != null && c.filter1 == c.filter2) 
failure("If using filters they must be unique.") else success
+      }
+    }
+    parser.parse(args, Options()) map { opts =>
+      options = opts
+      process
+    }
+  }
+
+  private def readIndexedDatasets: Array[IndexedDataset] = {
+    val inFiles = FileSysUtils(options.input, options.filenamePattern, 
options.recursive ).uris
+    if(inFiles.isEmpty){Array()}else{
+      val indexedDataset1 = IndexedDataset(readStore1.readFrom(inFiles))
+      if (options.filterPosition != -1 && options.filter2 != null) {
+        val indexedDataset2 = IndexedDataset(readStore2.readFrom(inFiles))
+        Array(indexedDataset1, indexedDataset2)
+      } else {
+        Array(indexedDataset1)
+      }
+    }
+  }
+
+  override def start(masterUrl: String = options.master, appName: String = 
options.appName,
+                      customJars:Traversable[String] = 
Traversable.empty[String]): Unit = {
+    //todo: create and modify a SparkContext here, which can be passed in to 
mahoutSparkContext in the super.start
+    System.setProperty("spark.kryo.referenceTracking", "false")
+    System.setProperty("spark.kryoserializer.buffer.mb", "100")
+    System.setProperty("spark.executor.memory", "2g")
+    super.start(masterUrl, appName, customJars)
+    val readSchema1 = new Schema("delim" -> options.inDelim, "filter" -> 
options.filter1,  "rowIDPosition" -> options.rowIDPosition,  "columnIDPosition" 
-> options.itemIDPosition, "filterPosition" -> options.filterPosition)
+    if(options.filterPosition != -1 && options.filter2 != null) {
+      val readSchema2 = new Schema("delim" -> options.inDelim, "filter" -> 
options.filter2, "rowIDPosition" -> options.rowIDPosition, "columnIDPosition" 
-> options.itemIDPosition, "filterPosition" -> options.filterPosition)
+      readStore2 = new TextDelimitedIndexedDatasetReader(readSchema2, mc)
+    }
+    val writeSchema = new Schema("delim1" -> options.outDelim1, "delim2" -> 
options.outDelim2, "delim3" -> options.outDelim3)
+    readStore1 = new TextDelimitedIndexedDatasetReader(readSchema1, mc)
+    writeStore = new TextDelimitedIndexedDatasetWriter(writeSchema, mc)
+  }
+
+  override def process: Unit = {
+    start()
+
+    val indexedDatasets = readIndexedDatasets
+
+    val indicatorMatrices = 
CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, 
options.randomSeed, options.maxSimilaritiesPerItem, options.maxPrefs, 
Array(indexedDatasets(1).matrix))
+
+    val selfIndicatorDataset = new IndexedDataset(indicatorMatrices(0), 
indexedDatasets(0).columnIDs, indexedDatasets(0).columnIDs) // self similarity
+
+    writeStore.writeTo(selfIndicatorDataset, options.output+"indicator-matrix")
+    if(indexedDatasets.length > 1){
+      val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), 
indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity
+      writeStore.writeTo(crossIndicatorDataset, 
options.output+"cross-indicator-matrix")
+    }
+
+    stop
+  }
+
+  //Default values go here, any '_' or null should be 'required' in the Parser 
or flags an unused option
+  // todo: support two input streams for cross-similarity, maybe assume one 
schema for both inputs
+  case class Options(
+    master: String = "local",
+    appName: String = "ItemSimilarityJob",
+    randomSeed: Int = System.currentTimeMillis().toInt,
+    recursive: Boolean = false,
+    input: String = null,
+    output: String = null,
+    filenamePattern: String = "^part-.*",
+    maxSimilaritiesPerItem: Int = 100,
+    maxPrefs: Int = 500,
+    minPrefs: Int = 1,
+    rowIDPosition: Int = 0,
+    itemIDPosition: Int = 1,
+    filterPosition: Int = -1,
+    filter1: String = null,
+    filter2: String = null,
+    inDelim: String = ",",
+    outDelim1: String = "\t",
+    outDelim2: String = ":",
+    outDelim3: String = ","
+  )
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
new file mode 100644
index 0000000..b8a79f8
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.spark.SparkContext
+import org.apache.mahout.sparkbindings._
+
+/** Extend this class to create a Mahout CLI driver. Minimally you must 
override process and main. Also define a command line parser and default 
options or fill in the following template:
+  * {{{
+  *   object SomeDriver extends MahoutDriver {
+  *     override def main(args: Array[String]): Unit = {
+  *       val parser = new MahoutOptionParser[Options]("Job Name") {
+  *         head("Job Name", "Spark")
+  *         note("Various CLI options")
+  *         //see https://github.com/scopt/scopt for a good Scala option 
parser, which MahoutOptionParser extends
+  *       }
+  *       parser.parse(args, Options()) map { opts =>
+  *         options = opts
+  *         process
+  *       }
+  *     }
+  *
+  *     override def process: Unit = {
+  *       start()
+  *       //don't just stand there do something
+  *       stop
+  *     }
+  *
+  *     //Default values go here, any '_' or null should be 'required' in the 
Parser or flags an unused option
+  *     case class Options(
+  *       appName: String = "Job Name", ...
+  *     )
+  *   }
+  * }}}
+  */
+abstract class MahoutDriver {
+  protected var mc: SparkContext = _
+  /** Creates a Spark context to run the job inside.
+    * Creates a Spark context to run the job inside. Override to set the 
SparkConf values specific to the job, these must be set before the context is 
created.
+    * @param masterUrl Spark master URL
+    * @param appName  Name to display in Spark UI
+    * @param customJars List of paths to custom jars
+    * */
+  protected def start(masterUrl: String, appName: String,
+            customJars:Traversable[String] = Traversable.empty[String]) : Unit 
= {
+    mc = mahoutSparkContext(masterUrl, appName, customJars)
+  }
+
+  /** Override (optionally) for special cleanup */
+  protected def stop: Unit = {
+    mc.stop
+  }
+
+  /** This is wher you do the work, call start first, then before exiting call 
stop */
+  protected def process: Unit
+
+  /** Parse command line and call process*/
+  def main(args: Array[String]): Unit
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
new file mode 100644
index 0000000..8a337f5
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.drivers
+
+import scopt.OptionParser
+
+/** Modifies default [[scopt.OptionParser]] to output long help-like usage + 
error message */
+class MahoutOptionParser[C](programName: String) extends 
OptionParser[C](programName: String) {
+  override def showUsageOnError = true
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
new file mode 100644
index 0000000..1179eef
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.sparkbindings.drm._
+import org.apache.spark.SparkContext._
+import org.apache.mahout.math.RandomAccessSparseVector
+import org.apache.spark.SparkContext
+import com.google.common.collect.{BiMap, HashBiMap}
+import scala.collection.JavaConversions._
+import org.apache.mahout.sparkbindings.DrmRdd
+import org.apache.mahout.math.drm.DrmLike
+
+
+/** Reader trait is abstract in the sense that the reader function must be 
defined by an extending trait, which also defines the type to be read.
+  * @tparam T type of object read, usually supplied by an extending trait.
+  * @todo the reader need not create both dictionaries but does at present. 
There are cases where one or the other dictionary is never used so saving the 
memory for a very large dictionary may be worth the optimization to specify 
which dictionaries are created.
+  */
+trait Reader[T]{
+  val sc: SparkContext
+  val readSchema: Schema
+  protected def reader(sc: SparkContext, readSchema: Schema, source: String): T
+  def readFrom(source: String): T = reader(sc, readSchema, source)
+}
+
+/** Writer trait is abstract in the sense that the writer method must be 
supplied by an extending trait, which also defines the type to be written.
+  * @tparam T
+  */
+trait Writer[T]{
+  val sc: SparkContext
+  val writeSchema: Schema
+  protected def writer(sc: SparkContext, writeSchema: Schema, dest: String, 
collection: T): Unit
+  def writeTo(collection: T, dest: String) = writer(sc, writeSchema, dest, 
collection)
+}
+
+/** Extends Reader trait to supply the 
[[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader 
function for reading text delimited files as described in the 
[[org.apache.mahout.drivers.Schema]]
+  */
+trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
+  /** Read in text delimited tuples from all URIs in this comma delimited 
source String. 
+    * 
+    * @param sc context for the Spark job
+    * @param readSchema describes the delimiters and positions of values in 
the text delimited file.
+    * @param source comma delimited URIs of text files to be read into the 
[[org.apache.mahout.drivers.IndexedDataset]]
+    * @return
+    */
+  protected def reader(sc: SparkContext, readSchema: Schema, source: String): 
IndexedDataset = {
+    try {
+      val delimiter = readSchema("delim").asInstanceOf[String]
+      val rowIDPosition = readSchema("rowIDPosition").asInstanceOf[Int]
+      val columnIDPosition = readSchema("columnIDPosition").asInstanceOf[Int]
+      val filterPosition = readSchema("filterPosition").asInstanceOf[Int]
+      val filterBy = readSchema("filter").asInstanceOf[String]
+      //instance vars must be put into locally scoped vals when used in 
closures that are
+      //executed but Spark
+
+      assert(!source.isEmpty, {
+        println(this.getClass.toString + ": has no files to read")
+        throw new IllegalArgumentException
+      })
+
+      var columns = sc.textFile(source).map({ line => line.split(delimiter)})
+
+      columns = columns.filter({ tokens => tokens(filterPosition) == filterBy})
+
+      val interactions = columns.map({ tokens => tokens(rowIDPosition) -> 
tokens(columnIDPosition)})
+
+      interactions.cache()
+
+      val rowIDs = interactions.map({ case (rowID, _) => 
rowID}).distinct().collect()
+      val columnIDs = interactions.map({ case (_, columnID) => 
columnID}).distinct().collect()
+
+      val numRows = rowIDs.size
+      val numColumns = columnIDs.size
+
+      val rowIDDictionary = asOrderedDictionary(rowIDs)
+      val columnIDDictionary = asOrderedDictionary(columnIDs)
+
+      val rowIDDictionary_bcast = sc.broadcast(rowIDDictionary)
+      val columnIDDictionary_bcast = sc.broadcast(columnIDDictionary)
+
+      val indexedInteractions =
+        interactions.map({ case (rowID, columnID) =>
+          val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
+          val columnIndex = columnIDDictionary_bcast.value.get(columnID).get
+
+          rowIndex -> columnIndex
+        }).groupByKey().map({ case (rowIndex, columnIndexes) =>
+          val row = new RandomAccessSparseVector(numColumns)
+          for (columnIndex <- columnIndexes) {
+            row.setQuick(columnIndex, 1.0)
+          }
+          rowIndex -> row
+        }).asInstanceOf[DrmRdd[Int]]
+
+      //todo: old API, val drmInteractions = new 
CheckpointedDrmBase[Int](indexedInteractions, numRows, numColumns)
+      val drmInteractions = new CheckpointedDrmSpark[Int](indexedInteractions, 
numRows, numColumns)
+
+      IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary)
+
+    } catch {
+      case cce: ClassCastException => {
+        println(this.getClass.toString + ": Schema has illegal values"); throw 
cce
+      }
+    }
+  }
+
+  private def asOrderedDictionary(entries: Array[String]): BiMap[String, Int] 
= {
+    var dictionary: BiMap[String, Int] = HashBiMap.create()
+    var index = 0
+    for (entry <- entries) {
+      dictionary.forcePut(entry, index)
+      index += 1
+    }
+    dictionary
+  }
+}
+
+trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
+  /** Read in text delimited tuples from all URIs in this comma delimited 
source String. 
+    *
+    * @param sc context for the Spark job
+    * @param writeSchema describes the delimiters and positions of values in 
the output text delimited file.
+    * @param dest directory to write text delimited version of 
[[org.apache.mahout.drivers.IndexedDataset]]
+    */
+  protected def writer(sc: SparkContext, writeSchema: Schema, dest: String, 
indexedDataset: IndexedDataset): Unit = {
+    try {
+      val outDelim1 = writeSchema("delim1").asInstanceOf[String]
+      val outDelim2 = writeSchema("delim2").asInstanceOf[String]
+      val outDelim3 = writeSchema("delim3").asInstanceOf[String]
+      //instance vars must be put into locally scoped vals when put into 
closures that are
+      //executed but Spark
+      assert (indexedDataset != null, {println(this.getClass.toString+": has 
no indexedDataset to write"); throw new IllegalArgumentException })
+      assert (!dest.isEmpty, {println(this.getClass.toString+": has no 
destination or indextedDataset to write"); throw new IllegalArgumentException})
+      val matrix: DrmLike[Int] = indexedDataset.matrix
+      val rowIDDictionary: BiMap[String, Int] = indexedDataset.rowIDs
+      val columnIDDictionary: BiMap[String, Int] = indexedDataset.columnIDs
+      matrix.rdd.map({ case (rowID, itemVector) =>
+        var line: String = rowIDDictionary.inverse.get(rowID) + outDelim1
+        for (item <- itemVector.nonZeroes()) {
+          line += columnIDDictionary.inverse.get(item.index) + outDelim2 + 
item.get + outDelim3
+        }
+        line.dropRight(1)
+      })
+        .saveAsTextFile(dest)
+    }catch{
+      case cce: ClassCastException => {println(this.getClass.toString+": 
Schema has illegal values"); throw cce}
+    }
+  }
+}
+
+/** A combined trait that reads and writes */
+trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with 
TDIndexedDatasetWriter
+
+/** Reads text delimited files into an IndexedDataset. Classes are needed to 
supply trait params in their constructor.
+  * @param readSchema describes the delimiters and position of values in the 
text delimited file to be read.
+  * @param sc Spark context for reading files
+  * @note The source files are supplied to the readFrom trait method.
+  * */
+class TextDelimitedIndexedDatasetReader(val readSchema: Schema, val sc: 
SparkContext) extends TDIndexedDatasetReader
+
+/** Writes  text delimited files into an IndexedDataset. Classes are needed to 
supply trait params in their constructor.
+  * @param writeSchema describes the delimiters and position of values in the 
text delimited file(s) written.
+  * @param sc Spark context for reading files
+  * @note the destination if supplied to the writeTo trait method
+  * */
+class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sc: 
SparkContext) extends TDIndexedDatasetWriter
+
+/** Reads and writes text delimited files to/from an IndexedDataset. Classes 
are needed to supply trait params in their constructor.
+  * @param readSchema describes the delimiters and position of values in the 
text delimited file(s) to be read.
+  * @param writeSchema describes the delimiters and position of values in the 
text delimited file(s) written.
+  * @param sc Spark context for reading the files
+  * */
+class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val 
writeSchema: Schema, val sc: SparkContext) extends TDIndexedDatasetReaderWriter

http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
new file mode 100644
index 0000000..13fa292
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
@@ -0,0 +1,30 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import scala.collection.mutable.HashMap
+
+/** Syntactic sugar for HashMap[String, Any]
+  *
+  * @param params list of mappings for instantiation {{{val mySchema = new 
Schema("one" -> 1, "two" -> "2", ...)}}}
+  */
+class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] {
+  //todo: this require a mutable HashMap, do we care?
+  this ++= params
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/test/scala/org/apache/mahout/drivers/IndexedDatasetTest.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/test/scala/org/apache/mahout/drivers/IndexedDatasetTest.scala 
b/spark/src/test/scala/org/apache/mahout/drivers/IndexedDatasetTest.scala
new file mode 100644
index 0000000..fb13361
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/drivers/IndexedDatasetTest.scala
@@ -0,0 +1,25 @@
+package org.apache.mahout.drivers
+
+import org.scalatest.FunSuite
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+class IndexedDatasetTest extends FunSuite {
+  //todo: put some tests here!
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriver$Test.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriver$Test.scala
 
b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriver$Test.scala
new file mode 100644
index 0000000..c1ae4ab
--- /dev/null
+++ 
b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriver$Test.scala
@@ -0,0 +1,180 @@
+package org.apache.mahout.drivers
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.mahout.sparkbindings._
+import java.io.{FileWriter, BufferedWriter}
+import com.google.common.io.Closeables
+import org.apache.spark.SparkContext
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+class ItemSimilarityDriver$Test extends FunSuite {
+  private var sc: SparkContext = _
+
+
+  test ("Running some text delimited files through the 
import/cooccurrence/export"){
+
+    val exampleCsvlogStatements = Array(
+      "12569537329,user1,item1,\"view\"",
+      "12569537329,user1,item2,\"view\"",
+      "12569537329,user1,item2,\"like\"",
+      "12569537329,user1,item3,\"like\"",
+      "12569537329,user2,item2,\"view\"",
+      "12569537329,user2,item2,\"like\"",
+      "12569537329,user3,item1,\"like\"",
+      "12569537329,user3,item3,\"view\"",
+      "12569537329,user3,item3,\"like\""
+    )
+
+    val exampleTsvLogStatements = Array(
+      "12569537329\tuser1\titem1\t\"view\"",
+      "12569537329\tuser1\titem2\t\"view\"",
+      "12569537329\tuser1\titem2\t\"like\"",
+      "12569537329\tuser1\titem3\t\"like\"",
+      "12569537329\tuser2\titem2\t\"view\"",
+      "12569537329\tuser2\titem2\t\"like\"",
+      "12569537329\tuser3\titem1\t\"like\"",
+      "12569537329\tuser3\titem3\t\"view\"",
+      "12569537329\tuser3\titem3\t\"like\""
+    )
+
+
+    val csvLogStatements = Array(
+      "u1,purchase,iphone",
+      "u1,purchase,ipad",
+      "u2,purchase,nexus",
+      "u2,purchase,galaxy",
+      "u3,purchase,surface",
+      "u4,purchase,iphone",
+      "u4,purchase,galaxy",
+      "u1,view,iphone",
+      "u1,view,ipad",
+      "u1,view,nexus",
+      "u1,view,galaxy",
+      "u2,view,iphone",
+      "u2,view,ipad",
+      "u2,view,nexus",
+      "u2,view,galaxy",
+      "u3,view,surface",
+      "u3,view,nexus",
+      "u4,view,iphone",
+      "u4,view,ipad",
+      "u4,view,galaxy"
+    )
+
+    val tsvLogStatements = Array(
+      "u1\tpurchase\tiphone",
+      "u1\tpurchase\tipad",
+      "u2\tpurchase\tnexus",
+      "u2\tpurchase\tgalaxy",
+      "u3\tpurchase\tsurface",
+      "u4\tpurchase\tiphone",
+      "u4\tpurchase\tgalaxy",
+      "u1\tview\tiphone",
+      "u1\tview\tipad",
+      "u1\tview\tnexus",
+      "u1\tview\tgalaxy",
+      "u2\tview\tiphone",
+      "u2\tview\tipad",
+      "u2\tview\tnexus",
+      "u2\tview\tgalaxy",
+      "u3\tview\tsurface",
+      "u3\tview\tnexus",
+      "u4\tview\tiphone",
+      "u4\tview\tipad",
+      "u4\tview\tgalaxy"
+    )
+
+    var w: BufferedWriter = null
+    //try {
+      w = new BufferedWriter(new FileWriter("/tmp/cf-data.txt"))
+      w.write(csvLogStatements.mkString("\n"))
+    //} finally {
+      Closeables.close(w, false)
+    //}
+    /*
+        val indexedLikes = IndexedDatasetStore.readTuples(sc, 
"tmp/cf-data.txt", 2, ",", 0, 2, 1, "purchase")
+
+        val indexedViews = IndexedDatasetStore.readTuples(sc, 
"/tmp/cf-data.txt", 2, ",", 0, 2, 1, "view")
+
+        val drmLikes = indexedLikes.matrix
+        val drmViews = indexedViews.matrix
+
+        // Now we could run cooccurrence analysis using the DRMs, instead 
we'll just fetch and print the matrices
+        val drmXCooccurrences = cooccurrences(drmLikes, randomSeed = 
0xdeadbeef,
+          maxInterestingItemsPerThing = 100, maxNumInteractions = 500, 
Array(drmViews))
+
+        val inCoreViews = drmViews.collect
+        val inCoreLikes = drmLikes.collect
+        val inCoreIndicator = drmXCooccurrences(0).collect
+        val inCoreXIndicator = drmXCooccurrences(1).collect
+        println("\nLIKES:")
+        println(inCoreLikes)
+        println("\nVIEWS:")
+        println(inCoreViews)
+        println("\nINDICATOR MATRIX")
+        println(inCoreIndicator)
+        println("\nCROSS INDICATOR MATRIX")
+        println(inCoreXIndicator)
+    */
+
+    /*
+        //Clustered Spark and HDFS
+        ItemSimilarityDriver.main(Array(
+          "--input", 
"hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt",
+          "--output", 
"hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/",
+          "--master", "spark://occam4:7077",
+          "--filter1", "purchase",
+          "--filter2", "view",
+          "--inDelim", ",",
+          "--itemIDPosition", "2",
+          "--rowIDPosition", "0",
+          "--filterPosition", "1"
+        ))
+    */
+    //local multi-threaded Spark with HDFS using large dataset
+    /*    ItemSimilarityDriver.main(Array(
+        "--input", "hdfs://occam4:54310/user/pat/xrsj/ratings_data.txt",
+        "--output", "hdfs://occam4:54310/user/pat/xrsj/indicatorMatrices/",
+        "--master", "local[4]",
+        "--filter1", "purchase",
+        "--filter2", "view",
+        "--inDelim", ",",
+        "--itemIDPosition", "2",
+        "--rowIDPosition", "0",
+        "--filterPosition", "1"
+      ))
+    */
+
+    //local multi-threaded Spark with local FS
+    ItemSimilarityDriver.main(Array(
+      "--input", "/tmp/cf-data.txt",
+      "--output", "tmp/indicatorMatrices/",
+      "--master", "local[4]",
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1"
+    ))
+
+  }
+
+}

Reply via email to