MAHOUT-1604 add a CLI and associated code for spark-rowsimilarity, also cleans up some things in MAHOUT-1568 and MAHOUT-1569, closes apache/mahout#47
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/149c9859 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/149c9859 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/149c9859 Branch: refs/heads/master Commit: 149c98592fe447c98dfb5afc67b5809725cc3056 Parents: 91f15ec Author: pferrel <[email protected]> Authored: Thu Aug 28 10:45:13 2014 -0700 Committer: pferrel <[email protected]> Committed: Thu Aug 28 10:45:13 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + bin/mahout | 10 +- .../mahout/math/cf/CooccurrenceAnalysis.scala | 220 ------ .../mahout/math/cf/SimilarityAnalysis.scala | 261 +++++++ .../apache/mahout/math/drm/RLikeDrmOps.scala | 11 +- .../mahout/math/scalabindings/MatrixOps.scala | 3 + .../math/scalabindings/MatrixOpsSuite.scala | 4 +- spark/pom.xml | 21 + .../apache/mahout/drivers/FileSysUtils.scala | 55 +- .../apache/mahout/drivers/IndexedDataset.scala | 11 +- .../mahout/drivers/ItemSimilarityDriver.scala | 117 ++- .../apache/mahout/drivers/MahoutDriver.scala | 62 +- .../mahout/drivers/MahoutOptionParser.scala | 42 +- .../apache/mahout/drivers/ReaderWriter.scala | 21 +- .../mahout/drivers/RowSimilarityDriver.scala | 159 +++++ .../org/apache/mahout/drivers/Schema.scala | 36 +- .../drivers/TextDelimitedReaderWriter.scala | 134 +++- .../mahout/sparkbindings/SparkEngine.scala | 2 +- .../mahout/cf/CooccurrenceAnalysisSuite.scala | 29 +- .../drivers/ItemSimilarityDriverSuite.scala | 713 ++++++++++--------- .../drivers/RowSimilarityDriverSuite.scala | 138 ++++ 21 files changed, 1275 insertions(+), 776 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 47518b4..dfccd95 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,8 @@ Mahout Change Log Release 1.0 - unreleased + MAHOUT-1604: Spark version of rowsimilarity driver and associated additions to SimilarityAnalysis.scala (pferrel) + MAHOUT-1500: H2O Integration (Anand Avati via apalumbo) MAHOUT-1606 - Add rowSums, rowMeans and diagonal extraction operations to distributed matrices (dlyubimov) http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/bin/mahout ---------------------------------------------------------------------- diff --git a/bin/mahout b/bin/mahout index 27acd9f..c22118b 100755 --- a/bin/mahout +++ b/bin/mahout @@ -88,6 +88,10 @@ if [ "$1" == "spark-itemsimilarity" ]; then SPARK=1 fi +if [ "$1" == "spark-rowsimilarity" ]; then + SPARK=1 +fi + if [ "$MAHOUT_CORE" != "" ]; then IS_CORE=1 fi @@ -179,7 +183,7 @@ then done fi - # add spark-shell -- if we requested shell or other spark CLI driver + # add jars for running from the command line if we requested shell or spark CLI driver if [ "$SPARK" == "1" ]; then for f in $MAHOUT_HOME/mrlegacy/target/mahout-mrlegacy-*.jar ; do @@ -254,6 +258,10 @@ case "$1" in shift "$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" "org.apache.mahout.drivers.ItemSimilarityDriver" "$@" ;; + (spark-rowsimilarity) + shift + "$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" "org.apache.mahout.drivers.RowSimilarityDriver" "$@" + ;; (h2o-node) shift "$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" "water.H2O" -md5skip "$@" -name mah2out http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala deleted file mode 100644 index 181b729..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/cf/CooccurrenceAnalysis.scala +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.cf - -import org.apache.mahout.math._ -import scalabindings._ -import RLikeOps._ -import drm._ -import RLikeDrmOps._ -import scala.collection.JavaConversions._ -import org.apache.mahout.math.stats.LogLikelihood -import collection._ -import org.apache.mahout.common.RandomUtils -import org.apache.mahout.math.function.{VectorFunction, Functions} - - -/** - * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation", - * available at http://www.mapr.com/practical-machine-learning - * - * see also "Sebastian Schelter, Christoph Boden, Volker Markl: - * Scalable Similarity-Based Neighborhood Methods with MapReduce - * ACM Conference on Recommender Systems 2012" - */ -object CooccurrenceAnalysis extends Serializable { - - /** Compares (Int,Double) pairs by the second value */ - private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2} - - def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50, - maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = { - - implicit val distributedContext = drmARaw.context - - // Apply selective downsampling, pin resulting matrix - val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions) - - // num users, which equals the maximum number of interactions per item - val numUsers = drmA.nrow.toInt - - // Compute & broadcast the number of interactions per thing in A - val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn) - - // Compute co-occurrence matrix A'A - val drmAtA = drmA.t %*% drmA - - // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix - val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA, - bcastInteractionsPerItemA, crossCooccurrence = false) - - var indicatorMatrices = List(drmIndicatorsAtA) - - // Now look at cross-co-occurrences - for (drmBRaw <- drmBs) { - // Down-sample and pin other interaction matrix - val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint() - - // Compute & broadcast the number of interactions per thing in B - val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn) - - // Compute cross-co-occurrence matrix B'A - // pferrel: yikes, this is the wrong order, a big change! so you know who to blame - // used to be val drmBtA = drmB.t %*% drmA, which is the wrong order - val drmAtB = drmA.t %*% drmB - - val drmIndicatorsAtB = computeIndicators(drmAtB, numUsers, maxInterestingItemsPerThing, - bcastInteractionsPerItemA, bcastInteractionsPerThingB) - - indicatorMatrices = indicatorMatrices :+ drmIndicatorsAtB - - drmB.uncache() - } - - // Unpin downsampled interaction matrix - drmA.uncache() - - // Return list of indicator matrices - indicatorMatrices - } - - /** - * Compute loglikelihood ratio - * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details - **/ - def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long, - numInteractionsWithAandB: Long, numInteractions: Long) = { - - val k11 = numInteractionsWithAandB - val k12 = numInteractionsWithA - numInteractionsWithAandB - val k21 = numInteractionsWithB - numInteractionsWithAandB - val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB - - LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22) - - } - - def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int, - bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector], - crossCooccurrence: Boolean = true) = { - drmBtA.mapBlock() { - case (keys, block) => - - val llrBlock = block.like() - val numInteractionsB: Vector = bcastNumInteractionsB - val numInteractionsA: Vector = bcastNumInteractionsA - - for (index <- 0 until keys.size) { - - val thingB = keys(index) - - // PriorityQueue to select the top-k items - val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore) - - block(index, ::).nonZeroes().foreach { elem => - val thingA = elem.index - val cooccurrences = elem.get - - // exclude co-occurrences of the item with itself - if (crossCooccurrence || thingB != thingA) { - // Compute loglikelihood ratio - val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong, - cooccurrences.toLong, numUsers) - - val candidate = thingA -> llr - - // matches legacy hadoop code and maps values to range (0..1) - // val tLLR = 1.0 - (1.0 / (1.0 + llr)) - //val candidate = thingA -> tLLR - - // Enqueue item with score, if belonging to the top-k - if (topItemsPerThing.size < maxInterestingItemsPerThing) { - topItemsPerThing.enqueue(candidate) - } else if (orderByScore.lt(candidate, topItemsPerThing.head)) { - topItemsPerThing.dequeue() - topItemsPerThing.enqueue(candidate) - } - } - } - - // Add top-k interesting items to the output matrix - topItemsPerThing.dequeueAll.foreach { - case (otherThing, llrScore) => - llrBlock(index, otherThing) = llrScore - } - } - - keys -> llrBlock - } - } - - /** - * Selectively downsample users and things with an anomalous amount of interactions, inspired by - * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java - * - * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not - */ - def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = { - - implicit val distributedContext = drmM.context - - // Pin raw interaction matrix - val drmI = drmM.checkpoint() - - // Broadcast vector containing the number of interactions with each thing - val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn) - - val downSampledDrmI = drmI.mapBlock() { - case (keys, block) => - val numInteractions: Vector = bcastNumInteractions - - // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures - val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed)) - - val downsampledBlock = block.like() - - // Downsample the interaction vector of each user - for (userIndex <- 0 until keys.size) { - - val interactionsOfUser = block(userIndex, ::) - - val numInteractionsOfUser = interactionsOfUser.getNumNonZeroElements() - - val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser - - interactionsOfUser.nonZeroes().foreach { elem => - val numInteractionsWithThing = numInteractions(elem.index) - val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing - - if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) { - // We ignore the original interaction value and create a binary 0-1 matrix - // as we only consider whether interactions happened or did not happen - downsampledBlock(userIndex, elem.index) = 1 - } - } - } - - keys -> downsampledBlock - } - - // Unpin raw interaction matrix - drmI.uncache() - - downSampledDrmI - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala new file mode 100644 index 0000000..90d7559 --- /dev/null +++ b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.cf + +import org.apache.mahout.math._ +import scalabindings._ +import RLikeOps._ +import drm._ +import RLikeDrmOps._ +import scala.collection.JavaConversions._ +import org.apache.mahout.math.stats.LogLikelihood +import collection._ +import org.apache.mahout.common.RandomUtils +import org.apache.mahout.math.function.{VectorFunction, Functions} + + +/** + * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation", + * available at http://www.mapr.com/practical-machine-learning + * + * see also "Sebastian Schelter, Christoph Boden, Volker Markl: + * Scalable Similarity-Based Neighborhood Methods with MapReduce + * ACM Conference on Recommender Systems 2012" + */ +object SimilarityAnalysis extends Serializable { + + /** Compares (Int,Double) pairs by the second value */ + private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2} + + /** Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... + * and returns a list of indicator and cross-indicator matrices + * @param drmARaw Primary interaction matrix + * @param randomSeed when kept to a constant will make repeatable downsampling + * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50 + * @param maxNumInteractions max number of interactions after downsampling, default: 500 + * @return + * */ + def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50, + maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = { + + implicit val distributedContext = drmARaw.context + + // Apply selective downsampling, pin resulting matrix + val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions) + + // num users, which equals the maximum number of interactions per item + val numUsers = drmA.nrow.toInt + + // Compute & broadcast the number of interactions per thing in A + val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn) + + // Compute co-occurrence matrix A'A + val drmAtA = drmA.t %*% drmA + + // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix + val drmIndicatorsAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA, + bcastInteractionsPerItemA, crossCooccurrence = false) + + var indicatorMatrices = List(drmIndicatorsAtA) + + // Now look at cross-co-occurrences + for (drmBRaw <- drmBs) { + // Down-sample and pin other interaction matrix + val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint() + + // Compute & broadcast the number of interactions per thing in B + val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn) + + // Compute cross-co-occurrence matrix A'B + val drmAtB = drmA.t %*% drmB + + val drmIndicatorsAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing, + bcastInteractionsPerItemA, bcastInteractionsPerThingB) + + indicatorMatrices = indicatorMatrices :+ drmIndicatorsAtB + + drmB.uncache() + } + + // Unpin downsampled interaction matrix + drmA.uncache() + + // Return list of indicator matrices + indicatorMatrices + } + + /** Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows + * @param drmARaw Primary interaction matrix + * @param randomSeed when kept to a constant will make repeatable downsampling + * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50 + * @param maxNumInteractions max number of interactions after downsampling, default: 500 + * @return + * */ + def rowSimilarity(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingSimilaritiesPerRow: Int = 50, + maxNumInteractions: Int = 500): DrmLike[Int] = { + + implicit val distributedContext = drmARaw.context + + // Apply selective downsampling, pin resulting matrix + val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions) + + // num columns, which equals the maximum number of interactions per item + val numCols = drmA.ncol + + // Compute & broadcast the number of interactions per row in A + val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerRow) + + // Compute row similarity cooccurrence matrix AA' + val drmAAt = drmA %*% drmA.t + + // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix + val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow, bcastInteractionsPerItemA, + bcastInteractionsPerItemA, crossCooccurrence = false) + + drmSimilaritiesAAt + } + + /** + * Compute loglikelihood ratio + * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details + **/ + def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long, + numInteractionsWithAandB: Long, numInteractions: Long) = { + + val k11 = numInteractionsWithAandB + val k12 = numInteractionsWithA - numInteractionsWithAandB + val k21 = numInteractionsWithB - numInteractionsWithAandB + val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB + + LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22) + + } + + def computeSimilarities(drm: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int, + bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector], + crossCooccurrence: Boolean = true) = { + drm.mapBlock() { + case (keys, block) => + + val llrBlock = block.like() + val numInteractionsB: Vector = bcastNumInteractionsB + val numInteractionsA: Vector = bcastNumInteractionsA + + for (index <- 0 until keys.size) { + + val thingB = keys(index) + + // PriorityQueue to select the top-k items + val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore) + + block(index, ::).nonZeroes().foreach { elem => + val thingA = elem.index + val cooccurrences = elem.get + + // exclude co-occurrences of the item with itself + if (crossCooccurrence || thingB != thingA) { + // Compute loglikelihood ratio + val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong, + cooccurrences.toLong, numUsers) + + val candidate = thingA -> llr + + // matches legacy hadoop code and maps values to range (0..1) + // val normailizedLLR = 1.0 - (1.0 / (1.0 + llr)) + // val candidate = thingA -> normailizedLLR + + // Enqueue item with score, if belonging to the top-k + if (topItemsPerThing.size < maxInterestingItemsPerThing) { + topItemsPerThing.enqueue(candidate) + } else if (orderByScore.lt(candidate, topItemsPerThing.head)) { + topItemsPerThing.dequeue() + topItemsPerThing.enqueue(candidate) + } + } + } + + // Add top-k interesting items to the output matrix + topItemsPerThing.dequeueAll.foreach { + case (otherThing, llrScore) => + llrBlock(index, otherThing) = llrScore + } + } + + keys -> llrBlock + } + } + + /** + * Selectively downsample rows and items with an anomalous amount of interactions, inspired by + * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java + * + * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not + * @param drmM matrix to downsample + * @param seed random number generator seed, keep to a constant if repeatability is neccessary + * @param maxNumInteractions number of elements in a row of the returned matrix + * @return + */ + def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = { + + implicit val distributedContext = drmM.context + + // Pin raw interaction matrix + val drmI = drmM.checkpoint() + + // Broadcast vector containing the number of interactions with each thing + val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn) + + val downSampledDrmI = drmI.mapBlock() { + case (keys, block) => + val numInteractions: Vector = bcastNumInteractions + + // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures + val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed)) + + val downsampledBlock = block.like() + + // Downsample the interaction vector of each row + for (rowIndex <- 0 until keys.size) { + + val interactionsInRow = block(rowIndex, ::) + + val numInteractionsPerRow = interactionsInRow.getNumNonZeroElements() + + val perRowSampleRate = math.min(maxNumInteractions, numInteractionsPerRow) / numInteractionsPerRow + + interactionsInRow.nonZeroes().foreach { elem => + val numInteractionsWithThing = numInteractions(elem.index) + val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing + + if (random.nextDouble() <= math.min(perRowSampleRate, perThingSampleRate)) { + // We ignore the original interaction value and create a binary 0-1 matrix + // as we only consider whether interactions happened or did not happen + downsampledBlock(rowIndex, elem.index) = 1 + } + } + } + + keys -> downsampledBlock + } + + // Unpin raw interaction matrix + drmI.uncache() + + downSampledDrmI + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala index 026ab75..d8d04e2 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala @@ -90,7 +90,16 @@ class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) { // Collect block-wise rowsums and output them as one-column matrix. keys -> dense(block.rowSums).t } - .collect(::, 0) + .collect(::, 0) + } + + /** Counts the non-zeros elements in each row returning a vector of the counts */ + def numNonZeroElementsPerRow(): Vector = { + drm.mapBlock(ncol = 1) { case (keys, block) => + // Collect block-wise row non-zero counts and output them as a one-column matrix. + keys -> dense(block.numNonZeroElementsPerRow).t + } + .collect(::, 0) } /** Row means */ http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala index d5ac026..910035f 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala @@ -191,7 +191,10 @@ class MatrixOps(val m: Matrix) { /* Diagonal assignment */ def diagv_=(that: Double) = diagv := that + /* Row and Column non-zero element counts */ def numNonZeroElementsPerColumn() = m.aggregateColumns(vectorCountNonZeroElementsFunc) + + def numNonZeroElementsPerRow() = m.aggregateRows(vectorCountNonZeroElementsFunc) } object MatrixOps { http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala index 5be6ca8..d7b22d9 100644 --- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala +++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala @@ -120,10 +120,11 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite { a.colMeans() should equal(dvec(2.5, 3.5, 4.5)) a.rowMeans() should equal(dvec(3, 4)) a.numNonZeroElementsPerColumn() should equal(dvec(2,2,2)) + a.numNonZeroElementsPerRow() should equal(dvec(3,3)) } - test("numNonZeroElementsPerColumn") { + test("numNonZeroElementsPerColumn and Row") { val a = dense( (2, 3, 4), (3, 4, 5), @@ -132,6 +133,7 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite { ) a.numNonZeroElementsPerColumn() should equal(dvec(3,2,4)) + a.numNonZeroElementsPerRow() should equal(dvec(3,3,2,1)) } test("Vector Assignment performance") { http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/pom.xml ---------------------------------------------------------------------- diff --git a/spark/pom.xml b/spark/pom.xml index 71d3944..2f79377 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -157,6 +157,27 @@ </executions> </plugin> + <!-- create job jar to include CLI driver deps--> + <!-- leave this in even though there are no hadoop mapreduce jobs in this module --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>job</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <descriptors> + <descriptor>src/main/assembly/job.xml</descriptor> + </descriptors> + </configuration> + </execution> + </executions> + </plugin> + </plugins> </build> http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala index 654f116..f48e9ed 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/FileSysUtils.scala @@ -21,25 +21,24 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, FileStatus, FileSystem} /** - * Returns a [[java.lang.String]]comma delimited list of URIs discovered based on parameters in the constructor. - * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]] - * - * @param pathURI Where to start looking for inFiles, only HDFS is currently - * supported. The pathURI may be a list of comma delimited URIs like those supported - * by Spark - * @param filePattern regex that must match the entire filename to have the file returned - * @param recursive true traverses the filesystem recursively - */ + * Returns a [[java.lang.String]]comma delimited list of URIs discovered based on parameters in the constructor. + * The String is formatted to be input into [[org.apache.spark.SparkContext.textFile()]] + * + * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs + * @param filePattern regex that must match the entire filename to have the file returned + * @param recursive true traverses the filesystem recursively, default = false + */ -case class FileSysUtils(pathURI: String, filePattern: String = ".*", recursive: Boolean = false) { +case class FileSysUtils(pathURI: String, filePattern: String = "", recursive: Boolean = false) { val conf = new Configuration() val fs = FileSystem.get(conf) - /** returns a string of comma delimited URIs matching the filePattern */ +/** Returns a string of comma delimited URIs matching the filePattern + * When pattern matching dirs are never returned, only traversed. */ def uris :String = { - if(recursive){ - val pathURIs = pathURI.split(",") + if (!filePattern.isEmpty){ // have file pattern so + val pathURIs = pathURI.split(",") var files = "" for ( uri <- pathURIs ){ files = findFiles(uri, filePattern, files) @@ -51,21 +50,27 @@ case class FileSysUtils(pathURI: String, filePattern: String = ".*", recursive: } } - /** find matching files in the dir, recursively call self when another directory is found */ - def findFiles(dir: String, filePattern :String = ".*", files : String = ""): String = { - val fileStatuses: Array[FileStatus] = fs.listStatus (new Path(dir)) +/** Find matching files in the dir, recursively call self when another directory is found + * Only files are matched, directories are traversed but never return a match */ + private def findFiles(dir: String, filePattern :String = ".*", files : String = ""): String = { + val seed = fs.getFileStatus(new Path(dir)) var f :String = files - for (fileStatus <- fileStatuses ){ - if (fileStatus.getPath().getName().matches(filePattern) - && !fileStatus.isDir){// found a file - if (fileStatus.getLen() != 0) { - // file is not empty - f = f + fileStatus.getPath.toUri.toString + "," + + if (seed.isDir) { + val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir)) + for (fileStatus <- fileStatuses) { + if (fileStatus.getPath().getName().matches(filePattern) + && !fileStatus.isDir) { + // found a file + if (fileStatus.getLen() != 0) { + // file is not empty + f = f + fileStatus.getPath.toUri.toString + "," + } + } else if (fileStatus.isDir && recursive) { + f = findFiles(fileStatus.getPath.toString, filePattern, f) } - }else if (fileStatus.isDir){ - f = findFiles(fileStatus.getPath.toString, filePattern, f) } - } + }else{ f = dir }// was a filename not dir f } } http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala index 41622a8..99f98f5 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/IndexedDataset.scala @@ -18,7 +18,7 @@ package org.apache.mahout.drivers import com.google.common.collect.BiMap -import org.apache.mahout.math.drm.CheckpointedDrm +import org.apache.mahout.math.drm.{DrmLike, CheckpointedDrm} import org.apache.mahout.sparkbindings.drm.CheckpointedDrmSpark import org.apache.mahout.sparkbindings._ @@ -61,13 +61,16 @@ case class IndexedDataset(var matrix: CheckpointedDrm[Int], rowIDs: BiMap[String val newMatrix = drmWrap[Int](drmRdd, n, ncol) new IndexedDataset(newMatrix, rowIDs, columnIDs) } + } /** - * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary constructor for - * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like [[org.apache.mahout.drivers.IndexedDatasetStore]] + * Companion object for the case class [[org.apache.mahout.drivers.IndexedDataset]] primarily used to get a secondary + * constructor for + * making one [[org.apache.mahout.drivers.IndexedDataset]] from another. Used when you have a factory like + * [[org.apache.mahout.drivers.Reader]] * {{{ - * val indexedDataset = IndexedDataset(indexedDatasetReader.readTuplesFrom(source)) + * val indexedDataset = IndexedDataset(indexedDatasetReader.readElementsFrom(source)) * }}} */ http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala index 460106f..b05b55d 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala @@ -17,7 +17,7 @@ package org.apache.mahout.drivers -import org.apache.mahout.math.cf.CooccurrenceAnalysis +import org.apache.mahout.math.cf.SimilarityAnalysis import scala.collection.immutable.HashMap /** @@ -25,7 +25,7 @@ import scala.collection.immutable.HashMap * Reads text lines * that contain (row id, column id, ...). The IDs are user specified strings which will be * preserved in the - * output. The individual tuples will be accumulated into a matrix and [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]] + * output. The individual elements will be accumulated into a matrix and [[org.apache.mahout.cf.CooccurrenceAnalysis.cooccurrences( )]] * will be used to calculate row-wise self-similarity, or when using filters or two inputs, will generate two * matrices and calculate both the self similarity of the primary matrix and the row-wise * similarity of the primary @@ -34,7 +34,7 @@ import scala.collection.immutable.HashMap * The options allow flexible control of the input schema, file discovery, output schema, and control of * algorithm parameters. * To get help run {{{mahout spark-itemsimilarity}}} for a full explanation of options. To process simple - * tuples of text delimited values (userID,itemID) with or without a strengths and with a separator of tab, comma, or space, + * elements of text delimited values (userID,itemID) with or without a strengths and with a separator of tab, comma, or space, * you can specify only the input and output file and directory--all else will default to the correct values. * Each output line will contain the Item ID and similar items sorted by LLR strength descending. * @note To use with a Spark cluster see the --master option, if you run out of heap space check @@ -47,10 +47,6 @@ object ItemSimilarityDriver extends MahoutDriver { "maxSimilaritiesPerItem" -> 100, "appName" -> "ItemSimilarityDriver") - // build options from some stardard CLI param groups - // Note: always put the driver specific options at the last so the can override and previous options! - private var options: Map[String, Any] = null - private var reader1: TextDelimitedIndexedDatasetReader = _ private var reader2: TextDelimitedIndexedDatasetReader = _ private var writer: TextDelimitedIndexedDatasetWriter = _ @@ -60,17 +56,15 @@ object ItemSimilarityDriver extends MahoutDriver { * @param args Command line args, if empty a help message is printed. */ override def main(args: Array[String]): Unit = { - options = MahoutOptionParser.GenericOptions ++ MahoutOptionParser.SparkOptions ++ - MahoutOptionParser.FileIOOptions ++ MahoutOptionParser.TextDelimitedTuplesOptions ++ - MahoutOptionParser.TextDelimitedDRMOptions ++ ItemSimilarityOptions - val parser = new MahoutOptionParser(programName = "spark-itemsimilarity") { - head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT") + parser = new MahoutOptionParser(programName = "spark-itemsimilarity") { + head("spark-itemsimilarity", "Mahout 1.0") //Input output options, non-driver specific - parseIOOptions + parseIOOptions(numInputs = 2) //Algorithm control options--driver specific + opts = opts ++ ItemSimilarityOptions note("\nAlgorithm control options:") opt[Int]("maxPrefs") abbr ("mppu") action { (x, options) => options + ("maxPrefs" -> x) @@ -79,14 +73,11 @@ object ItemSimilarityDriver extends MahoutDriver { if (x > 0) success else failure("Option --maxPrefs must be > 0") } -/** not implemented in CooccurrenceAnalysis.cooccurrence - opt[Int]("minPrefs") abbr ("mp") action { (x, options) => - options.put("minPrefs", x) - options - } text ("Ignore users with less preferences than this (optional). Default: 1") validate { x => - if (x > 0) success else failure("Option --minPrefs must be > 0") - } -*/ + /** not implemented in SimilarityAnalysis.cooccurrence + * threshold, and minPrefs + * todo: replacing the threshold with some % of the best values and/or a + * confidence measure expressed in standard deviations would be nice. + */ opt[Int]('m', "maxSimilaritiesPerItem") action { (x, options) => options + ("maxSimilaritiesPerItem" -> x) @@ -99,7 +90,7 @@ object ItemSimilarityDriver extends MahoutDriver { note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.") //Input text format - parseInputSchemaOptions + parseElementInputSchemaOptions //How to search for input parseFileDiscoveryOptions @@ -116,14 +107,14 @@ object ItemSimilarityDriver extends MahoutDriver { help("help") abbr ("h") text ("prints this usage text\n") } - parser.parse(args, options) map { opts => - options = opts + parser.parse(args, parser.opts) map { opts => + parser.opts = opts process } } - override def start(masterUrl: String = options("master").asInstanceOf[String], - appName: String = options("appName").asInstanceOf[String]): + override def start(masterUrl: String = parser.opts("master").asInstanceOf[String], + appName: String = parser.opts("appName").asInstanceOf[String]): Unit = { // todo: the HashBiMap used in the TextDelimited Reader is hard coded into @@ -131,31 +122,31 @@ object ItemSimilarityDriver extends MahoutDriver { // will be only spcific to this job. sparkConf.set("spark.kryo.referenceTracking", "false") .set("spark.kryoserializer.buffer.mb", "200") - .set("spark.executor.memory", options("sparkExecutorMem").asInstanceOf[String]) + .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) super.start(masterUrl, appName) - val readSchema1 = new Schema("delim" -> options("inDelim").asInstanceOf[String], - "filter" -> options("filter1").asInstanceOf[String], - "rowIDPosition" -> options("rowIDPosition").asInstanceOf[Int], - "columnIDPosition" -> options("itemIDPosition").asInstanceOf[Int], - "filterPosition" -> options("filterPosition").asInstanceOf[Int]) + val readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String], + "filter" -> parser.opts("filter1").asInstanceOf[String], + "rowIDPosition" -> parser.opts("rowIDPosition").asInstanceOf[Int], + "columnIDPosition" -> parser.opts("itemIDPosition").asInstanceOf[Int], + "filterPosition" -> parser.opts("filterPosition").asInstanceOf[Int]) reader1 = new TextDelimitedIndexedDatasetReader(readSchema1) - if ((options("filterPosition").asInstanceOf[Int] != -1 && options("filter2").asInstanceOf[String] != null) - || (options("input2").asInstanceOf[String] != null && !options("input2").asInstanceOf[String].isEmpty )){ + if ((parser.opts("filterPosition").asInstanceOf[Int] != -1 && parser.opts("filter2").asInstanceOf[String] != null) + || (parser.opts("input2").asInstanceOf[String] != null && !parser.opts("input2").asInstanceOf[String].isEmpty )){ // only need to change the filter used compared to readSchema1 - val readSchema2 = new Schema(readSchema1) += ("filter" -> options("filter2").asInstanceOf[String]) + val readSchema2 = new Schema(readSchema1) += ("filter" -> parser.opts("filter2").asInstanceOf[String]) reader2 = new TextDelimitedIndexedDatasetReader(readSchema2) } writeSchema = new Schema( - "rowKeyDelim" -> options("rowKeyDelim").asInstanceOf[String], - "columnIdStrengthDelim" -> options("columnIdStrengthDelim").asInstanceOf[String], - "omitScore" -> options("omitStrength").asInstanceOf[Boolean], - "tupleDelim" -> options("tupleDelim").asInstanceOf[String]) + "rowKeyDelim" -> parser.opts("rowKeyDelim").asInstanceOf[String], + "columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String], + "omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean], + "elementDelim" -> parser.opts("elementDelim").asInstanceOf[String]) writer = new TextDelimitedIndexedDatasetWriter(writeSchema) @@ -163,19 +154,19 @@ object ItemSimilarityDriver extends MahoutDriver { private def readIndexedDatasets: Array[IndexedDataset] = { - val inFiles = FileSysUtils(options("input").asInstanceOf[String], options("filenamePattern").asInstanceOf[String], - options("recursive").asInstanceOf[Boolean]).uris - val inFiles2 = if (options("input2") == null || options("input2").asInstanceOf[String].isEmpty) "" - else FileSysUtils(options("input2").asInstanceOf[String], options("filenamePattern").asInstanceOf[String], - options("recursive").asInstanceOf[Boolean]).uris + val inFiles = FileSysUtils(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String], + parser.opts("recursive").asInstanceOf[Boolean]).uris + val inFiles2 = if (parser.opts("input2") == null || parser.opts("input2").asInstanceOf[String].isEmpty) "" + else FileSysUtils(parser.opts("input2").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String], + parser.opts("recursive").asInstanceOf[Boolean]).uris if (inFiles.isEmpty) { Array() } else { - val datasetA = IndexedDataset(reader1.readTuplesFrom(inFiles)) - if (options("writeAllDatasets").asInstanceOf[Boolean]) writer.writeDRMTo(datasetA, - options("output").asInstanceOf[String] + "../input-datasets/primary-interactions") + val datasetA = IndexedDataset(reader1.readElementsFrom(inFiles)) + if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetA, + parser.opts("output").asInstanceOf[String] + "../input-datasets/primary-interactions") // The case of readng B can be a bit tricky when the exact same row IDs don't exist for A and B // Here we assume there is one row ID space for all interactions. To do this we calculate the @@ -185,15 +176,15 @@ object ItemSimilarityDriver extends MahoutDriver { // be supported (and are at least on Spark) or the row cardinality fix will not work. val datasetB = if (!inFiles2.isEmpty) { // get cross-cooccurrence interactions from separate files - val datasetB = IndexedDataset(reader2.readTuplesFrom(inFiles2, existingRowIDs = datasetA.rowIDs)) + val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles2, existingRowIDs = datasetA.rowIDs)) datasetB - } else if (options("filterPosition").asInstanceOf[Int] != -1 - && options("filter2").asInstanceOf[String] != null) { + } else if (parser.opts("filterPosition").asInstanceOf[Int] != -1 + && parser.opts("filter2").asInstanceOf[String] != null) { // get cross-cooccurrences interactions by using two filters on a single set of files - val datasetB = IndexedDataset(reader2.readTuplesFrom(inFiles, existingRowIDs = datasetA.rowIDs)) + val datasetB = IndexedDataset(reader2.readElementsFrom(inFiles, existingRowIDs = datasetA.rowIDs)) datasetB @@ -201,18 +192,18 @@ object ItemSimilarityDriver extends MahoutDriver { null.asInstanceOf[IndexedDataset] } if (datasetB != null.asInstanceOf[IndexedDataset]) { // do AtB calc - // true row cardinality is the size of the row id index, which was calculated from all rows of A and B - val rowCardinality = datasetB.rowIDs.size() // the authoritative row cardinality + // true row cardinality is the size of the row id index, which was calculated from all rows of A and B + val rowCardinality = datasetB.rowIDs.size() // the authoritative row cardinality // todo: how expensive is nrow? We could make assumptions about .rowIds that don't rely on // its calculation val returnedA = if (rowCardinality != datasetA.matrix.nrow) datasetA.newRowCardinality(rowCardinality) - else datasetA // this guarantees matching cardinality + else datasetA // this guarantees matching cardinality val returnedB = if (rowCardinality != datasetB.matrix.nrow) datasetB.newRowCardinality(rowCardinality) - else datasetB // this guarantees matching cardinality + else datasetB // this guarantees matching cardinality - if (options("writeAllDatasets").asInstanceOf[Boolean]) writer.writeDRMTo(datasetB, options("output") + "../input-datasets/secondary-interactions") + if (parser.opts("writeAllDatasets").asInstanceOf[Boolean]) writer.writeTo(datasetB, parser.opts("output") + "../input-datasets/secondary-interactions") Array(returnedA, returnedB) } else Array(datasetA) @@ -227,25 +218,25 @@ object ItemSimilarityDriver extends MahoutDriver { // todo: allow more than one cross-similarity matrix? val indicatorMatrices = { if (indexedDatasets.length > 1) { - CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options("randomSeed").asInstanceOf[Int], - options("maxSimilaritiesPerItem").asInstanceOf[Int], options("maxPrefs").asInstanceOf[Int], - Array(indexedDatasets(1).matrix)) + SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int], + parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int], + Array(indexedDatasets(1).matrix)) } else { - CooccurrenceAnalysis.cooccurrences(indexedDatasets(0).matrix, options("randomSeed").asInstanceOf[Int], - options("maxSimilaritiesPerItem").asInstanceOf[Int], options("maxPrefs").asInstanceOf[Int]) + SimilarityAnalysis.cooccurrences(indexedDatasets(0).matrix, parser.opts("randomSeed").asInstanceOf[Int], + parser.opts("maxSimilaritiesPerItem").asInstanceOf[Int], parser.opts("maxPrefs").asInstanceOf[Int]) } } // an alternative is to create a version of IndexedDataset that knows how to write itself val selfIndicatorDataset = new IndexedDatasetTextDelimitedWriteable(indicatorMatrices(0), indexedDatasets(0).columnIDs, indexedDatasets(0).columnIDs, writeSchema) - selfIndicatorDataset.writeTo(options("output").asInstanceOf[String] + "indicator-matrix") + selfIndicatorDataset.writeTo(dest = parser.opts("output").asInstanceOf[String] + "indicator-matrix") // todo: would be nice to support more than one cross-similarity indicator if (indexedDatasets.length > 1) { val crossIndicatorDataset = new IndexedDataset(indicatorMatrices(1), indexedDatasets(0).columnIDs, indexedDatasets(1).columnIDs) // cross similarity - writer.writeDRMTo(crossIndicatorDataset, options("output").asInstanceOf[String] + "cross-indicator-matrix") + writer.writeTo(crossIndicatorDataset, parser.opts("output").asInstanceOf[String] + "cross-indicator-matrix") } http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala index e92ed37..6ea7c8b 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala @@ -26,43 +26,55 @@ import scala.collection.immutable /** Extend this class to create a Mahout CLI driver. Minimally you must override process and main. * Also define a Map of options for the command line parser. The following template may help: * {{{ - * object SomeDriver extends MahoutDriver { - * // build options from some stardard CLI param groups - * // Note: always put the driver specific options at the last so the can override and previous options! - * private var options = GenericOptions ++ SparkOptions ++ FileIOOptions ++ TextDelimitedTuplesOptions ++ - * TextDelimitedDRMOptions ++ ItemSimilarityOptions + * object SomeDriver extends MahoutDriver { * - * override def main(args: Array[String]): Unit = { - * val parser = new MahoutOptionParser(programName = "spark-itemsimilarity") { - * head("spark-itemsimilarity", "Mahout 1.0-SNAPSHOT") + * // define only the options specific to this driver, inherit the generic ones + * private final val SomeOptions = HashMap[String, Any]( + * "maxThings" -> 500, + * "minThings" -> 100, + * "appName" -> "SomeDriver") * - * //Several standard option groups are usually non-driver specific so use the MahoutOptionParser methods - * parseGenericOptions - * ... - * } - * parser.parse(args, options) map { opts => - * options = opts - * process - * } - * } + * override def main(args: Array[String]): Unit = { + * + * + * val parser = new MahoutOptionParser(programName = "shortname") { + * head("somedriver", "Mahout 1.0-SNAPSHOT") * - * override def process: Unit = { - * start() - * //don't just stand there do something - * stop + * // Input output options, non-driver specific + * parseIOOptions + * + * // Algorithm specific options + * // Add in the new options + * opts = opts ++ SomeOptions + * note("\nAlgorithm control options:") + * opt[Int]("maxThings") abbr ("mt") action { (x, options) => + * options + ("maxThings" -> x) ... + * } + * parser.parse(args, parser.opts) map { opts => + * parser.opts = opts + * process * } * } + * + * override def process: Unit = { + * start() + * // do the work here + * stop + * } + * * }}} */ abstract class MahoutDriver { - implicit var mc: DistributedContext = _ - implicit var sparkConf = new SparkConf() - var _useExistingContext: Boolean = false + implicit protected var mc: DistributedContext = _ + implicit protected var sparkConf = new SparkConf() + protected var parser: MahoutOptionParser = _ + + var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite /** Creates a Spark context to run the job inside. - * Creates a Spark context to run the job inside. Override to set the SparkConf values specific to the job, + * Override to set the SparkConf values specific to the job, * these must be set before the context is created. * @param masterUrl Spark master URL * @param appName Name to display in Spark UI http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala index 3aada78..6908bd2 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutOptionParser.scala @@ -32,13 +32,15 @@ object MahoutOptionParser { "appName" -> "Generic Spark App, Change this.") final val FileIOOptions = immutable.HashMap[String, Any]( - "recursive" -> false, "input" -> null.asInstanceOf[String], "input2" -> null.asInstanceOf[String], - "output" -> null.asInstanceOf[String], + "output" -> null.asInstanceOf[String]) + + final val FileDiscoveryOptions = immutable.HashMap[String, Any]( + "recursive" -> false, "filenamePattern" -> "^part-.*") - final val TextDelimitedTuplesOptions = immutable.HashMap[String, Any]( + final val TextDelimitedElementsOptions = immutable.HashMap[String, Any]( "rowIDPosition" -> 0, "itemIDPosition" -> 1, "filterPosition" -> -1, @@ -49,7 +51,7 @@ object MahoutOptionParser { final val TextDelimitedDRMOptions = immutable.HashMap[String, Any]( "rowKeyDelim" -> "\t", "columnIdStrengthDelim" -> ":", - "tupleDelim" -> " ", + "elementDelim" -> " ", "omitStrength" -> false) } /** Defines oft-repeated options and their parsing. Provides the option groups and parsing helper methods to @@ -57,17 +59,25 @@ object MahoutOptionParser { * @param programName Name displayed in help message, the name by which the driver is invoked. * */ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, Any]](programName: String) { + + // build options from some stardard CLI param groups + // Note: always put the driver specific options at the last so the can override and previous options! + var opts = Map.empty[String, Any] + override def showUsageOnError = true - def parseIOOptions = { + def parseIOOptions(numInputs: Int = 1) = { + opts = opts ++ MahoutOptionParser.FileIOOptions note("Input, output options") opt[String]('i', "input") required() action { (x, options) => options + ("input" -> x) } text ("Input path, may be a filename, directory name, or comma delimited list of HDFS supported URIs (required)") - opt[String]("input2") abbr ("i2") action { (x, options) => - options + ("input2" -> x) - } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.") + if (numInputs == 2) { + opt[String]("input2") abbr ("i2") action { (x, options) => + options + ("input2" -> x) + } text ("Secondary input path for cross-similarity calculation, same restrictions as \"--input\" (optional). Default: empty.") + } opt[String]('o', "output") required() action { (x, options) => // todo: check to see if HDFS allows MS-Windows backslashes locally? @@ -81,6 +91,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A } def parseSparkOptions = { + opts = opts ++ MahoutOptionParser.SparkOptions note("\nSpark config options:") opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) => @@ -94,7 +105,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A } def parseGenericOptions = { - note("\nGeneral config options:") + opts = opts ++ MahoutOptionParser.GenericOptions opt[Int]("randomSeed") abbr ("rs") action { (x, options) => options + ("randomSeed" -> x) } validate { x => @@ -107,9 +118,10 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A }//Hidden option, though a user might want this. } - def parseInputSchemaOptions{ - //Input text file schema--not driver specific but input data specific, tuples input, + def parseElementInputSchemaOptions{ + //Input text file schema--not driver specific but input data specific, elements input, // not drms + opts = opts ++ MahoutOptionParser.TextDelimitedElementsOptions note("\nInput text file schema options:") opt[String]("inDelim") abbr ("id") text ("Input delimiter character (optional). Default: \"[,\\t]\"") action { (x, options) => options + ("inDelim" -> x) @@ -162,6 +174,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A def parseFileDiscoveryOptions = { //File finding strategy--not driver specific + opts = opts ++ MahoutOptionParser.FileDiscoveryOptions note("\nFile discovery options:") opt[Unit]('r', "recursive") action { (_, options) => options + ("recursive" -> true) @@ -174,6 +187,7 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A } def parseDrmFormatOptions = { + opts = opts ++ MahoutOptionParser.TextDelimitedDRMOptions note("\nOutput text file schema options:") opt[String]("rowKeyDelim") abbr ("rd") action { (x, options) => options + ("rowKeyDelim" -> x) @@ -183,9 +197,9 @@ class MahoutOptionParser(programName: String) extends OptionParser[Map[String, A options + ("columnIdStrengthDelim" -> x) } text ("Separates column IDs from their values in the vector values list (optional). Default: \":\"") - opt[String]("tupleDelim") abbr ("td") action { (x, options) => - options + ("tupleDelim" -> x) - } text ("Separates vector tuple values in the values list (optional). Default: \" \"") + opt[String]("elementDelim") abbr ("td") action { (x, options) => + options + ("elementDelim" -> x) + } text ("Separates vector element values in the values list (optional). Default: \" \"") opt[Unit]("omitStrength") abbr ("os") action { (_, options) => options + ("omitStrength" -> true) http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala index e2bb49c..6351e45 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala @@ -20,7 +20,7 @@ package org.apache.mahout.drivers import com.google.common.collect.{HashBiMap, BiMap} import org.apache.mahout.math.drm.DistributedContext -/** Reader trait is abstract in the sense that the tupleReader function must be defined by an extending trait, which also defines the type to be read. +/** Reader trait is abstract in the sense that the elementReader function must be defined by an extending trait, which also defines the type to be read. * @tparam T type of object read. */ trait Reader[T]{ @@ -28,16 +28,27 @@ trait Reader[T]{ val mc: DistributedContext val readSchema: Schema - protected def tupleReader( + protected def elementReader( mc: DistributedContext, readSchema: Schema, source: String, existingRowIDs: BiMap[String, Int]): T - def readTuplesFrom( + protected def drmReader( + mc: DistributedContext, + readSchema: Schema, + source: String, + existingRowIDs: BiMap[String, Int]): T + + def readElementsFrom( + source: String, + existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T = + elementReader(mc, readSchema, source, existingRowIDs) + + def readDRMFrom( source: String, existingRowIDs: BiMap[String, Int] = HashBiMap.create()): T = - tupleReader(mc, readSchema, source, existingRowIDs) + drmReader(mc, readSchema, source, existingRowIDs) } /** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written. @@ -51,5 +62,5 @@ trait Writer[T]{ protected def writer(mc: DistributedContext, writeSchema: Schema, dest: String, collection: T, sort: Boolean): Unit - def writeDRMTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort) + def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection, sort) } http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala new file mode 100644 index 0000000..920c32b --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala @@ -0,0 +1,159 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package org.apache.mahout.drivers + +import org.apache.mahout.math.cf.SimilarityAnalysis +import scala.collection.immutable.HashMap + +/** + * Command line interface for [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarity( )]]. + * Reads a text delimited file containing a Mahout DRM of the form + * (row id, column id: strength, ...). The IDs are user specified strings which will be + * preserved in the + * output. The rows define a matrix and [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarity( )]] + * will be used to calculate row-wise similarity using log-likelihood + * The options allow control of the input schema, file discovery, output schema, and control of + * algorithm parameters. + * To get help run {{{mahout spark-rowsimilarity}}} for a full explanation of options. The default + * values for formatting will read (rowID<tab>columnID1:strength1<space>columnID2:strength2....) + * and write (rowID<tab>rowID1:strength1<space>rowID2:strength2....) + * Each output line will contain a row ID and similar columns sorted by LLR strength descending. + * @note To use with a Spark cluster see the --master option, if you run out of heap space check + * the --sparkExecutorMemory option. + */ +object RowSimilarityDriver extends MahoutDriver { + // define only the options specific to RowSimilarity + private final val RowSimilarityOptions = HashMap[String, Any]( + "maxObservations" -> 500, + "maxSimilaritiesPerRow" -> 100, + "appName" -> "RowSimilarityDriver") + + private var readerWriter: TextDelimitedIndexedDatasetReaderWriter = _ + private var readWriteSchema: Schema = _ + + /** + * @param args Command line args, if empty a help message is printed. + */ + override def main(args: Array[String]): Unit = { + + parser = new MahoutOptionParser(programName = "spark-rowsimilarity") { + head("spark-rowsimilarity", "Mahout 1.0") + + //Input output options, non-driver specific + parseIOOptions() + + //Algorithm control options--driver specific + opts = opts ++ RowSimilarityOptions + + note("\nAlgorithm control options:") + opt[Int]("maxObservations") abbr ("mo") action { (x, options) => + options + ("maxObservations" -> x) + } text ("Max number of observations to consider per row (optional). Default: " + + RowSimilarityOptions("maxObservations")) validate { x => + if (x > 0) success else failure("Option --maxObservations must be > 0") + } + + opt[Int]('m', "maxSimilaritiesPerRow") action { (x, options) => + options + ("maxSimilaritiesPerRow" -> x) + } text ("Limit the number of similarities per item to this number (optional). Default: " + + RowSimilarityOptions("maxSimilaritiesPerRow")) validate { x => + if (x > 0) success else failure("Option --maxSimilaritiesPerRow must be > 0") + } + + /** --threshold not implemented in SimilarityAnalysis.rowSimilarity + * todo: replacing the threshold with some % of the best values and/or a + * confidence measure expressed in standard deviations would be nice. + */ + + //Driver notes--driver specific + note("\nNote: Only the Log Likelihood Ratio (LLR) is supported as a similarity measure.") + + //Drm output schema--not driver specific, drm specific + parseDrmFormatOptions + + //How to search for input + parseFileDiscoveryOptions + + //Spark config options--not driver specific + parseSparkOptions + + //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI + parseGenericOptions + + help("help") abbr ("h") text ("prints this usage text\n") + + } + parser.parse(args, parser.opts) map { opts => + parser.opts = opts + process + } + } + + override def start(masterUrl: String = parser.opts("master").asInstanceOf[String], + appName: String = parser.opts("appName").asInstanceOf[String]): + Unit = { + + // todo: the HashBiMap used in the TextDelimited Reader is hard coded into + // MahoutKryoRegistrator, it should be added to the register list here so it + // will be only spcific to this job. + sparkConf.set("spark.kryo.referenceTracking", "false") + .set("spark.kryoserializer.buffer.mb", "200") + .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) + + super.start(masterUrl, appName) + + readWriteSchema = new Schema( + "rowKeyDelim" -> parser.opts("rowKeyDelim").asInstanceOf[String], + "columnIdStrengthDelim" -> parser.opts("columnIdStrengthDelim").asInstanceOf[String], + "omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean], + "elementDelim" -> parser.opts("elementDelim").asInstanceOf[String]) + + readerWriter = new TextDelimitedIndexedDatasetReaderWriter(readWriteSchema, readWriteSchema) + + } + + private def readIndexedDataset: IndexedDataset = { + + val inFiles = FileSysUtils(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String], + parser.opts("recursive").asInstanceOf[Boolean]).uris + + if (inFiles.isEmpty) { + null.asInstanceOf[IndexedDataset] + } else { + + val datasetA = IndexedDataset(readerWriter.readDRMFrom(inFiles)) + datasetA + } + } + + override def process: Unit = { + start() + + val indexedDataset = readIndexedDataset + + val rowSimilarityDrm = SimilarityAnalysis.rowSimilarity(indexedDataset.matrix, parser.opts("randomSeed").asInstanceOf[Int], + parser.opts("maxSimilaritiesPerRow").asInstanceOf[Int], parser.opts("maxObservations").asInstanceOf[Int]) + + val rowSimilarityDataset = new IndexedDatasetTextDelimitedWriteable(rowSimilarityDrm, + indexedDataset.rowIDs, indexedDataset.rowIDs, readWriteSchema) + rowSimilarityDataset.writeTo(dest = parser.opts("output").asInstanceOf[String]) + + stop + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala index edff92d..42b2658 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala @@ -41,26 +41,26 @@ class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] { // These can be used to keep the text in and out fairly standard to Mahout, where an application specific // format is not required. -/** Simple default Schema for typical text delimited tuple file input - * This tells the reader to input tuples of the default (rowID<comma, tab, or space>columnID +/** Simple default Schema for typical text delimited element file input + * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID * <comma, tab, or space>here may be other ignored text...) */ -class DefaultTupleReadSchema extends Schema( - "delim" -> "[,\t ]", //comma, tab or space - "filter" -> "", - "rowIDPosition" -> 0, - "columnIDPosition" -> 1, - "filterPosition" -> -1) +class DefaultElementReadSchema extends Schema( + "delim" -> "[,\t ]", //comma, tab or space + "filter" -> "", + "rowIDPosition" -> 0, + "columnIDPosition" -> 1, + "filterPosition" -> -1) /** Default Schema for text delimited drm file output * This tells the writer to write a DRM of the default form: * (rowID<tab>columnID1:score1<space>columnID2:score2...) */ class DefaultDRMWriteSchema extends Schema( - "rowKeyDelim" -> "\t", - "columnIdStrengthDelim" -> ":", - "tupleDelim" -> " ", - "omitScore" -> false) + "rowKeyDelim" -> "\t", + "columnIdStrengthDelim" -> ":", + "elementDelim" -> " ", + "omitScore" -> false) /** Default Schema for typical text delimited drm file input * This tells the reader to input text lines of the form: @@ -69,9 +69,9 @@ class DefaultDRMWriteSchema extends Schema( class DefaultDRMReadSchema extends Schema( "rowKeyDelim" -> "\t", "columnIdStrengthDelim" -> ":", - "tupleDelim" -> " ") + "elementDelim" -> " ") -/** Default Schema for reading a text delimited drm file where the score of any tuple is ignored, +/** Default Schema for reading a text delimited drm file where the score of any element is ignored, * all non-zeros are replaced with 1. * This tells the reader to input DRM lines of the form * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored. @@ -82,17 +82,17 @@ class DefaultDRMReadSchema extends Schema( class DRMReadBooleanSchema extends Schema( "rowKeyDelim" -> "\t", "columnIdStrengthDelim" -> ":", - "tupleDelim" -> " ", + "elementDelim" -> " ", "omitScore" -> true) -/** Default Schema for typical text delimited drm file write where the score of a tuple is omitted. - * The presence of a tuple means the score = 1, the absence means a score of 0. +/** Default Schema for typical text delimited drm file write where the score of a element is omitted. + * The presence of a element means the score = 1, the absence means a score of 0. * This tells the writer to output DRM lines of the form * (rowID<tab>columnID1<space>columnID2...) */ class DRMWriteBooleanSchema extends Schema( "rowKeyDelim" -> "\t", "columnIdStrengthDelim" -> ":", - "tupleDelim" -> " ", + "elementDelim" -> " ", "omitScore" -> true) http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala index 11d647b..53a36a5 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala @@ -27,14 +27,16 @@ import scala.collection.JavaConversions._ /** Extends Reader trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader function for reading text delimited files as described in the [[org.apache.mahout.drivers.Schema]] */ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{ - /** Read in text delimited tuples from all URIs in this comma delimited source String. + /** Read in text delimited elements from all URIs in the comma delimited source String and return + * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is + * no strength value in the element, assume it's presence means a strength of 1. * * @param mc context for the Spark job * @param readSchema describes the delimiters and positions of values in the text delimited file. * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]] * @return */ - protected def tupleReader( + protected def elementReader( mc: DistributedContext, readSchema: Schema, source: String, @@ -53,7 +55,6 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{ }) var columns = mc.textFile(source).map { line => line.split(delimiter) } - //val m = columns.collect // -1 means no filter in the input text, take them all if(filterPosition != -1) { @@ -113,9 +114,100 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{ } } - // this creates a BiMap from an ID collection. The ID points to an ordinal int + /** Read in text delimited rows from all URIs in this comma delimited source String and return + * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is + * no strength value in the element, assume it's presence means a strength of 1. + * + * @param mc context for the Spark job + * @param readSchema describes the delimiters and positions of values in the text delimited file. + * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]] + * @return + */ + protected def drmReader( + mc: DistributedContext, + readSchema: Schema, + source: String, + existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDataset = { + try { + val rowKeyDelim = readSchema("rowKeyDelim").asInstanceOf[String] + val columnIdStrengthDelim = readSchema("columnIdStrengthDelim").asInstanceOf[String] + val elementDelim = readSchema("elementDelim").asInstanceOf[String] + // no need for omitScore since we can tell if there is a score and assume it is 1.0d if not specified + //val omitScore = readSchema("omitScore").asInstanceOf[Boolean] + + assert(!source.isEmpty, { + println(this.getClass.toString + ": has no files to read") + throw new IllegalArgumentException + }) + + var rows = mc.textFile(source).map { line => line.split(rowKeyDelim) } + + // get row and column IDs + val interactions = rows.map { row => + row(0) -> row(1)// rowID token -> string of column IDs+strengths + } + + interactions.cache() + interactions.collect() + + // create separate collections of rowID and columnID tokens + val rowIDs = interactions.map { case (rowID, _) => rowID }.distinct().collect() + + // the columns are in a TD string so separate them and get unique ones + val columnIDs = interactions.flatMap { case (_, columns) => columns + val elements = columns.split(elementDelim) + val colIDs = elements.map( elem => elem.split(columnIdStrengthDelim)(0) ) + colIDs + }.distinct().collect() + + val numRows = rowIDs.size + val numColumns = columnIDs.size + + // create BiMaps for bi-directional lookup of ID by either Mahout ID or external ID + // broadcast them for access in distributed processes, so they are not recalculated in every task. + val rowIDDictionary = asOrderedDictionary(existingRowIDs, rowIDs) + val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary) + + val columnIDDictionary = asOrderedDictionary(entries = columnIDs) + val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary) + + val indexedInteractions = + interactions.map { case (rowID, columns) => + val rowIndex = rowIDDictionary_bcast.value.get(rowID).get + + val elements = columns.split(elementDelim) + val row = new RandomAccessSparseVector(numColumns) + for (element <- elements) { + val id = element.split(columnIdStrengthDelim)(0) + val columnID = columnIDDictionary_bcast.value.get(id).get + val pair = element.split(columnIdStrengthDelim) + if (pair.size == 2)// there was a strength + row.setQuick(columnID,pair(1).toDouble) + else // no strength so set DRM value to 1.0d, this ignores 'omitScore', which is a write param + row.setQuick(columnID,1.0d) + } + rowIndex -> row + } + .asInstanceOf[DrmRdd[Int]] + + // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed + val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns) + + IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary) + + } catch { + case cce: ClassCastException => { + println(this.getClass.toString + ": Schema has illegal values") + throw cce + } + } + } + + // this creates a BiMap from an ID collection. The ID points to an ordinal int // which is used internal to Mahout as the row or column ID - // todo: this is a non-distributed process and the BiMap is a non-rdd based object--might be a scaling problem + // todo: this is a non-distributed process in an otherwise distributed reader and the BiMap is a + // non-rdd based object--this will limit the size of the dataset to ones where the dictionaries fit + // in-memory, the option is to put the dictionaries in rdds and do joins to translate IDs private def asOrderedDictionary(dictionary: BiMap[String, Int] = HashBiMap.create(), entries: Array[String]): BiMap[String, Int] = { var index = dictionary.size() // if a dictionary is supplied then add to the end based on the Mahout id 'index' for (entry <- entries) { @@ -130,7 +222,7 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{ private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2} - /** Read in text delimited tuples from all URIs in this comma delimited source String. + /** Read in text delimited elements from all URIs in this comma delimited source String. * * @param mc context for the Spark job * @param writeSchema describes the delimiters and positions of values in the output text delimited file. @@ -145,7 +237,7 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{ try { val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String] val columnIdStrengthDelim = writeSchema("columnIdStrengthDelim").asInstanceOf[String] - val tupleDelim = writeSchema("tupleDelim").asInstanceOf[String] + val elementDelim = writeSchema("elementDelim").asInstanceOf[String] val omitScore = writeSchema("omitScore").asInstanceOf[Boolean] //instance vars must be put into locally scoped vals when put into closures that are //executed but Spark @@ -177,11 +269,11 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{ // first get the external rowID token if (!vector.isEmpty){ var line: String = rowIDDictionary.inverse.get(rowID) + rowKeyDelim - // for the rest of the row, construct the vector contents of tuples (external column ID, strength value) + // for the rest of the row, construct the vector contents of elements (external column ID, strength value) for (item <- vector) { line += columnIDDictionary.inverse.get(item.getFirst) if (!omitScore) line += columnIdStrengthDelim + item.getSecond - line += tupleDelim + line += elementDelim } // drop the last delimiter, not needed to end the line line.dropRight(1) @@ -203,7 +295,7 @@ trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexed /** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor. * @param readSchema describes the delimiters and position of values in the text delimited file to be read. * @param mc Spark context for reading files - * @note The source is supplied by Reader#readTuplesFrom . + * @note The source is supplied by Reader#readElementsFrom . * */ class TextDelimitedIndexedDatasetReader(val readSchema: Schema) (implicit val mc: DistributedContext) extends TDIndexedDatasetReader @@ -211,7 +303,7 @@ class TextDelimitedIndexedDatasetReader(val readSchema: Schema) /** Writes text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor. * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written. * @param mc Spark context for reading files - * @note the destination is supplied by Writer#writeDRMTo trait method + * @note the destination is supplied by Writer#writeTo trait method * */ class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter @@ -224,7 +316,7 @@ class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeS (implicit val mc: DistributedContext) extends TDIndexedDatasetReaderWriter -/** A version of IndexedDataset that has it's own writeDRMTo method from a Writer trait. This is an alternative to creating +/** A version of IndexedDataset that has it's own writeTo method from a Writer trait. This is an alternative to creating * a Writer based stand-alone class for writing. Consider it experimental allowing similar semantics to drm.writeDrm(). * Experimental because it's not clear that it is simpler or more intuitive and since IndexedDatasetTextDelimitedWriteables * are probably short lived in terms of lines of code so complexity may be moot. @@ -243,18 +335,20 @@ class IndexedDatasetTextDelimitedWriteable( (implicit val mc: DistributedContext) extends IndexedDataset(matrix, rowIDs, columnIDs) with TDIndexedDatasetWriter { - def writeTo(dest: String): Unit = { - writeDRMTo(this, dest) + override def writeTo(collection: IndexedDataset = this, dest: String): Unit = { + super.writeTo(this, dest) } } /** - * Companion object for the case class [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] primarily used to get a secondary constructor for - * making one [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from another. Used when you have a factory like [[org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader]] - * {{{ - * val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readTuplesFrom(source)) - * }}} - */ + * Companion object for the case class [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] primarily + * used to get a secondary constructor for + * making one [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from another. Used when you have a + * factory like [[org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader]] + * {{{ + * val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readElementsFrom(source)) + * }}} + */ object IndexedDatasetTextDelimitedWriteable { /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */ http://git-wip-us.apache.org/repos/asf/mahout/blob/149c9859/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala index dedb279..54f33ef 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala @@ -62,7 +62,7 @@ object SparkEngine extends DistributedEngine { // Fold() doesn't work with kryo still. So work around it. .mapPartitions(iter => { val acc = ((new DenseVector(n): Vector) /: iter) { (acc, v) => - v.nonZeroes().foreach { elem => acc(elem.index) += 1} + v.nonZeroes().foreach { elem => acc(elem.index) += 1 } acc } Iterator(acc)
