MAHOUT-1541, MAHOUT-1568, MAHOUT-1569: refactor the options parser and option defaults to DRY up individual driver code by moving more into base classes; tighten the test suite with a better way of comparing actual results with expected results
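In outline, each driver now merges the shared default-option groups defined on MahoutOptionParser into one Map[String, Any] and appends its own entries last so they override the shared values. A minimal sketch of the pattern, assuming a hypothetical DummyDriver whose option values are illustrative and not part of this commit:

    import scala.collection.immutable.HashMap

    object DummyDriver {
      // driver-specific defaults; merged last so they override the shared groups
      private final val DummyDriverOptions = HashMap[String, Any](
        "maxPrefs" -> 500,
        "appName" -> "DummyDriver")

      // shared groups first, driver-specific group last
      private var options: Map[String, Any] =
        MahoutOptionParser.GenericOptions ++ MahoutOptionParser.SparkOptions ++
          MahoutOptionParser.FileIOOptions ++ DummyDriverOptions
    }

Because ++ on immutable maps keeps the right-hand value for duplicate keys, "appName" resolves to "DummyDriver" rather than the SparkOptions default.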
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/a8097403 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/a8097403 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/a8097403 Branch: refs/heads/spark-1.0.x Commit: a80974037853c5227f9e5ef1c384a1fca134746e Parents: 00c0149 Author: pferrel <[email protected]> Authored: Wed Aug 6 16:28:37 2014 -0700 Committer: pferrel <[email protected]> Committed: Wed Aug 6 16:28:37 2014 -0700 ---------------------------------------------------------------------- .../mahout/math/cf/CooccurrenceAnalysis.scala | 220 ++++++++++ .../apache/mahout/cf/CooccurrenceAnalysis.scala | 218 ---------- .../apache/mahout/drivers/IndexedDataset.scala | 25 +- .../mahout/drivers/ItemSimilarityDriver.scala | 293 +++++-------- .../apache/mahout/drivers/MahoutDriver.scala | 28 +- .../mahout/drivers/MahoutOptionParser.scala | 185 +++++++- .../apache/mahout/drivers/ReaderWriter.scala | 30 +- .../org/apache/mahout/drivers/Schema.scala | 54 ++- .../drivers/TextDelimitedReaderWriter.scala | 107 +++-- .../drm/CheckpointedDrmSpark.scala | 1 - .../io/MahoutKryoRegistrator.scala | 6 +- .../mahout/cf/CooccurrenceAnalysisSuite.scala | 49 ++- .../drivers/ItemSimilarityDriverSuite.scala | 422 +++++++++++++++++-- 13 files changed, 1114 insertions(+), 524 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala new file mode 100644 index 0000000..181b729 --- /dev/null +++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.math.cf + +import org.apache.mahout.math._ +import scalabindings._ +import RLikeOps._ +import drm._ +import RLikeDrmOps._ +import scala.collection.JavaConversions._ +import org.apache.mahout.math.stats.LogLikelihood +import collection._ +import org.apache.mahout.common.RandomUtils +import org.apache.mahout.math.function.{VectorFunction, Functions} + + +/** + * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation", + * available at http://www.mapr.com/practical-machine-learning + * + * see also "Sebastian Schelter, Christoph Boden, Volker Markl: + * Scalable Similarity-Based Neighborhood Methods with MapReduce + * ACM Conference on Recommender Systems 2012" + */ +object CooccurrenceAnalysis extends Serializable { + + /** Compares (Int,Double) pairs by the second value */ + private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2} + + def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50, + maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = { + + implicit val distributedContext = drmARaw.context + + // Apply selective downsampling, pin resulting matrix + val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions) + + // num users, which equals the maximum number of interactions per item + val numUsers = drmA.nrow.toInt + + // Compute & broadcast the number of interactions per thing in A + val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn) + + // Compute co-occurrence matrix A'A + val drmAtA = drmA.t %*% drmA + + // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix + val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA, + bcastInteractionsPerItemA, crossCooccurrence = false) + + var indicatorMatrices = List(drmIndicatorsAtA) + + // Now look at cross-co-occurrences + for (drmBRaw <- drmBs) { + // Down-sample and pin other interaction matrix + val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint() + + // Compute & broadcast the number of interactions per thing in B + val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn) + + // Compute cross-co-occurrence matrix B'A + // pferrel: yikes, this is the wrong order, a big change! 
so you know who to blame + // used to be val drmBtA = drmB.t %*% drmA, which is the wrong order + val drmAtB = drmA.t %*% drmB + + val drmIndicatorsAtB = computeIndicators(drmAtB, numUsers, maxInterestingItemsPerThing, + bcastInteractionsPerItemA, bcastInteractionsPerThingB) + + indicatorMatrices = indicatorMatrices :+ drmIndicatorsAtB + + drmB.uncache() + } + + // Unpin downsampled interaction matrix + drmA.uncache() + + // Return list of indicator matrices + indicatorMatrices + } + + /** + * Compute loglikelihood ratio + * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details + **/ + def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long, + numInteractionsWithAandB: Long, numInteractions: Long) = { + + val k11 = numInteractionsWithAandB + val k12 = numInteractionsWithA - numInteractionsWithAandB + val k21 = numInteractionsWithB - numInteractionsWithAandB + val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB + + LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22) + + } + + def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int, + bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector], + crossCooccurrence: Boolean = true) = { + drmBtA.mapBlock() { + case (keys, block) => + + val llrBlock = block.like() + val numInteractionsB: Vector = bcastNumInteractionsB + val numInteractionsA: Vector = bcastNumInteractionsA + + for (index <- 0 until keys.size) { + + val thingB = keys(index) + + // PriorityQueue to select the top-k items + val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore) + + block(index, ::).nonZeroes().foreach { elem => + val thingA = elem.index + val cooccurrences = elem.get + + // exclude co-occurrences of the item with itself + if (crossCooccurrence || thingB != thingA) { + // Compute loglikelihood ratio + val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong, + cooccurrences.toLong, numUsers) + + val candidate = thingA -> llr + + // matches legacy hadoop code and maps values to range (0..1) + // val tLLR = 1.0 - (1.0 / (1.0 + llr)) + //val candidate = thingA -> tLLR + + // Enqueue item with score, if belonging to the top-k + if (topItemsPerThing.size < maxInterestingItemsPerThing) { + topItemsPerThing.enqueue(candidate) + } else if (orderByScore.lt(candidate, topItemsPerThing.head)) { + topItemsPerThing.dequeue() + topItemsPerThing.enqueue(candidate) + } + } + } + + // Add top-k interesting items to the output matrix + topItemsPerThing.dequeueAll.foreach { + case (otherThing, llrScore) => + llrBlock(index, otherThing) = llrScore + } + } + + keys -> llrBlock + } + } + + /** + * Selectively downsample users and things with an anomalous amount of interactions, inspired by + * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java + * + * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not + */ + def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = { + + implicit val distributedContext = drmM.context + + // Pin raw interaction matrix + val drmI = drmM.checkpoint() + + // Broadcast vector containing the number of interactions with each thing + val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn) + + val downSampledDrmI = drmI.mapBlock() { + case (keys, block) => + val numInteractions: Vector = 
bcastNumInteractions + + // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures + val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed)) + + val downsampledBlock = block.like() + + // Downsample the interaction vector of each user + for (userIndex <- 0 until keys.size) { + + val interactionsOfUser = block(userIndex, ::) + + val numInteractionsOfUser = interactionsOfUser.getNumNonZeroElements() + + val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser + + interactionsOfUser.nonZeroes().foreach { elem => + val numInteractionsWithThing = numInteractions(elem.index) + val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing + + if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) { + // We ignore the original interaction value and create a binary 0-1 matrix + // as we only consider whether interactions happened or did not happen + downsampledBlock(userIndex, elem.index) = 1 + } + } + } + + keys -> downsampledBlock + } + + // Unpin raw interaction matrix + drmI.uncache() + + downSampledDrmI + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala b/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala deleted file mode 100644 index 14cc9d5..0000000 --- a/spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.cf - -import org.apache.mahout.math._ -import scalabindings._ -import RLikeOps._ -import drm._ -import RLikeDrmOps._ -import scala.collection.JavaConversions._ -import org.apache.mahout.math.stats.LogLikelihood -import collection._ -import org.apache.mahout.common.RandomUtils -import org.apache.mahout.math.function.{VectorFunction, Functions} - - -/** - * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation", - * available at http://www.mapr.com/practical-machine-learning - * - * see also "Sebastian Schelter, Christoph Boden, Volker Markl: - * Scalable Similarity-Based Neighborhood Methods with MapReduce - * ACM Conference on Recommender Systems 2012" - */ -object CooccurrenceAnalysis extends Serializable { - - /** Compares (Int,Double) pairs by the second value */ - private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2} - - def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50, - maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = { - - implicit val distributedContext = drmARaw.context - - // Apply selective downsampling, pin resulting matrix - val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions) - - // num users, which equals the maximum number of interactions per item - val numUsers = drmA.nrow.toInt - - // Compute & broadcast the number of interactions per thing in A - val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn) - - // Compute co-occurrence matrix A'A - val drmAtA = drmA.t %*% drmA - - // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix - val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA, - bcastInteractionsPerItemA, crossCooccurrence = false) - - var indicatorMatrices = List(drmIndicatorsAtA) - - // Now look at cross-co-occurrences - for (drmBRaw <- drmBs) { - // Down-sample and pin other interaction matrix - val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint() - - // Compute & broadcast the number of interactions per thing in B - val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn) - - // Compute cross-co-occurrence matrix B'A - val drmBtA = drmB.t %*% drmA - - val drmIndicatorsBtA = computeIndicators(drmBtA, numUsers, maxInterestingItemsPerThing, - bcastInteractionsPerThingB, bcastInteractionsPerItemA) - - indicatorMatrices = indicatorMatrices :+ drmIndicatorsBtA - - drmB.uncache() - } - - // Unpin downsampled interaction matrix - drmA.uncache() - - // Return list of indicator matrices - indicatorMatrices - } - - /** - * Compute loglikelihood ratio - * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details - **/ - def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long, - numInteractionsWithAandB: Long, numInteractions: Long) = { - - val k11 = numInteractionsWithAandB - val k12 = numInteractionsWithA - numInteractionsWithAandB - val k21 = numInteractionsWithB - numInteractionsWithAandB - val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB - - LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22) - - } - - def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int, - bcastNumInteractionsB: BCast[Vector], 
bcastNumInteractionsA: BCast[Vector], - crossCooccurrence: Boolean = true) = { - drmBtA.mapBlock() { - case (keys, block) => - - val llrBlock = block.like() - val numInteractionsB: Vector = bcastNumInteractionsB - val numInteractionsA: Vector = bcastNumInteractionsA - - for (index <- 0 until keys.size) { - - val thingB = keys(index) - - // PriorityQueue to select the top-k items - val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore) - - block(index, ::).nonZeroes().foreach { elem => - val thingA = elem.index - val cooccurrences = elem.get - - // exclude co-occurrences of the item with itself - if (crossCooccurrence || thingB != thingA) { - // Compute loglikelihood ratio - val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong, - cooccurrences.toLong, numUsers) - - val candidate = thingA -> llr - - // matches legacy hadoop code and maps values to range (0..1) - // val tLLR = 1.0 - (1.0 / (1.0 + llr)) - //val candidate = thingA -> tLLR - - // Enqueue item with score, if belonging to the top-k - if (topItemsPerThing.size < maxInterestingItemsPerThing) { - topItemsPerThing.enqueue(candidate) - } else if (orderByScore.lt(candidate, topItemsPerThing.head)) { - topItemsPerThing.dequeue() - topItemsPerThing.enqueue(candidate) - } - } - } - - // Add top-k interesting items to the output matrix - topItemsPerThing.dequeueAll.foreach { - case (otherThing, llrScore) => - llrBlock(index, otherThing) = llrScore - } - } - - keys -> llrBlock - } - } - - /** - * Selectively downsample users and things with an anomalous amount of interactions, inspired by - * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java - * - * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not - */ - def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = { - - implicit val distributedContext = drmM.context - - // Pin raw interaction matrix - val drmI = drmM.checkpoint() - - // Broadcast vector containing the number of interactions with each thing - val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn) - - val downSampledDrmI = drmI.mapBlock() { - case (keys, block) => - val numInteractions: Vector = bcastNumInteractions - - // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures - val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed)) - - val downsampledBlock = block.like() - - // Downsample the interaction vector of each user - for (userIndex <- 0 until keys.size) { - - val interactionsOfUser = block(userIndex, ::) - - val numInteractionsOfUser = interactionsOfUser.getNumNonZeroElements() - - val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser - - interactionsOfUser.nonZeroes().foreach { elem => - val numInteractionsWithThing = numInteractions(elem.index) - val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing - - if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) { - // We ignore the original interaction value and create a binary 0-1 matrix - // as we only consider whether interactions happened or did not happen - downsampledBlock(userIndex, elem.index) = 1 - } - } - } - - keys -> downsampledBlock - } - - // Unpin raw interaction matrix - drmI.uncache() - - downSampledDrmI - } -} 
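As a quick illustration of the contingency counts that logLikelihoodRatio assembles before delegating to LogLikelihood.logLikelihoodRatio, here is a worked example; the interaction totals are made-up numbers, not taken from the commit:

    // hypothetical totals: 10,000 users, 1,000 interacted with A, 100 with B, 20 with both
    val numInteractions = 10000L
    val numInteractionsWithA = 1000L
    val numInteractionsWithB = 100L
    val numInteractionsWithAandB = 20L

    val k11 = numInteractionsWithAandB                          // 20   interacted with both A and B
    val k12 = numInteractionsWithA - numInteractionsWithAandB   // 980  A but not B
    val k21 = numInteractionsWithB - numInteractionsWithAandB   // 80   B but not A
    val k22 = numInteractions - numInteractionsWithA -
      numInteractionsWithB + numInteractionsWithAandB           // 8920 neither A nor B

    // a larger ratio means A and B co-occur more often than their popularities alone would predict
    val llr = org.apache.mahout.math.stats.LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)

computeIndicators then keeps only the top-k such scores per item, which is what sparsifies A'A (or A'B) into the indicator matrix.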
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala index 0d8c160..41622a8 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala @@ -19,6 +19,8 @@ package org.apache.mahout.drivers import com.google.common.collect.BiMap import org.apache.mahout.math.drm.CheckpointedDrm +import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark +import org.apache.mahout.sparkbindings._ /** * Wraps a [[org.apache.mahout.sparkbindings.drm.DrmLike]] object with two [[com.google.common.collect.BiMap]]s to store ID/label translation dictionaries. @@ -39,14 +41,33 @@ import org.apache.mahout.math.drm.CheckpointedDrm * to be not created when not needed. */ -case class IndexedDataset(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) { +case class IndexedDataset(var matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]) { + + // we must allow the row dimension to be adjusted in the case where the data read in is incomplete and we + // learn this afterwards + + /** + * Adds the equivalent of blank rows to the sparse CheckpointedDrm, which only changes the row cardinality value. + * No physical changes are made to the underlying drm. + * @param n number to use for row carnindality, should be larger than current + * @note should be done before any BLAS optimizer actions are performed on the matrix or you'll get unpredictable + * results. + */ + def newRowCardinality(n: Int): IndexedDataset = { + assert(n > -1) + assert( n >= matrix.nrow) + val drmRdd = matrix.asInstanceOf[CheckpointedDrmSpark[Int]].rdd + val ncol = matrix.ncol + val newMatrix = drmWrap[Int](drmRdd, n, ncol) + new IndexedDataset(newMatrix, rowIDs, columnIDs) + } } /** * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary constructor for * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like [[org.apache.mahout.drivers.IndexedDatasetStore]] * {{{ - * val indexedDataset = IndexedDataset(indexedDatasetReader.readFrom(source)) + * val indexedDataset = IndexedDataset(indexedDatasetReader.readTuplesFrom(source)) * }}} */ http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala index 71d36c9..e0eaabc 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala @@ -17,7 +17,8 @@ package org.apache.mahout.drivers -import org.apache.mahout.cf.CooccurrenceAnalysis +import org.apache.mahout.math.cf.CooccurrenceAnalysis +import scala.collection.immutable.HashMap /** * Command line interface for [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]]. @@ -25,7 +26,7 @@ import org.apache.mahout.cf.CooccurrenceAnalysis * that contain (row id, column id, ...). 
The IDs are user specified strings which will be * preserved in the * output. The individual tuples will be accumulated into a matrix and [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]] - * will be used to calculate row-wise self-similarity, or when using filters, will generate two + * will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two * matrices and calculate both the self similarity of the primary matrix and the row-wise * similarity of the primary * to the secondary. Returns one or two directories of text files formatted as specified in @@ -35,14 +36,21 @@ import org.apache.mahout.cf.CooccurrenceAnalysis * To get help run {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple * tuples of text delimited values (userID,itemID) with or without a strengths and with a separator of tab, comma, or space, * you can specify only the input and output file and directory--all else will default to the correct values. - * @note To use with a Spark cluster see the --masterUrl option, if you run out of heap space check + * Each output line will contain the Item ID and similar items sorted by LLR strength descending. + * @note To use with a Spark cluster see the --master option, if you run out of heap space check * the --sparkExecutorMemory option. */ object ItemSimilarityDriver extends MahoutDriver { - //todo: Should also take two input streams and do cross similarity with no filter required. - // required for examples + // define only the options specific to ItemSimilarity + private final val ItemSimilarityOptions = HashMap[String, Any]( + "maxPrefs" -> 500, + "maxSimilaritiesPerItem" -> 100, + "appName" -> "ItemSimilarityDriver") + + // build options from some stardard CLI param groups + // Note: always put the driver specific options at the last so the can override and previous options! + private var options: Map[String, Any] = null - private var options: Options = _ private var reader1: TextDelimitedIndexedDatasetReader = _ private var reader2: TextDelimitedIndexedDatasetReader = _ private var writer: TextDelimitedIndexedDatasetWriter = _ @@ -52,190 +60,103 @@ object ItemSimilarityDriver extends MahoutDriver { * @param args Command line args, if empty a help message is printed. */ override def main(args: Array[String]): Unit = { - val parser = new MahoutOptionParser[Options]("spark-itemsimilarity") { + options = MahoutOptionParser.GenericOptions ++ MahoutOptionParser.SparkOptions ++ + MahoutOptionParser.FileIOOptions ++ MahoutOptionParser.TextDelimitedTuplesOptions ++ + MahoutOptionParser.TextDelimitedDRMOptions ++ ItemSimilarityOptions + + val parser = new MahoutOptionParser(programName = "spark-itemsimilarity") { head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT") //Input output options, non-driver specific - note("Input, output options") - opt[String]('i', "input") required() action { (x, options) => - options.copy(input = x) - } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)") - - opt[String]('o', "output") required() action { (x, options) => - if (x.endsWith("/")) // todo: check to see if HDFS allows MS-Windows backslashes locally? 
- options.copy(output = x) - else - options.copy(output = x + "/") - } text ("Path for output, any local or HDFS supported URI (required).") + parseIOOptions //Algorithm control options--driver specific note("\nAlgorithm control options:") - opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) => - options.copy(master = x) - } - opt[Int]("maxPrefs") abbr ("mppu") action { (x, options) => - options.copy(maxPrefs = x) - } text ("Max number of preferences to consider per user (optional). Default: 500") validate { x => + options + ("maxPrefs" -> x) + } text ("Max number of preferences to consider per user (optional). Default: " + + ItemSimilarityOptions("maxPrefs")) validate { x => if (x > 0) success else failure("Option --maxPrefs must be > 0") } /** not implemented in CooccurrenceAnalysis.cooccurrence opt[Int]("minPrefs") abbr ("mp") action { (x, options) => - options.copy(minPrefs = x) + options.put("minPrefs", x) + options } text ("Ignore users with less preferences than this (optional). Default: 1") validate { x => if (x > 0) success else failure("Option --minPrefs must be > 0") } */ opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) => - options.copy(maxSimilaritiesPerItem = x) - } text ("Limit the number of similarities per item to this number (optional). Default: 100") validate { x => + options + ("maxSimilaritiesPerItem" -> x) + } text ("Limit the number of similarities per item to this number (optional). Default: " + + ItemSimilarityOptions("maxSimilaritiesPerItem")) validate { x => if (x > 0) success else failure("Option --maxSimilaritiesPerItem must be > 0") } - opt[Int]("randomSeed") abbr ("rs") action { (x, options) => - options.copy(randomSeed = x) - } text ("Int to seed random number generator (optional). Default: Uses time to generate a seed") validate { x => - if (x > 0) success else failure("Option --randomSeed must be > 0") - } - - //Input text file schema--not driver specific but input data specific, tuples input, - // not drms - note("\nInput text file schema options:") - opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) => - options.copy(inDelim = x) - } - - opt[String]("filter1") abbr ("f1") action { (x, options) => - options.copy(filter1 = x) - } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used") - - opt[String]("filter2") abbr ("f2") action { (x, options) => - options.copy(filter2 = x) - } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected.") - - opt[Int]("rowIDPosition") abbr ("rc") action { (x, options) => - options.copy(rowIDPosition = x) - } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x => - if (x >= 0) success else failure("Option --rowIDColNum must be >= 0") - } - - opt[Int]("itemIDPosition") abbr ("ic") action { (x, options) => - options.copy(itemIDPosition = x) - } text ("Column number (0 based Int) containing the item ID string (optional). 
Default: 1") validate { x => - if (x >= 0) success else failure("Option --itemIDColNum must be >= 0") - } - - opt[Int]("filterPosition") abbr ("fc") action { (x, options) => - options.copy(filterPosition = x) - } text ("Column number (0 based Int) containing the filter string (optional). Default: -1 for no filter") validate { x => - if (x >= -1) success else failure("Option --filterColNum must be >= -1") - } - - note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used") + //Driver notes--driver specific + note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.") - //File finding strategy--not driver specific - note("\nFile discovery options:") - opt[Unit]('r', "recursive") action { (_, options) => - options.copy(recursive = true) - } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false") + //Input text format + parseInputSchemaOptions - opt[String]("filenamePattern") abbr ("fp") action { (x, options) => - options.copy(filenamePattern = x) - } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory") + //How to search for input + parseFileDiscoveryOptions //Drm output schema--not driver specific, drm specific - note("\nOutput text file schema options:") - opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) => - options.copy(rowKeyDelim = x) - } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"") - - opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) => - options.copy(columnIdStrengthDelim = x) - } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"") - - opt[String]("tupleDelim") abbr ("td") action { (x, options) => - options.copy(tupleDelim = x) - } text ("Separates vector tuple values in the values list (optional). Default: \",\"") - - opt[Unit]("omitStrength") abbr ("os") action { (_, options) => - options.copy(omitStrength = true) - } text ("Do not write the strength to the output files (optional), Default: false.") - note("This option is used to output indexable data for creating a search engine recommender.") + parseDrmFormatOptions //Spark config options--not driver specific - note("\nSpark config options:") - opt[String]("sparkExecutorMem") abbr ("sem") action { (x, options) => - options.copy(sparkExecutorMem = x) - } text ("Max Java heap available as \"executor memory\" on each node (optional). 
Default: 4g") + parseSparkOptions - note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2,itemID10:value10...\"") - - //Jar inclusion, this option can be set when executing the driver from compiled code - opt[Unit]("dontAddMahoutJars") hidden() action { (_, options) => - options.copy(dontAddMahoutJars = true) //set the value MahoutDriver so the context will be created correctly - }//Hidden option, used when executing tests or calling from other code where classes are all loaded explicitly - - //Driver notes--driver specific - note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.\n") + //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI + parseGenericOptions help("help") abbr ("h") text ("prints this usage text\n") - checkConfig { c => - if (c.filterPosition == c.itemIDPosition - || c.filterPosition == c.rowIDPosition - || c.rowIDPosition == c.itemIDPosition) - failure("The row, item, and filter positions must be unique.") else success - } - - //check for option consistency, probably driver specific - checkConfig { c => - if (c.filter1 != null && c.filter2 != null && c.filter1 == c.filter2) failure("If" + - " using filters they must be unique.") else success - } - } - - //repeated code, should this be put base MahoutDriver somehow? - parser.parse(args, Options()) map { opts => + parser.parse(args, options) map { opts => options = opts process } - } - override def start(masterUrl: String = options.master, - appName: String = options.appName, dontAddMahoutJars: Boolean = options.dontAddMahoutJars): + override def start(masterUrl: String = options("master").asInstanceOf[String], + appName: String = options("appName").asInstanceOf[String], + dontAddMahoutJars: Boolean = options("dontAddMahoutJars").asInstanceOf[Boolean]): Unit = { + // todo: the HashBiMap used in the TextDelimited Reader is hard coded into + // MahoutKryoRegistrator, it should be added to the register list here so it + // will be only spcific to this job. 
sparkConf.set("spark.kryo.referenceTracking", "false") .set("spark.kryoserializer.buffer.mb", "200") - .set("spark.executor.memory", options.sparkExecutorMem) + .set("spark.executor.memory", options("sparkExecutorMem").asInstanceOf[String]) super.start(masterUrl, appName, dontAddMahoutJars) - val readSchema1 = new Schema("delim" -> options.inDelim, "filter" -> options.filter1, - "rowIDPosition" -> options.rowIDPosition, - "columnIDPosition" -> options.itemIDPosition, - "filterPosition" -> options.filterPosition) + val readSchema1 = new Schema("delim" -> options("inDelim").asInstanceOf[String], + "filter" -> options("filter1").asInstanceOf[String], + "rowIDPosition" -> options("rowIDPosition").asInstanceOf[Int], + "columnIDPosition" -> options("itemIDPosition").asInstanceOf[Int], + "filterPosition" -> options("filterPosition").asInstanceOf[Int]) reader1 = new TextDelimitedIndexedDatasetReader(readSchema1) - if (options.filterPosition != -1 && options.filter2 != null) { - val readSchema2 = new Schema("delim" -> options.inDelim, "filter" -> options.filter2, - "rowIDPosition" -> options.rowIDPosition, - "columnIDPosition" -> options.itemIDPosition, - "filterPosition" -> options.filterPosition) + if ((options("filterPosition").asInstanceOf[Int] != -1 && options("filter2").asInstanceOf[String] != null) + || (options("input2").asInstanceOf[String] != null && !options("input2").asInstanceOf[String].isEmpty )){ + // only need to change the filter used compared to readSchema1 + val readSchema2 = new Schema(readSchema1) += ("filter" -> options("filter2").asInstanceOf[String]) reader2 = new TextDelimitedIndexedDatasetReader(readSchema2) } writeSchema = new Schema( - "rowKeyDelim" -> options.rowKeyDelim, - "columnIdStrengthDelim" -> options.columnIdStrengthDelim, - "omitScore" -> options.omitStrength, - "tupleDelim" -> options.tupleDelim) + "rowKeyDelim" -> options("rowKeyDelim").asInstanceOf[String], + "columnIdStrengthDelim" -> options("columnIdStrengthDelim").asInstanceOf[String], + "omitScore" -> options("omitStrength").asInstanceOf[Boolean], + "tupleDelim" -> options("tupleDelim").asInstanceOf[String]) writer = new TextDelimitedIndexedDatasetWriter(writeSchema) @@ -243,24 +164,60 @@ object ItemSimilarityDriver extends MahoutDriver { private def readIndexedDatasets: Array[IndexedDataset] = { - val inFiles = FileSysUtils(options.input, options.filenamePattern, options.recursive).uris + val inFiles = FileSysUtils(options("input").asInstanceOf[String], options("filenamePattern").asInstanceOf[String], + options("recursive").asInstanceOf[Boolean]).uris + val inFiles2 = if (options("input2") == null || options("input2").asInstanceOf[String].isEmpty) "" + else FileSysUtils(options("input2").asInstanceOf[String], options("filenamePattern").asInstanceOf[String], + options("recursive").asInstanceOf[Boolean]).uris if (inFiles.isEmpty) { Array() } else { - val selfSimilarityDataset = IndexedDataset(reader1.readFrom(inFiles)) + val datasetA = IndexedDataset(reader1.readTuplesFrom(inFiles)) + if (options("writeAllDatasets").asInstanceOf[Boolean]) writer.writeDRMTo(datasetA, + options("output").asInstanceOf[String] + "../input-datasets/primary-interactions") + + // The case of readng B can be a bit tricky when the exact same row IDs don't exist for A and B + // Here we assume there is one row ID space for all interactions. To do this we calculate the + // row cardinality only after reading in A and B (or potentially C...) We then adjust the + // cardinality so all match, which is required for the math to work. 
+ // Note: this may leave blank rows with no representation in any DRM. Blank rows need to + // be supported (and are at least on Spark) or the row cardinality fix will not work. + val datasetB = if (!inFiles2.isEmpty) { + // get cross-cooccurrence interactions from separate files + val datasetB = IndexedDataset(reader2.readTuplesFrom(inFiles2, existingRowIDs = datasetA.rowIDs)) + + datasetB + + } else if (options("filterPosition").asInstanceOf[Int] != -1 + && options("filter2").asInstanceOf[String] != null) { + + // get cross-cooccurrences interactions by using two filters on a single set of files + val datasetB = IndexedDataset(reader2.readTuplesFrom(inFiles, existingRowIDs = datasetA.rowIDs)) + + datasetB - if (options.filterPosition != -1 && options.filter2 != null) { - // todo: needs to support more than one cross-similarity indicator - val crossSimilarityDataset1 = IndexedDataset(reader2.readFrom(inFiles)) - Array(selfSimilarityDataset, crossSimilarityDataset1) } else { - Array(selfSimilarityDataset) + null.asInstanceOf[IndexedDataset] } + if (datasetB != null.asInstanceOf[IndexedDataset]) { // do AtB calc + // true row cardinality is the size of the row id index, which was calculated from all rows of A and B + val rowCardinality = datasetB.rowIDs.size() // the authoritative row cardinality - } + // todo: how expensive is nrow? We could make assumptions about .rowIds that don't rely on + // its calculation + val returnedA = if (rowCardinality != datasetA.matrix.nrow) datasetA.newRowCardinality(rowCardinality) + else datasetA // this guarantees matching cardinality + val returnedB = if (rowCardinality != datasetB.matrix.nrow) datasetB.newRowCardinality(rowCardinality) + else datasetB // this guarantees matching cardinality + + if (options("writeAllDatasets").asInstanceOf[Boolean]) writer.writeDRMTo(datasetB, options("output") + "../input-datasets/secondary-interactions") + + Array(returnedA, returnedB) + } else Array(datasetA) + } } override def process: Unit = { @@ -271,57 +228,29 @@ object ItemSimilarityDriver extends MahoutDriver { // todo: allow more than one cross-similarity matrix? 
val indicatorMatrices = { if (indexedDatasets.length > 1) { - CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options.randomSeed, options.maxSimilaritiesPerItem, options.maxPrefs, Array(indexedDatasets(1).matrix)) + CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options("randomSeed").asInstanceOf[Int], + options("maxSimilaritiesPerItem").asInstanceOf[Int], options("maxPrefs").asInstanceOf[Int], + Array(indexedDatasets(1).matrix)) } else { - CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options.randomSeed, options.maxSimilaritiesPerItem, options.maxPrefs) + CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options("randomSeed").asInstanceOf[Int], + options("maxSimilaritiesPerItem").asInstanceOf[Int], options("maxPrefs").asInstanceOf[Int]) } } - // self similarity - // the next two lines write the drm using a Writer class - // val selfIndicatorDataset = new IndexedDataset(indicatorMatrices(0), indexedDatasets(0).columnIDs, indexedDatasets(0).columnIDs) - // writeStore.writeTo(selfIndicatorDataset, options.output + "indicator-matrix") - // an alternative is to create a version of IndexedDataset that knows how to write itself val selfIndicatorDataset = new IndexedDatasetTextDelimitedWriteable(indicatorMatrices(0), indexedDatasets(0).columnIDs, indexedDatasets(0).columnIDs, writeSchema) - selfIndicatorDataset.writeTo(options.output + "indicator-matrix") + selfIndicatorDataset.writeTo(options("output").asInstanceOf[String] + "indicator-matrix") - // todo: needs to support more than one cross-similarity indicator + // todo: would be nice to support more than one cross-similarity indicator if (indexedDatasets.length > 1) { val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity - writer.writeTo(crossIndicatorDataset, options.output + "cross-indicator-matrix") + writer.writeDRMTo(crossIndicatorDataset, options("output").asInstanceOf[String] + "cross-indicator-matrix") } stop } - // Default values go here, any "_" or null should be "required" in the Parser or flags an unused option - // todo: support two input streams for cross-similarity, maybe assume one schema for all inputs - case class Options( - master: String = "local", - sparkExecutorMem: String = "2g", - appName: String = "ItemSimilarityJob", - randomSeed: Int = System.currentTimeMillis().toInt, - recursive: Boolean = false, - input: String = null, - output: String = null, - filenamePattern: String = "^part-.*", - maxSimilaritiesPerItem: Int = 100, - maxPrefs: Int = 500, - minPrefs: Int = 1, - rowIDPosition: Int = 0, - itemIDPosition: Int = 1, - filterPosition: Int = -1, - filter1: String = null, - filter2: String = null, - inDelim: String = "[,\t ]", - rowKeyDelim: String = "\t", - columnIdStrengthDelim: String = ":", - tupleDelim: String = ",", - omitStrength: Boolean = false, - dontAddMahoutJars: Boolean = false) - } http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala index 0c579d4..796a66a 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala @@ -21,17 +21,26 @@ import 
org.apache.mahout.math.drm.DistributedContext import org.apache.spark.SparkConf import org.apache.mahout.sparkbindings._ +import scala.collection.immutable + /** Extend this class to create a Mahout CLI driver. Minimally you must override process and main. - * Also define a command line parser and default options or fill in the following template: + * Also define a Map of options for the command line parser. The following template may help: * {{{ * object SomeDriver extends MahoutDriver { + * // build options from some stardard CLI param groups + * // Note: always put the driver specific options at the last so the can override and previous options! + * private var options = GenericOptions ++ SparkOptions ++ FileIOOptions ++ TextDelimitedTuplesOptions ++ + * TextDelimitedDRMOptions ++ ItemSimilarityOptions + * * override def main(args: Array[String]): Unit = { - * val parser = new MahoutOptionParser[Options]("Job Name") { - * head("Job Name", "Spark") - * note("Various CLI options") - * //see https://github.com/scopt/scopt for a good Scala option parser, which MahoutOptionParser extends + * val parser = new MahoutOptionParser(programName = "spark-itemsimilarity") { + * head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT") + * + * //Several standard option groups are usually non-driver specific so use the MahoutOptionParser methods + * parseGenericOptions + * ... * } - * parser.parse(args, Options()) map { opts => + * parser.parse(args, options) map { opts => * options = opts * process * } @@ -42,15 +51,12 @@ import org.apache.mahout.sparkbindings._ * //don't just stand there do something * stop * } - * - * //Default values go here, any '_' or null should be 'required' in the Parser or flags an unused option - * case class Options( - * appName: String = "Job Name", ... 
- * ) * } * }}} */ abstract class MahoutDriver { + + implicit var mc: DistributedContext = _ implicit val sparkConf = new SparkConf() http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala index 8a337f5..ba4ca1d 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala @@ -17,8 +17,189 @@ package org.apache.mahout.drivers import scopt.OptionParser +import scala.collection.immutable -/** Modifies default [[scopt.OptionParser]] to output long help-like usage + error message */ -class MahoutOptionParser[C](programName: String) extends OptionParser[C](programName: String) { +/** Companion object defines default option groups for reference in any driver that needs them */ +object MahoutOptionParser { + // set up the various default option groups + final val GenericOptions = immutable.HashMap[String, Any]( + "randomSeed" -> System.currentTimeMillis().toInt, + "dontAddMahoutJars" -> false, + "writeAllDatasets" -> false) + + final val SparkOptions = immutable.HashMap[String, Any]( + "master" -> "local", + "sparkExecutorMem" -> "2g", + "appName" -> "Generic Spark App, Change this.") + + final val FileIOOptions = immutable.HashMap[String, Any]( + "recursive" -> false, + "input" -> null.asInstanceOf[String], + "input2" -> null.asInstanceOf[String], + "output" -> null.asInstanceOf[String], + "filenamePattern" -> "^part-.*") + + final val TextDelimitedTuplesOptions = immutable.HashMap[String, Any]( + "rowIDPosition" -> 0, + "itemIDPosition" -> 1, + "filterPosition" -> -1, + "filter1" -> null.asInstanceOf[String], + "filter2" -> null.asInstanceOf[String], + "inDelim" -> "[,\t ]") + + final val TextDelimitedDRMOptions = immutable.HashMap[String, Any]( + "rowKeyDelim" -> "\t", + "columnIdStrengthDelim" -> ":", + "tupleDelim" -> " ", + "omitStrength" -> false) +} +/** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to + * keep both standarized. + * @param programName Name displayed in help message, the name by which the driver is invoked. + * */ +class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) { override def showUsageOnError = true + + def parseIOOptions = { + note("Input, output options") + opt[String]('i', "input") required() action { (x, options) => + options + ("input" -> x) + } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)") + + opt[String]("input2") abbr ("i2") action { (x, options) => + options + ("input2" -> x) + } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.") + + opt[String]('o', "output") required() action { (x, options) => + // todo: check to see if HDFS allows MS-Windows backslashes locally? + if (x.endsWith("/")) { + options + ("output" -> x) + } else { + options + ("output" -> (x + "/")) + } + } text ("Path for output, any local or HDFS supported URI (required)") + + } + + def parseSparkOptions = { + note("\nSpark config options:") + + opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". 
Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) => + options + ("master" -> x) + } + + opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each node (optional). Default: 4g") action { (x, options) => + options + ("sparkExecutorMem" -> x) + } + + } + + def parseGenericOptions = { + note("\nGeneral config options:") + opt[Int]("randomSeed") abbr ("rs") action { (x, options) => + options + ("randomSeed" -> x) + } validate { x => + if (x > 0) success else failure("Option --randomSeed must be > 0") + } + + opt[Unit]("dontAddMahoutJars") hidden() action { (_, options) => + options + ("dontAddMahoutJars" -> true) + }//Hidden option, used when executing tests or calling from other code where classes are all loaded explicitly + + //output both input DRMs + opt[Unit]("writeAllDatasets") hidden() action { (_, options) => + options + ("writeAllDatasets" -> true) + }//Hidden option, though a user might want this. + } + + def parseInputSchemaOptions{ + //Input text file schema--not driver specific but input data specific, tuples input, + // not drms + note("\nInput text file schema options:") + opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) => + options + ("inDelim" -> x) + } + + opt[String]("filter1") abbr ("f1") action { (x, options) => + options + ("filter1" -> x) + } text ("String (or regex) whose presence indicates a datum for the primary item set (optional). Default: no filter, all data is used") + + opt[String]("filter2") abbr ("f2") action { (x, options) => + options + ("filter2" -> x) + } text ("String (or regex) whose presence indicates a datum for the secondary item set (optional). If not present no secondary dataset is collected") + + opt[Int]("rowIDPosition") abbr ("rc") action { (x, options) => + options + ("rowIDPosition" -> x) + } text ("Column number (0 based Int) containing the row ID string (optional). Default: 0") validate { x => + if (x >= 0) success else failure("Option --rowIDColNum must be >= 0") + } + + opt[Int]("itemIDPosition") abbr ("ic") action { (x, options) => + options + ("itemIDPosition" -> x) + } text ("Column number (0 based Int) containing the item ID string (optional). Default: 1") validate { x => + if (x >= 0) success else failure("Option --itemIDColNum must be >= 0") + } + + opt[Int]("filterPosition") abbr ("fc") action { (x, options) => + options + ("filterPosition" -> x) + } text ("Column number (0 based Int) containing the filter string (optional). 
Default: -1 for no filter") validate { x => + if (x >= -1) success else failure("Option --filterColNum must be >= -1") + } + + note("\nUsing all defaults the input is expected of the form: \"userID<tab>itemId\" or \"userID<tab>itemID<tab>any-text...\" and all rows will be used") + + checkConfig { options: Map[String, Any] => + if (options("filterPosition").asInstanceOf[Int] == options("itemIDPosition").asInstanceOf[Int] + || options("filterPosition").asInstanceOf[Int] == options("rowIDPosition").asInstanceOf[Int] + || options("rowIDPosition").asInstanceOf[Int] == options("itemIDPosition").asInstanceOf[Int]) + failure("The row, item, and filter positions must be unique.") else success + } + + //check for option consistency, probably driver specific + checkConfig { options: Map[String, Any] => + if (options("filter1").asInstanceOf[String] != null.asInstanceOf[String] + && options("filter2").asInstanceOf[String] != null.asInstanceOf[String] + && options("filter1").asInstanceOf[String] == options("filter2").asInstanceOf[String]) + failure ("If using filters they must be unique.") else success + } + + } + + def parseFileDiscoveryOptions = { + //File finding strategy--not driver specific + note("\nFile discovery options:") + opt[Unit]('r', "recursive") action { (_, options) => + options + ("recursive" -> true) + } text ("Searched the -i path recursively for files that match --filenamePattern (optional), Default: false") + + opt[String]("filenamePattern") abbr ("fp") action { (x, options) => + options + ("filenamePattern" -> x) + } text ("Regex to match in determining input files (optional). Default: filename in the --input option or \"^part-.*\" if --input is a directory") + + } + + def parseDrmFormatOptions = { + note("\nOutput text file schema options:") + opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) => + options + ("rowKeyDelim" -> x) + } text ("Separates the rowID key from the vector values list (optional). Default: \"\\t\"") + + opt[String]("columnIdStrengthDelim") abbr ("cd") action { (x, options) => + options + ("columnIdStrengthDelim" -> x) + } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"") + + opt[String]("tupleDelim") abbr ("td") action { (x, options) => + options + ("tupleDelim" -> x) + } text ("Separates vector tuple values in the values list (optional). 
Default: \" \"") + + opt[Unit]("omitStrength") abbr ("os") action { (_, options) => + options + ("omitStrength" -> true) + } text ("Do not write the strength to the output files (optional), Default: false.") + note("This option is used to output indexable data for creating a search engine recommender.") + + note("\nDefault delimiters will produce output of the form: \"itemID1<tab>itemID2:value2<space>itemID10:value10...\"") + } + } + + http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala index c5b7385..e2bb49c 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala @@ -17,25 +17,39 @@ package org.apache.mahout.drivers +import com.google.common.collect.{HashBiMap, BiMap} import org.apache.mahout.math.drm.DistributedContext -/** Reader trait is abstract in the sense that the reader function must be defined by an extending trait, which also defines the type to be read. - * @tparam T type of object read, usually supplied by an extending trait. - * @todo the reader need not create both dictionaries but does at present. There are cases where one or the other dictionary is never used so saving the memory for a very large dictionary may be worth the optimization to specify which dictionaries are created. +/** Reader trait is abstract in the sense that the tupleReader function must be defined by an extending trait, which also defines the type to be read. + * @tparam T type of object read. */ trait Reader[T]{ + val mc: DistributedContext val readSchema: Schema - protected def reader(mc: DistributedContext, readSchema: Schema, source: String): T - def readFrom(source: String): T = reader(mc, readSchema, source) + + protected def tupleReader( + mc: DistributedContext, + readSchema: Schema, + source: String, + existingRowIDs: BiMap[String, Int]): T + + def readTuplesFrom( + source: String, + existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T = + tupleReader(mc, readSchema, source, existingRowIDs) } /** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written. - * @tparam T + * @tparam T type of object to write. 
*/ trait Writer[T]{ + val mc: DistributedContext + val sort: Boolean val writeSchema: Schema - protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T): Unit - def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection) + + protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit + + def writeDRMTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort) } http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala index 7735b83..edff92d 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala @@ -20,21 +20,30 @@ package org.apache.mahout.drivers import scala.collection.mutable import scala.collection.mutable.HashMap -/** Syntactic sugar for HashMap[String, Any] +/** Syntactic sugar for mutable.HashMap[String, Any] * * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}} */ class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] { // note: this require a mutable HashMap, do we care? this ++= params - if (!this.contains("omitScore")) this += ("omitScore" -> false) + + /** Constructor for copying an existing Schema + * + * @param schemaToClone return a copy of this Schema + */ + def this(schemaToClone: Schema){ + this() + this ++= schemaToClone + } } -// These can be used to keep the text in and out fairly standard to Mahout, where an application specific format is not -// required. +// These can be used to keep the text in and out fairly standard to Mahout, where an application specific +// format is not required. /** Simple default Schema for typical text delimited tuple file input - * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID<comma, tab, or space>etc...) + * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID + * <comma, tab, or space>here may be other ignored text...) */ class DefaultTupleReadSchema extends Schema( "delim" -> "[,\t ]", //comma, tab or space @@ -43,44 +52,47 @@ class DefaultTupleReadSchema extends Schema( "columnIDPosition" -> 1, "filterPosition" -> -1) -/** Simple default Schema for typical text delimited drm file output - * This tells the writer to write a DRM of the default - * (rowID<tab>columnID1:score1,columnID2:score2,...) +/** Default Schema for text delimited drm file output + * This tells the writer to write a DRM of the default form: + * (rowID<tab>columnID1:score1<space>columnID2:score2...) */ class DefaultDRMWriteSchema extends Schema( "rowKeyDelim" -> "\t", "columnIdStrengthDelim" -> ":", - "tupleDelim" -> ",") + "tupleDelim" -> " ", + "omitScore" -> false) -/** Simple default Schema for typical text delimited drm file output - * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID<comma, tab, or space>etc...) +/** Default Schema for typical text delimited drm file input + * This tells the reader to input text lines of the form: + * (rowID<tab>columnID1:score1,columnID2:score2,...) 
  */
 class DefaultDRMReadSchema extends Schema(
     "rowKeyDelim" -> "\t",
     "columnIdStrengthDelim" -> ":",
-    "tupleDelim" -> ",")
+    "tupleDelim" -> " ")
 
-/** Simple default Schema for reading a text delimited drm file where the score of any tuple is ignored,
+/** Default Schema for reading a text delimited drm file where the score of any tuple is ignored,
   * all non-zeros are replaced with 1.
   * This tells the reader to input DRM lines of the form
-  * (rowID<tab>columnID1:score1,columnID2:score2,...) remember the score is ignored. Alternatively the format can be
-  * (rowID<tab>columnID1,columnID2,...) where presence indicates a score of 1. This is the default output format for
-  * [[org.apache.mahout.drivers.DRMWriteBooleanSchema]]
+  * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored.
+  * Alternatively the format can be
+  * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. This is the default
+  * output format for [[org.apache.mahout.drivers.DRMWriteBooleanSchema]]
   */
 class DRMReadBooleanSchema extends Schema(
     "rowKeyDelim" -> "\t",
     "columnIdStrengthDelim" -> ":",
-    "tupleDelim" -> ",",
+    "tupleDelim" -> " ",
     "omitScore" -> true)
 
-/** Simple default Schema for typical text delimited drm file write where the score of a tuple is omitted.
+/** Default Schema for typical text delimited drm file write where the score of a tuple is omitted.
   * The presence of a tuple means the score = 1, the absence means a score of 0.
-  * This tells the reader to input DRM lines of the form
-  * (rowID<tab>columnID1,columnID2,...)
+  * This tells the writer to output DRM lines of the form
+  * (rowID<tab>columnID1<space>columnID2...)
   */
 class DRMWriteBooleanSchema extends Schema(
     "rowKeyDelim" -> "\t",
     "columnIdStrengthDelim" -> ":",
-    "tupleDelim" -> ",",
+    "tupleDelim" -> " ",
     "omitScore" -> true)

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index ae78d59..11d647b 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -17,14 +17,12 @@
 package org.apache.mahout.drivers
 
-import scala.collection.JavaConversions._
 import org.apache.spark.SparkContext._
 import org.apache.mahout.math.RandomAccessSparseVector
 import com.google.common.collect.{BiMap, HashBiMap}
-import scala.collection.JavaConversions._
 import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm}
 import org.apache.mahout.sparkbindings._
-
+import scala.collection.JavaConversions._
 
 /** Extends Reader trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader function for reading text delimited files as described in the [[org.apache.mahout.drivers.Schema]]
   */
@@ -36,7 +34,11 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
    * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
    * @return
    */
-  protected def reader(mc: DistributedContext, readSchema: Schema, source: String): IndexedDataset = {
+  protected def tupleReader(
+      mc: DistributedContext,
+      readSchema: Schema,
+      source: String,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDataset = {
     try {
       val delimiter = readSchema("delim").asInstanceOf[String]
       val rowIDPosition = readSchema("rowIDPosition").asInstanceOf[Int]
@@ -51,6 +53,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
       })
 
       var columns = mc.textFile(source).map { line => line.split(delimiter) }
+      //val m = columns.collect
 
       // -1 means no filter in the input text, take them all
       if(filterPosition != -1) {
@@ -59,7 +62,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
       }
 
       // get row and column IDs
-      val m = columns.collect
+      //val m = columns.collect
       val interactions = columns.map { tokens =>
         tokens(rowIDPosition) -> tokens(columnIDPosition)
       }
@@ -75,10 +78,10 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
 
       // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID
       // broadcast them for access in distributed processes, so they are not recalculated in every task.
-      val rowIDDictionary = asOrderedDictionary(rowIDs)
+      val rowIDDictionary = asOrderedDictionary(existingRowIDs, rowIDs)
       val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)
 
-      val columnIDDictionary = asOrderedDictionary(columnIDs)
+      val columnIDDictionary = asOrderedDictionary(entries = columnIDs)
       val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
 
       val indexedInteractions =
@@ -113,11 +116,10 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
   // this creates a BiMap from an ID collection. The ID points to an ordinal int
   // which is used internal to Mahout as the row or column ID
   // todo: this is a non-distributed process and the BiMap is a non-rdd based object--might be a scaling problem
-  private def asOrderedDictionary(entries: Array[String]): BiMap[String, Int] = {
-    var dictionary: BiMap[String, Int] = HashBiMap.create()
-    var index = 0
+  private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(), entries: Array[String]): BiMap[String, Int] = {
+    var index = dictionary.size() // if a dictionary is supplied then add to the end based on the Mahout id 'index'
     for (entry <- entries) {
-      dictionary.forcePut(entry, index)
+      if (!dictionary.contains(entry)) dictionary.put(entry, index)
       index += 1
     }
     dictionary
@@ -125,13 +127,21 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
 }
 
 trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
+
+  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
+
   /** Read in text delimited tuples from all URIs in this comma delimited source String.
     *
     * @param mc context for the Spark job
     * @param writeSchema describes the delimiters and positions of values in the output text delimited file.
    * @param dest directory to write text delimited version of [[org.apache.mahout.drivers.IndexedDataset]]
    */
-  protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, indexedDataset: IndexedDataset): Unit = {
+  protected def writer(
+      mc: DistributedContext,
+      writeSchema: Schema,
+      dest: String,
+      indexedDataset: IndexedDataset,
+      sort: Boolean = true): Unit = {
     try {
       val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String]
       val columnIdStrengthDelim = writeSchema("columnIdStrengthDelim").asInstanceOf[String]
@@ -140,8 +150,14 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
       //instance vars must be put into locally scoped vals when put into closures that are
       //executed but Spark
-      assert (indexedDataset != null, {println(this.getClass.toString+": has no indexedDataset to write"); throw new IllegalArgumentException })
-      assert (!dest.isEmpty, {println(this.getClass.toString+": has no destination or indextedDataset to write"); throw new IllegalArgumentException})
+      assert(indexedDataset != null, {
+        println(this.getClass.toString + ": has no indexedDataset to write")
+        throw new IllegalArgumentException
+      })
+      assert(!dest.isEmpty, {
+        println(this.getClass.toString + ": has no destination or indextedDataset to write")
+        throw new IllegalArgumentException
+      })
 
       val matrix = indexedDataset.matrix
       val rowIDDictionary = indexedDataset.rowIDs
@@ -149,18 +165,29 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{
 
       matrix.rdd.map { case (rowID, itemVector) =>
-        // each line is created of non-zero values with schema specified delimiters and original row and column ID tokens
-        // first get the external rowID token
-        var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim
-
-        // for the rest of the row, construct the vector contents of tuples (external column ID, strength value)
-        for (item <- itemVector.nonZeroes()) {
-          line += columnIDDictionary.inverse.get(item.index)
-          if (!omitScore) line += columnIdStrengthDelim + item.get
-          line += tupleDelim
+        // turn non-zeros into list for sorting
+        val itemList: collection.mutable.MutableList[org.apache.mahout.common.Pair[Integer, Double]] = new collection.mutable.MutableList[org.apache.mahout.common.Pair[Integer, Double]]
+        for (ve <- itemVector.nonZeroes) {
+          val item: org.apache.mahout.common.Pair[Integer, Double] = new org.apache.mahout.common.Pair[Integer, Double](ve.index, ve.get)
+          itemList += item
         }
-        // drop the last delimiter, not needed to end the line
-        line.dropRight(1)
+        //sort by highest value descending(-)
+        val vector = if (sort) itemList.sortBy(-_.getSecond) else itemList
+
+        // first get the external rowID token
+        if (!vector.isEmpty){
+          var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim
+          // for the rest of the row, construct the vector contents of tuples (external column ID, strength value)
+          for (item <- vector) {
+            line += columnIDDictionary.inverse.get(item.getFirst)
+            if (!omitScore) line += columnIdStrengthDelim + item.getSecond
+            line += tupleDelim
+          }
+          // drop the last delimiter, not needed to end the line
+          line.dropRight(1)
+        } else {//no items so write a line with id but no values, no delimiters
+          rowIDDictionary.inverse.get(rowID)
+        } // "if" returns a line of text so this must be last in the block
       }
       .saveAsTextFile(dest)
@@ -176,25 +203,28 @@ trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexed
 
 /** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
   * @param readSchema describes the delimiters and position of values in the text delimited file to be read.
   * @param mc Spark context for reading files
-  * @note The source is supplied by Reader#readFrom .
+  * @note The source is supplied by Reader#readTuplesFrom .
   *
   */
-class TextDelimitedIndexedDatasetReader(val readSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetReader
+class TextDelimitedIndexedDatasetReader(val readSchema: Schema)
+    (implicit val mc: DistributedContext) extends TDIndexedDatasetReader
 
 /** Writes text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor.
   * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
   * @param mc Spark context for reading files
-  * @note the destination is supplied by Writer#writeTo trait method
+  * @note the destination is supplied by Writer#writeDRMTo trait method
   *
   */
-class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
+class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter
 
 /** Reads and writes text delimited files to/from an IndexedDataset. Classes are needed to supply trait params in their constructor.
   * @param readSchema describes the delimiters and position of values in the text delimited file(s) to be read.
   * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written.
   * @param mc Spark context for reading the files, may be implicitly defined.
   *
   */
-class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeSchema: Schema)(implicit val mc: DistributedContext) extends TDIndexedDatasetReaderWriter
+class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeSchema: Schema, val sort: Boolean = true)
+    (implicit val mc: DistributedContext)
+  extends TDIndexedDatasetReaderWriter
 
-/** A version of IndexedDataset that has it's own writeTo method from a Writer trait. This is an alternative to creating
+/** A version of IndexedDataset that has it's own writeDRMTo method from a Writer trait. This is an alternative to creating
   * a Writer based stand-alone class for writing. Consider it experimental allowing similar semantics to drm.writeDrm().
   * Experimental because it's not clear that it is simpler or more intuitive and since IndexedDatasetTextDelimitedWriteables
   * are probably short lived in terms of lines of code so complexity may be moot.
@@ -204,12 +234,17 @@ class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeS
   * @param writeSchema contains params for the schema/format or the written text delimited file.
   * @param mc mahout distributed context (DistributedContext) may be implicitly defined.
  *
  */
-class IndexedDatasetTextDelimitedWriteable(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int],
-    val writeSchema: Schema)(implicit val mc: DistributedContext)
+class IndexedDatasetTextDelimitedWriteable(
+    matrix: CheckpointedDrm[Int],
+    rowIDs: BiMap[String,Int],
+    columnIDs: BiMap[String,Int],
+    val writeSchema: Schema,
+    val sort: Boolean = true)
+    (implicit val mc: DistributedContext)
   extends IndexedDataset(matrix, rowIDs, columnIDs) with TDIndexedDatasetWriter {
 
   def writeTo(dest: String): Unit = {
-    writeTo(this, dest)
+    writeDRMTo(this, dest)
   }
 }
 
@@ -217,11 +252,11 @@ class IndexedDatasetTextDelimitedWriteable(matrix: CheckpointedDrm[Int], rowIDs:
  * Companion object for the case class [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] primarily used to get a secondary constructor for
  * making one [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from another. Used when you have a factory like [[org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader]]
  * {{{
- *   val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readFrom(source))
+ *   val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readTuplesFrom(source))
 * }}}
 */
object IndexedDatasetTextDelimitedWriteable {
  /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */
-  def apply(id2: IndexedDatasetTextDelimitedWriteable) = new IndexedDatasetTextDelimitedWriteable(id2.matrix, id2.rowIDs, id2.columnIDs, id2.writeSchema)(id2.mc)
+  def apply(id2: IndexedDatasetTextDelimitedWriteable, sort: Boolean = true) = new IndexedDatasetTextDelimitedWriteable(id2.matrix, id2.rowIDs, id2.columnIDs, id2.writeSchema, id2.sort)(id2.mc)
}

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 1c5546b..cc5ebf2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -65,7 +65,6 @@ class CheckpointedDrmSpark[K: ClassTag](
   private var cached: Boolean = false
   override val context: DistributedContext = rdd.context
 
-
   /**
    * Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer
    * and writing down Spark graph lineage since last checkpointed DRM.
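For orientation, a minimal sketch (not part of this patch) of how a driver might use the refactored reader/writer API shown above; the input and output paths are hypothetical, while the classes, method names, and defaults (DefaultTupleReadSchema, DefaultDRMWriteSchema, readTuplesFrom, writeDRMTo, sort = true) are the ones introduced in Schema.scala, ReaderWriter.scala, and TextDelimitedReaderWriter.scala:

    import org.apache.mahout.drivers._
    import org.apache.mahout.math.drm.DistributedContext

    def writeSortedIndicators(implicit mc: DistributedContext): Unit = {
      // The reader builds BiMap dictionaries from external IDs to ordinal Mahout row/column IDs.
      val reader = new TextDelimitedIndexedDatasetReader(new DefaultTupleReadSchema())

      // Read the primary action tuples (rowID, columnID, optional filter) per the read schema.
      // The path below is a hypothetical example.
      val actionsA = reader.readTuplesFrom("in/actions-a.tsv")

      // Read a second tuple file, passing A's row dictionary so the same external user ID maps to
      // the same Mahout row ID in both matrices; new row IDs are appended after the existing ones.
      val actionsB = reader.readTuplesFrom("in/actions-b.tsv", existingRowIDs = actionsA.rowIDs)

      // Write A as a text-delimited DRM. With sort = true (the default) each row's tuples are
      // ordered by descending strength; omitScore = false in DefaultDRMWriteSchema keeps ":strength".
      val writer = new TextDelimitedIndexedDatasetWriter(new DefaultDRMWriteSchema())
      writer.writeDRMTo(actionsA, "out/actions-a-drm")
    }

The copy constructor added to Schema also makes it cheap to clone one of these default schemas and override a single delimiter instead of restating the whole schema.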
http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
index 22e31cc..61f37e4 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
@@ -23,6 +23,10 @@ import com.google.common.collect.HashBiMap
 import org.apache.mahout.math._
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.mahout.sparkbindings._
+import org.apache.mahout.common.Pair
+import org.apache.mahout.math.Vector.Element
+
+import scala.collection.immutable.List
 
 /** Kryo serialization registrator for Mahout */
 class MahoutKryoRegistrator extends KryoRegistrator {
@@ -32,6 +36,6 @@ class MahoutKryoRegistrator extends KryoRegistrator {
     kryo.addDefaultSerializer(classOf[Vector], new WritableKryoSerializer[Vector, VectorWritable])
     kryo.addDefaultSerializer(classOf[DenseVector], new WritableKryoSerializer[Vector, VectorWritable])
     kryo.addDefaultSerializer(classOf[Matrix], new WritableKryoSerializer[Matrix, MatrixWritable])
-    kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer());
+    kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer())
   }
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/a8097403/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
index 938dc33..642e90a 100644
--- a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.mahout.cf
 
+import org.apache.mahout.math.cf.CooccurrenceAnalysis
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.scalabindings.{MatrixOps, _}
 import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
@@ -48,13 +49,19 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with Distribut
       (0.0, 0.0, 0.0, 0.0, 0.0))
 
   // correct cross-cooccurrence with LLR
-  final val matrixLLRCoocBtAControl = dense(
+  final val m = dense(
       (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
       (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
       (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
       (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
       (0.0, 0.0, 0.0, 0.0, 4.498681156950466))
 
+  final val matrixLLRCoocBtAControl = dense(
+      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+      (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+      (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+      (0.0, 0.0, 0.6795961471815897, 0.0, 4.498681156950466))
 
   test("cooccurrence [A'A], [B'A] boolbean data using LLR") {
@@ -150,6 +157,46 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with Distribut
     n should be < 1E-10
   }
 
+  test("cooccurrence two matrices with different number of columns"){
+    val a = dense(
+        (1, 1, 0, 0, 0),
+        (0, 0, 1, 1, 0),
+        (0, 0, 0, 0, 1),
+        (1, 0, 0, 1, 0))
+
+    val b = dense(
+        (0, 1, 1, 0),
+        (1, 1, 1, 0),
+        (0, 0, 1, 0),
+        (1, 1, 0, 1))
+
+    val matrixLLRCoocBtANonSymmetric = dense(
+        (0.0, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
+        (0.0, 0.6795961471815897, 0.6795961471815897, 0.0),
+        (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 0.0),
+        (5.545177444479561, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
+        (0.0, 0.0, 0.6795961471815897, 0.0))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    //self similarity
+    val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+    val matrixSelfCooc = drmCooc(0).checkpoint().collect
+    val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl)
+    var n = (new MatrixOps(m = diffMatrix)).norm
+    n should be < 1E-10
+
+    //cross similarity
+    val matrixCrossCooc = drmCooc(1).checkpoint().collect
+    val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtANonSymmetric)
+    n = (new MatrixOps(m = diff2Matrix)).norm
+
+    //cooccurrence without LLR is just a A'B
+    //val inCoreAtB = a.transpose().times(b)
+    //val bp = 0
+  }
+
   test("LLR calc") {
     val A = dense(
         (1, 1, 0, 0, 0),
