Add the ItemSimilarity driver for Spark and its supporting classes (IndexedDataset, text readers/writers, option parsing)
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/74b9921c Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/74b9921c Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/74b9921c Branch: refs/heads/mahout-1541 Commit: 74b9921c4c9bd8903585bbd74d9e66298ea8b7a0 Parents: 107a0ba Author: pferrel <[email protected]> Authored: Wed Jun 4 13:09:07 2014 -0700 Committer: pferrel <[email protected]> Committed: Wed Jun 4 13:09:07 2014 -0700 ---------------------------------------------------------------------- spark/src/.DS_Store | Bin 6148 -> 0 bytes .../apache/mahout/drivers/FileSysUtils.scala | 34 +++ .../apache/mahout/drivers/IndexedDataset.scala | 54 +++++ .../drivers/IndexedDatasetDriverTest.scala | 164 +++++++++++++++ .../mahout/drivers/ItemSimilarityDriver.scala | 208 +++++++++++++++++++ .../apache/mahout/drivers/MahoutDriver.scala | 74 +++++++ .../mahout/drivers/MahoutOptionParser.scala | 24 +++ .../apache/mahout/drivers/ReaderWriter.scala | 188 +++++++++++++++++ .../org/apache/mahout/drivers/Schema.scala | 30 +++ .../mahout/drivers/IndexedDatasetTest.scala | 25 +++ .../drivers/ItemSimilarityDriver$Test.scala | 180 ++++++++++++++++ 11 files changed, 981 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/.DS_Store ---------------------------------------------------------------------- diff --git a/spark/src/.DS_Store b/spark/src/.DS_Store deleted file mode 100644 index 7b0d367..0000000 Binary files a/spark/src/.DS_Store and /dev/null differ http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala new file 
mode 100644 index 0000000..e8491f2 --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.drivers + +/** + * Returns a [[java.lang.String]]comma delimited list of URIs discovered based on parameters in the constructor. 
+ * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]] + * + * @param pathURI Where to start looking for inFiles, only HDFS and local URI are currently + * supported + * @param filePattern regex that must match the entire filename to have the file included in the returned list + * @param recursive true traverses the filesystem recursively + */ + +case class FileSysUtils(pathURI: String, filePattern: String = "", recursive: Boolean = false) { + // todo: There is an HDFS filestatus method that collects multiple inFiles, see if this is the right thing to use + // todo: check to see if the input is a supported URI for collection or recursive search but just pass through otherwise + def uris = {pathURI} +} http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala new file mode 100644 index 0000000..36bda90 --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.drivers + +import com.google.common.collect.BiMap +import org.apache.mahout.math.drm.DrmLike + +/** + * Wraps a [[org.apache.mahout.sparkbindings.drm.DrmLike]] object with two [[com.google.common.collect.BiMap]]s to store ID/label translation dictionaries. + * The purpose of this class is to wrap a DrmLike[C] with bidirectional ID mappings so + * a user specified label or ID can be stored and mapped to and from the [[scala.Int]] ordinal ID + * used internal to Mahout Core code. + * + * Example: For a transpose job the [[org.apache.mahout.drivers.IndexedDataset#matrix]]: [[org.apache.mahout.sparkbindings.drm.DrmLike]] is passed into the DSL code + * that transposes the values, then a resulting [[org.apache.mahout.drivers.IndexedDataset]] is created from the transposed DrmLike object with swapped dictionaries (since the rows and columns are transposed). The new + * [[org.apache.mahout.drivers.IndexedDataset]] is returned. + * + * @param matrix DrmLike[Int], representing the distributed matrix storing the actual data. + * @param rowIDs BiMap[String, Int] storing a bidirectional mapping of external String ID to + * and from the ordinal Mahout Int ID. This one holds row labels + * @param columnIDs BiMap[String, Int] storing a bidirectional mapping of external String + * ID to and from the ordinal Mahout Int ID. This one holds column labels + */ + +case class IndexedDataset(matrix: DrmLike[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) { +} + +/** + * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary constructor for + * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. 
Used when you have a factory like [[org.apache.mahout.drivers.IndexedDatasetStore]] + * {{{ + * val indexedDataset = IndexedDataset(indexedDatasetStore.read) + * }}} + */ + +object IndexedDataset { + /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */ + def apply(id2: IndexedDataset) = new IndexedDataset(id2.matrix, id2.rowIDs, id2.columnIDs) +} http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/IndexedDatasetDriverTest.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDatasetDriverTest.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDatasetDriverTest.scala new file mode 100644 index 0000000..80fc134 --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDatasetDriverTest.scala @@ -0,0 +1,164 @@ +package org.apache.mahout.drivers + +import org.apache.mahout.sparkbindings._ +import java.io.{FileWriter, BufferedWriter} +import com.google.common.io.Closeables + +object ItemSimilarityDriverTest { + + def main(args: Array[String]): Unit = { + + System.setProperty("spark.kryo.referenceTracking", "false") + System.setProperty("spark.kryoserializer.buffer.mb", "100") + + implicit val sc = mahoutSparkContext(masterUrl = "local", appName = "MahoutLocalContext", + customJars = Traversable.empty[String]) + + val exampleCsvlogStatements = Array( + "12569537329,user1,item1,\"view\"", + "12569537329,user1,item2,\"view\"", + "12569537329,user1,item2,\"like\"", + "12569537329,user1,item3,\"like\"", + "12569537329,user2,item2,\"view\"", + "12569537329,user2,item2,\"like\"", + "12569537329,user3,item1,\"like\"", + "12569537329,user3,item3,\"view\"", + "12569537329,user3,item3,\"like\"" + ) + + val exampleTsvLogStatements = Array( + "12569537329\tuser1\titem1\t\"view\"", + "12569537329\tuser1\titem2\t\"view\"", + "12569537329\tuser1\titem2\t\"like\"", + 
"12569537329\tuser1\titem3\t\"like\"", + "12569537329\tuser2\titem2\t\"view\"", + "12569537329\tuser2\titem2\t\"like\"", + "12569537329\tuser3\titem1\t\"like\"", + "12569537329\tuser3\titem3\t\"view\"", + "12569537329\tuser3\titem3\t\"like\"" + ) + + + val csvLogStatements = Array( + "u1,purchase,iphone", + "u1,purchase,ipad", + "u2,purchase,nexus", + "u2,purchase,galaxy", + "u3,purchase,surface", + "u4,purchase,iphone", + "u4,purchase,galaxy", + "u1,view,iphone", + "u1,view,ipad", + "u1,view,nexus", + "u1,view,galaxy", + "u2,view,iphone", + "u2,view,ipad", + "u2,view,nexus", + "u2,view,galaxy", + "u3,view,surface", + "u3,view,nexus", + "u4,view,iphone", + "u4,view,ipad", + "u4,view,galaxy" + ) + + val tsvLogStatements = Array( + "u1\tpurchase\tiphone", + "u1\tpurchase\tipad", + "u2\tpurchase\tnexus", + "u2\tpurchase\tgalaxy", + "u3\tpurchase\tsurface", + "u4\tpurchase\tiphone", + "u4\tpurchase\tgalaxy", + "u1\tview\tiphone", + "u1\tview\tipad", + "u1\tview\tnexus", + "u1\tview\tgalaxy", + "u2\tview\tiphone", + "u2\tview\tipad", + "u2\tview\tnexus", + "u2\tview\tgalaxy", + "u3\tview\tsurface", + "u3\tview\tnexus", + "u4\tview\tiphone", + "u4\tview\tipad", + "u4\tview\tgalaxy" + ) + + var w: BufferedWriter = null + try { + w = new BufferedWriter(new FileWriter("tmp/cf-data.txt")) + w.write(csvLogStatements.mkString("\n")) + } finally { + Closeables.close(w, false) + } + /* + val indexedLikes = IndexedDatasetStore.readTuples(sc, "tmp/cf-data.txt", 2, ",", 0, 2, 1, "purchase") + + val indexedViews = IndexedDatasetStore.readTuples(sc, "tmp/cf-data.txt", 2, ",", 0, 2, 1, "view") + + val drmLikes = indexedLikes.matrix + val drmViews = indexedViews.matrix + + // Now we could run cooccurrence analysis using the DRMs, instead we'll just fetch and print the matrices + val drmXCooccurrences = cooccurrences(drmLikes, randomSeed = 0xdeadbeef, + maxInterestingItemsPerThing = 100, maxNumInteractions = 500, Array(drmViews)) + + val inCoreViews = drmViews.collect + val inCoreLikes 
= drmLikes.collect + val inCoreIndicator = drmXCooccurrences(0).collect + val inCoreXIndicator = drmXCooccurrences(1).collect + println("\nLIKES:") + println(inCoreLikes) + println("\nVIEWS:") + println(inCoreViews) + println("\nINDICATOR MATRIX") + println(inCoreIndicator) + println("\nCROSS INDICATOR MATRIX") + println(inCoreXIndicator) + */ + sc.stop() +/* + //Clustered Spark and HDFS + ItemSimilarityDriver.main(Array( + "--input", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt", + "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/", + "--master", "spark://occam4:7077", + "--filter1", "purchase", + "--filter2", "view", + "--inDelim", ",", + "--itemIDPosition", "2", + "--rowIDPosition", "0", + "--filterPosition", "1" + )) +*/ + //local multi-threaded Spark with HDFS +/* ItemSimilarityDriver.main(Array( + "--input", "hdfs://occam4:54310/user/pat/xrsj/ratings_data.txt", + "--output", "hdfs://occam4:54310/user/pat/xrsj/indicatorMatrices/", + "--master", "local[4]", + "--filter1", "purchase", + "--filter2", "view", + "--inDelim", ",", + "--itemIDPosition", "2", + "--rowIDPosition", "0", + "--filterPosition", "1" + )) +*/ + + //local multi-threaded Spark with local FS + ItemSimilarityDriver.main(Array( + "--input", "tmp/cf-data.txt", + "--output", "tmp/indicatorMatrices/", + "--master", "local[4]", + "--filter1", "purchase", + "--filter2", "view", + "--inDelim", ",", + "--itemIDPosition", "2", + "--rowIDPosition", "0", + "--filterPosition", "1" + )) + + + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala new file mode 100644 index 0000000..c72a3b3 --- /dev/null +++ 
b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala @@ -0,0 +1,208 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package org.apache.mahout.drivers + +import org.apache.mahout.cf.CooccurrenceAnalysis + +/** + * Command line interface for [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences()]]. + * Command line interface for [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences()]]. It reads text lines + * that contain (row id, column id, ...). The IDs are user specified strings which will be preserved in the + * output. The individual tuples will be accumulated into a matrix and [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences()]] + * will be used to calculate row-wise self-similarity, or when using filters, will generate two + * matrices and calculate both the self similarity of the primary matrix and the row-wise similarity of the primary + * to the secondary. Returns one or two directories of text files formatted as specified in the options. + * @todo Should also take two input streams and do cross similarity with no filter required.
+ */ +object ItemSimilarityDriver extends MahoutDriver { + + private var options: Options = _ + private var readStore1: TextDelimitedIndexedDatasetReader = _ + private var readStore2: TextDelimitedIndexedDatasetReader = _ + private var writeStore: TextDelimitedIndexedDatasetWriter = _ + +/** + * @param args Command line args, if empty a help message is printed. + */ + override def main(args: Array[String]): Unit = { + val parser = new MahoutOptionParser[Options]("ItemSimilarity") { + head("ItemSimilarity", "Spark") + note("Input, output options") + opt[String]('i', "input") required() action { (x, options) => + options.copy(input = x) + } text ("Path for input. It may be a filename or directory name and can be a local file path or an HDFS URI (required).") + opt[String]('o', "output") required() action { (x, options) => + options.copy(output = x) + } text ("Output will be in sub-directories stored here so this must be a directory path (required).") + note("\nAlgorithm control options:") + opt[String]("master") abbr ("ma") text ("URL for the Spark Master. (optional). Default: 'local'") action { (x, options) => + options.copy(master = x) + } + opt[Int]("maxPrefs") abbr ("mppu") action { (x, options) => + options.copy(maxPrefs = x) + } text ("Max number of preferences to consider per user or item, users or items with more preferences will be sampled down (optional). Default: 500") validate { x => + if (x > 0) success else failure("Option --maxPrefs must be > 0") + } + opt[Int]("minPrefs") abbr ("mp") action { (x, options) => + options.copy(minPrefs = x) + } text ("Ignore users with less preferences than this (optional). Default: 1") validate { x => + if (x > 0) success else failure("Option --minPrefs must be > 0") + } + + opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) => + options.copy(maxSimilaritiesPerItem = x) + } text ("Try to cap the number of similar items for each item to this number (optional). 
Default: 100") validate { x => + if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0") + } + opt[Int]("randomSeed") abbr ("rs") action { (x, options) => + options.copy(randomSeed = x) + } text ("Int to seed random number generator (optional). Default: Uses time to generate a seed") validate { x => + if (x > 0) success else failure("Option --randomSeed must be > 0") + } + note("\nInput text file schema options:") + opt[String]("inDelim") abbr ("d") text ("Input delimiter character (optional). Default: '\\t'") action { (x, options) => + options.copy(inDelim = x) + } + opt[String]("filter1") abbr ("f1") action { (x, options) => + options.copy(filter1 = x) + } text ("String whose presence indicates a datum for the primary item set, can be a regex (optional). Default: no filtered is applied, all is used") + opt[String]("filter2") abbr ("f2") action { (x, options) => + options.copy(filter2 = x) + } text ("String whose presence indicates a datum for the secondary item set, can be a regex (optional). Used in cross-cooccurrence. Default: no secondary filter is applied") + opt[Int]("rowIDPosition") abbr ("rc") action { (x, options) => + options.copy(rowIDPosition = x) + } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x => + if (x >= 0) success else failure("Option --rowIDColNum must be >= 0") + } + opt[Int]("itemIDPosition") abbr ("ic") action { (x, options) => + options.copy(itemIDPosition = x) + } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x => + if (x >= 0) success else failure("Option --itemIDColNum must be >= 0") + } + opt[Int]("filterPosition") abbr ("fc") action { (x, options) => + options.copy(filterPosition = x) + } text ("Column number (0 based Int) containing the filter string (optional). 
Default: -1 for no filter") validate { x => + if (x >= -1) success else failure("Option --filterColNum must be >= -1") + } + note("\nDefault input schema will accept: 'userID<tab>itemId' or 'userID<tab>itemID<tab>any-text...' and all rows will be used") + note("\nFile input options:") + opt[Unit]('r', "recursive") action { (_, options) => + options.copy(recursive = true) + } text ("The input path should be searched recursively for files that match the filename pattern from -fp (optional), Default: false") + opt[String]("filenamePattern") abbr ("fp") action { (x, options) => + options.copy(filenamePattern = x) + } text ("Regex to match in determining input files (optional). Default: filename in the --input option or '^part-.*' if --input is a directory") + note("\nOutput text file schema options:") + opt[String]("outDelim1") abbr ("od1") action { (x, options) => + options.copy(outDelim1 = x) + } text ("Primary output inDelim value, used to separate row IDs from the similar items list (optional). Default: '\\t'") + opt[String]("outDelim2") abbr ("od2") action { (x, options) => + options.copy(outDelim2 = x) + } text ("Secondary output inDelim value, used to separate item IDs from their values in the similar items list (optional). Default: ':'") + opt[String]("outDelim3") abbr ("od3") action { (x, options) => + options.copy(outDelim3 = x) + } text ("Last inDelim value, used to separate (itemID:value) tuples in the similar items list. (optional). 
Default: ','") + note("\nDefault delimiters will produce output of the form: 'itemID1<tab>>itemID2:value2,itemID10:value10...'") + note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.\n") + help("help") abbr ("h") text ("prints this usage text\n") + checkConfig { c => + if (c.filterPosition == c.itemIDPosition || c.filterPosition == c.rowIDPosition || c.rowIDPosition == c.itemIDPosition) failure("The row, item, and filter positions must be unique.") else success + } + checkConfig { c => + if (c.filter1 != null && c.filter2 != null && c.filter1 == c.filter2) failure("If using filters they must be unique.") else success + } + } + parser.parse(args, Options()) map { opts => + options = opts + process + } + } + + private def readIndexedDatasets: Array[IndexedDataset] = { + val inFiles = FileSysUtils(options.input, options.filenamePattern, options.recursive ).uris + if(inFiles.isEmpty){Array()}else{ + val indexedDataset1 = IndexedDataset(readStore1.readFrom(inFiles)) + if (options.filterPosition != -1 && options.filter2 != null) { + val indexedDataset2 = IndexedDataset(readStore2.readFrom(inFiles)) + Array(indexedDataset1, indexedDataset2) + } else { + Array(indexedDataset1) + } + } + } + + override def start(masterUrl: String = options.master, appName: String = options.appName, + customJars:Traversable[String] = Traversable.empty[String]): Unit = { + //todo: create and modify a SparkContext here, which can be passed in to mahoutSparkContext in the super.start + System.setProperty("spark.kryo.referenceTracking", "false") + System.setProperty("spark.kryoserializer.buffer.mb", "100") + System.setProperty("spark.executor.memory", "2g") + super.start(masterUrl, appName, customJars) + val readSchema1 = new Schema("delim" -> options.inDelim, "filter" -> options.filter1, "rowIDPosition" -> options.rowIDPosition, "columnIDPosition" -> options.itemIDPosition, "filterPosition" -> options.filterPosition) + if(options.filterPosition != -1 && 
options.filter2 != null) { + val readSchema2 = new Schema("delim" -> options.inDelim, "filter" -> options.filter2, "rowIDPosition" -> options.rowIDPosition, "columnIDPosition" -> options.itemIDPosition, "filterPosition" -> options.filterPosition) + readStore2 = new TextDelimitedIndexedDatasetReader(readSchema2, mc) + } + val writeSchema = new Schema("delim1" -> options.outDelim1, "delim2" -> options.outDelim2, "delim3" -> options.outDelim3) + readStore1 = new TextDelimitedIndexedDatasetReader(readSchema1, mc) + writeStore = new TextDelimitedIndexedDatasetWriter(writeSchema, mc) + } + + /** Reads the input, computes indicator matrices with [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences()]] + * and writes them under options.output. The cross-indicator matrix is computed and written only when a secondary + * dataset was read (filterPosition != -1 and --filter2 given). The Spark context is stopped even if a step fails. + */ + override def process: Unit = { + start() + try { + val indexedDatasets = readIndexedDatasets + + if (indexedDatasets.isEmpty) { + println(this.getClass.toString + ": no input files found for " + options.input) + } else { + // Pass only the secondary matrices that were actually read; indexing indexedDatasets(1) unconditionally + // throws when no --filter2 was given. NOTE(review): assumes cooccurrences treats an empty Array as + // "self-similarity only" (its default for this argument) -- confirm against CooccurrenceAnalysis. + val secondaryMatrices = indexedDatasets.drop(1).map(_.matrix) + val indicatorMatrices = CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options.randomSeed, options.maxSimilaritiesPerItem, options.maxPrefs, secondaryMatrices) + + val selfIndicatorDataset = new IndexedDataset(indicatorMatrices(0), indexedDatasets(0).columnIDs, indexedDatasets(0).columnIDs) // self similarity + writeStore.writeTo(selfIndicatorDataset, options.output+"indicator-matrix") + + if(indexedDatasets.length > 1){ + val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity + writeStore.writeTo(crossIndicatorDataset, options.output+"cross-indicator-matrix") + } + } + } finally { + stop + } + } + + //Default values go here, any '_' or null should be 'required' in the Parser or flags an unused option + // todo: support two input streams for cross-similarity, maybe assume one schema for both inputs + case class Options( + master: String = "local", + appName: String = "ItemSimilarityJob", + randomSeed: Int = System.currentTimeMillis().toInt, + recursive: Boolean = false, + input: String = null, + output: String = null, + filenamePattern: String = "^part-.*", + maxSimilaritiesPerItem: Int = 100, + maxPrefs: Int = 500, + minPrefs: Int = 1, + rowIDPosition: Int = 0, + itemIDPosition: Int = 1, +
filterPosition: Int = -1, + filter1: String = null, + filter2: String = null, + inDelim: String = ",", + outDelim1: String = "\t", + outDelim2: String = ":", + outDelim3: String = "," + ) + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala new file mode 100644 index 0000000..b8a79f8 --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.drivers + +import org.apache.spark.SparkContext +import org.apache.mahout.sparkbindings._ + +/** Extend this class to create a Mahout CLI driver. Minimally you must override process and main. 
Also define a command line parser and default options or fill in the following template: + * {{{ + * object SomeDriver extends MahoutDriver { + * override def main(args: Array[String]): Unit = { + * val parser = new MahoutOptionParser[Options]("Job Name") { + * head("Job Name", "Spark") + * note("Various CLI options") + * //see https://github.com/scopt/scopt for a good Scala option parser, which MahoutOptionParser extends + * } + * parser.parse(args, Options()) map { opts => + * options = opts + * process + * } + * } + * + * override def process: Unit = { + * start() + * //don't just stand there do something + * stop + * } + * + * //Default values go here, any '_' or null should be 'required' in the Parser or flags an unused option + * case class Options( + * appName: String = "Job Name", ... + * ) + * } + * }}} + */ +abstract class MahoutDriver { + protected var mc: SparkContext = _ + /** Creates a Spark context to run the job inside. + * Creates a Spark context to run the job inside. Override to set the SparkConf values specific to the job, these must be set before the context is created. 
+ * @param masterUrl Spark master URL + * @param appName Name to display in Spark UI + * @param customJars List of paths to custom jars + * */ + protected def start(masterUrl: String, appName: String, + customJars:Traversable[String] = Traversable.empty[String]) : Unit = { + mc = mahoutSparkContext(masterUrl, appName, customJars) + } + + /** Stops the Spark context created by start(). Override (optionally) for special cleanup. + * Call only after start(): mc is null until start() assigns it. */ + protected def stop: Unit = { + mc.stop + } + + /** This is where you do the work, call start first, then before exiting call stop */ + protected def process: Unit + + /** Parse the command line and call process */ + def main(args: Array[String]): Unit +} http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala new file mode 100644 index 0000000..8a337f5 --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.mahout.drivers + +import scopt.OptionParser + +/** Modifies default [[scopt.OptionParser]] to output long help-like usage + error message */ +class MahoutOptionParser[C](programName: String) extends OptionParser[C](programName: String) { + override def showUsageOnError = true +} http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala new file mode 100644 index 0000000..1179eef --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.mahout.sparkbindings.drm._
+import org.apache.spark.SparkContext._
+import org.apache.mahout.math.RandomAccessSparseVector
+import org.apache.spark.SparkContext
+import com.google.common.collect.{BiMap, HashBiMap}
+import scala.collection.JavaConversions._
+import org.apache.mahout.sparkbindings.DrmRdd
+import org.apache.mahout.math.drm.DrmLike
+
+
+/** Reader trait is abstract in the sense that the reader function must be defined by an extending trait, which also defines the type to be read.
+  * @tparam T type of object read, usually supplied by an extending trait.
+  * @todo the reader need not create both dictionaries but does at present. There are cases where one or the other dictionary is never used so saving the memory for a very large dictionary may be worth the optimization to specify which dictionaries are created.
+  */
+trait Reader[T]{
+  val sc: SparkContext
+  val readSchema: Schema
+  protected def reader(sc: SparkContext, readSchema: Schema, source: String): T
+  /** Reads `source` using this instance's `sc` and `readSchema`. */
+  def readFrom(source: String): T = reader(sc, readSchema, source)
+}
+
+/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written.
+  * @tparam T type of object written, usually supplied by an extending trait.
+  */
+trait Writer[T]{
+  val sc: SparkContext
+  val writeSchema: Schema
+  protected def writer(sc: SparkContext, writeSchema: Schema, dest: String, collection: T): Unit
+  /** Writes `collection` to `dest` using this instance's `sc` and `writeSchema`. */
+  def writeTo(collection: T, dest: String): Unit = writer(sc, writeSchema, dest, collection)
+}
+
+/** Extends Reader trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader function for reading text delimited files as described in the [[org.apache.mahout.drivers.Schema]]
+  */
+trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
+  /** Read in text delimited tuples from all URIs in this comma delimited source String.
+    *
+    * Each line is split on the schema's "delim" (a regex, as with String.split). Only lines whose token at
+    * "filterPosition" equals "filter" are kept; the tokens at "rowIDPosition" and "columnIDPosition" then
+    * become one interaction with value 1.0. Row and column IDs are mapped to Int indexes 0 until n.
+    *
+    * @param sc context for the Spark job
+    * @param readSchema describes the delimiters and positions of values in the text delimited file.
+    * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
+    * @return an [[org.apache.mahout.drivers.IndexedDataset]] holding the interaction matrix plus the row and column ID dictionaries
+    * @throws IllegalArgumentException if `source` is empty
+    * @throws ClassCastException if a schema value does not have the expected type
+    */
+  protected def reader(sc: SparkContext, readSchema: Schema, source: String): IndexedDataset = {
+    try {
+      val delimiter = readSchema("delim").asInstanceOf[String]
+      val rowIDPosition = readSchema("rowIDPosition").asInstanceOf[Int]
+      val columnIDPosition = readSchema("columnIDPosition").asInstanceOf[Int]
+      val filterPosition = readSchema("filterPosition").asInstanceOf[Int]
+      val filterBy = readSchema("filter").asInstanceOf[String]
+      //instance vars must be put into locally scoped vals when used in closures that are
+      //executed by Spark
+
+      // require (not assert): the check must not disappear under -Xdisable-assertions
+      require(!source.isEmpty, this.getClass.toString + ": has no files to read")
+
+      val columns = sc.textFile(source)
+        .map({ line => line.split(delimiter)})
+        .filter({ tokens => tokens(filterPosition) == filterBy})
+
+      val interactions = columns.map({ tokens => tokens(rowIDPosition) -> tokens(columnIDPosition)})
+
+      // interactions is traversed by both distinct() passes and by the indexing pass below
+      interactions.cache()
+
+      val rowIDs = interactions.map({ case (rowID, _) => rowID}).distinct().collect()
+      val columnIDs = interactions.map({ case (_, columnID) => columnID}).distinct().collect()
+
+      val numRows = rowIDs.size
+      val numColumns = columnIDs.size
+
+      val rowIDDictionary = asOrderedDictionary(rowIDs)
+      val columnIDDictionary = asOrderedDictionary(columnIDs)
+
+      val rowIDDictionary_bcast = sc.broadcast(rowIDDictionary)
+      val columnIDDictionary_bcast = sc.broadcast(columnIDDictionary)
+
+      val indexedInteractions =
+        interactions.map({ case (rowID, columnID) =>
+          val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
+          val columnIndex = columnIDDictionary_bcast.value.get(columnID).get
+
+          rowIndex -> columnIndex
+        }).groupByKey().map({ case (rowIndex, columnIndexes) =>
+          val row = new RandomAccessSparseVector(numColumns)
+          for (columnIndex <- columnIndexes) {
+            row.setQuick(columnIndex, 1.0)
+          }
+          rowIndex -> row
+        }).asInstanceOf[DrmRdd[Int]]
+
+      //todo: old API, val drmInteractions = new CheckpointedDrmBase[Int](indexedInteractions, numRows, numColumns)
+      val drmInteractions = new CheckpointedDrmSpark[Int](indexedInteractions, numRows, numColumns)
+
+      IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary)
+
+    } catch {
+      case cce: ClassCastException => {
+        println(this.getClass.toString + ": Schema has illegal values"); throw cce
+      }
+    }
+  }
+
+  /** Maps each entry to its position in `entries`. Callers pass the output of distinct(), so the entries
+    * are unique and the resulting indexes are exactly 0 until entries.size.
+    */
+  private def asOrderedDictionary(entries: Array[String]): BiMap[String, Int] = {
+    val dictionary: BiMap[String, Int] = HashBiMap.create()
+    for ((entry, index) <- entries.zipWithIndex) {
+      dictionary.forcePut(entry, index)
+    }
+    dictionary
+  }
+}
+
+trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
+  /** Write an [[org.apache.mahout.drivers.IndexedDataset]] as text delimited lines, one line per matrix row.
+    *
+    * Each line has the form `rowID<delim1>columnID<delim2>value<delim3>columnID<delim2>value...`, with IDs
+    * taken from the dataset's dictionaries and only non-zero elements written. A row without non-zero
+    * elements is written as the bare rowID.
+    *
+    * @param sc context for the Spark job
+    * @param writeSchema describes the delimiters and positions of values in the output text delimited file.
+    * @param dest directory to write text delimited version of [[org.apache.mahout.drivers.IndexedDataset]]
+    * @param indexedDataset the dataset to write
+    * @throws IllegalArgumentException if `indexedDataset` is null or `dest` is empty
+    * @throws ClassCastException if a schema value does not have the expected type
+    */
+  protected def writer(sc: SparkContext, writeSchema: Schema, dest: String, indexedDataset: IndexedDataset): Unit = {
+    try {
+      val outDelim1 = writeSchema("delim1").asInstanceOf[String]
+      val outDelim2 = writeSchema("delim2").asInstanceOf[String]
+      val outDelim3 = writeSchema("delim3").asInstanceOf[String]
+      //instance vars must be put into locally scoped vals when put into closures that are
+      //executed by Spark
+      require(indexedDataset != null, this.getClass.toString + ": has no indexedDataset to write")
+      require(!dest.isEmpty, this.getClass.toString + ": has no destination to write")
+      val matrix: DrmLike[Int] = indexedDataset.matrix
+      val rowIDDictionary: BiMap[String, Int] = indexedDataset.rowIDs
+      val columnIDDictionary: BiMap[String, Int] = indexedDataset.columnIDs
+      matrix.rdd.map({ case (rowID, itemVector) =>
+        val rowKey = rowIDDictionary.inverse.get(rowID)
+        // strings are extracted eagerly inside map, so a reused Element from the iterator is harmless
+        val items = itemVector.nonZeroes().map { item =>
+          columnIDDictionary.inverse.get(item.index) + outDelim2 + item.get
+        }
+        // mkString places delimiters only between items, so delimiters of any length are handled
+        if (items.isEmpty) rowKey else rowKey + outDelim1 + items.mkString(outDelim3)
+      })
+      .saveAsTextFile(dest)
+    } catch {
+      case cce: ClassCastException => {println(this.getClass.toString+": Schema has illegal values"); throw cce}
+    }
+  }
+}
+
+/** A combined trait that reads and writes */
+trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexedDatasetWriter
+
+/** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
+  * @param readSchema describes the delimiters and position of values in the text delimited file to be read.
+  * @param sc Spark context for reading files
+  * @note The source files are supplied to the readFrom trait method.
+  */
+class TextDelimitedIndexedDatasetReader(val readSchema: Schema, val sc: SparkContext) extends TDIndexedDatasetReader
+
+/** Writes text delimited files from an IndexedDataset. Classes are needed to supply trait params in their constructor.
+  * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
+  * @param sc Spark context for writing files
+  * @note The destination is supplied to the writeTo trait method.
+  */
+class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sc: SparkContext) extends TDIndexedDatasetWriter
+
+/** Reads and writes text delimited files to/from an IndexedDataset. Classes are needed to supply trait params in their constructor.
+  * @param readSchema describes the delimiters and position of values in the text delimited file(s) to be read.
+  * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
+  * @param sc Spark context for reading and writing the files
+  */
+class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeSchema: Schema, val sc: SparkContext) extends TDIndexedDatasetReaderWriter
http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
new file mode 100644
index 0000000..13fa292
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala
@@ -0,0 +1,30 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import scala.collection.mutable.HashMap
+
+/** A mutable HashMap[String, Any] of named settings (delimiters, column positions, filters).
+  * Values are untyped; each reader or writer casts the values it needs.
+  *
+  * @param params initial key -> value mappings, e.g. {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}}
+  */
+class Schema(params: (String, Any)*) extends HashMap[String, Any] {
+  //todo: this requires a mutable HashMap, do we care?
+  params.foreach { case (key, value) => update(key, value) }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/test/scala/org/apache/mahout/drivers/IndexedDatasetTest.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/IndexedDatasetTest.scala b/spark/src/test/scala/org/apache/mahout/drivers/IndexedDatasetTest.scala
new file mode 100644
index 0000000..fb13361
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/drivers/IndexedDatasetTest.scala
@@ -0,0 +1,25 @@
+package org.apache.mahout.drivers
+
+import org.scalatest.FunSuite
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Placeholder suite for [[org.apache.mahout.drivers.IndexedDataset]]; it does not define any tests yet. */
+class IndexedDatasetTest extends FunSuite {
+  // TODO: add IndexedDataset tests
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/74b9921c/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriver$Test.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriver$Test.scala b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriver$Test.scala
new file mode 100644
index 0000000..c1ae4ab
--- /dev/null
+++ b/spark/src/test/scala/org/apache/mahout/drivers/ItemSimilarityDriver$Test.scala
@@ -0,0 +1,180 @@
+package org.apache.mahout.drivers
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.mahout.sparkbindings._
+import java.io.{FileWriter, BufferedWriter}
+import com.google.common.io.Closeables
+import org.apache.spark.SparkContext
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** End-to-end smoke test: writes a small comma delimited interaction log under /tmp and runs
+  * [[org.apache.mahout.drivers.ItemSimilarityDriver]] over it with local multi-threaded Spark.
+  * There are no assertions; the test passes when the driver completes without throwing.
+  */
+class ItemSimilarityDriver$Test extends FunSuite {
+
+  test ("Running some text delimited files through the import/cooccurrence/export"){
+
+    // columns: userID, action, itemID -- matches --rowIDPosition 0, --filterPosition 1, --itemIDPosition 2 below
+    val csvLogStatements = Array(
+      "u1,purchase,iphone",
+      "u1,purchase,ipad",
+      "u2,purchase,nexus",
+      "u2,purchase,galaxy",
+      "u3,purchase,surface",
+      "u4,purchase,iphone",
+      "u4,purchase,galaxy",
+      "u1,view,iphone",
+      "u1,view,ipad",
+      "u1,view,nexus",
+      "u1,view,galaxy",
+      "u2,view,iphone",
+      "u2,view,ipad",
+      "u2,view,nexus",
+      "u2,view,galaxy",
+      "u3,view,surface",
+      "u3,view,nexus",
+      "u4,view,iphone",
+      "u4,view,ipad",
+      "u4,view,galaxy"
+    )
+
+    // close the writer even when write throws, so the file handle is never leaked
+    val w = new BufferedWriter(new FileWriter("/tmp/cf-data.txt"))
+    try {
+      w.write(csvLogStatements.mkString("\n"))
+    } finally {
+      Closeables.close(w, false)
+    }
+
+    /*
+    //Clustered Spark and HDFS
+    ItemSimilarityDriver.main(Array(
+      "--input", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/cf-data.txt",
+      "--output", "hdfs://occam4:54310/user/pat/spark-itemsimilarity/indicatorMatrices/",
+      "--master", "spark://occam4:7077",
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1"
+    ))
+    */
+    //local multi-threaded Spark with HDFS using large dataset
+    /* ItemSimilarityDriver.main(Array(
+      "--input", "hdfs://occam4:54310/user/pat/xrsj/ratings_data.txt",
+      "--output", "hdfs://occam4:54310/user/pat/xrsj/indicatorMatrices/",
+      "--master", "local[4]",
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1"
+    ))
+    */
+
+    //local multi-threaded Spark with local FS; output goes under /tmp next to the input,
+    //not relative to the working directory
+    ItemSimilarityDriver.main(Array(
+      "--input", "/tmp/cf-data.txt",
+      "--output", "/tmp/indicatorMatrices/",
+      "--master", "local[4]",
+      "--filter1", "purchase",
+      "--filter2", "view",
+      "--inDelim", ",",
+      "--itemIDPosition", "2",
+      "--rowIDPosition", "0",
+      "--filterPosition", "1"
+    ))
+
+  }
+
+}
