Repository: mahout Updated Branches: refs/heads/master 4e6577d14 -> 666d314fb
http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala index 920c32b..f8abef3 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala @@ -17,15 +17,18 @@ package org.apache.mahout.drivers +import org.apache.mahout.common.HDFSPathSearch import org.apache.mahout.math.cf.SimilarityAnalysis +import org.apache.mahout.math.indexeddataset.{Schema, IndexedDataset, indexedDatasetDFSRead} +import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import scala.collection.immutable.HashMap /** - * Command line interface for [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarity( )]]. - * Reads a text delimited file containing a Mahout DRM of the form - * (row id, column id: strength, ...). The IDs are user specified strings which will be - * preserved in the - * output. The rows define a matrix and [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarity( )]] + * Command line interface for [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarityIDSs( )]]. + * Reads a text delimited file containing rows of a [[org.apache.mahout.math.indexeddataset.IndexedDataset]] + * with domain specific IDS of the form + * (row id, column id: strength, ...). The IDs will be preserved in the + * output. The rows define a matrix and [[org.apache.mahout.cf.SimilarityAnalysis.rowSimilarityIDSs( )]] * will be used to calculate row-wise similarity using log-likelihood * The options allow control of the input schema, file discovery, output schema, and control of * algorithm parameters. 
@@ -36,14 +39,13 @@ import scala.collection.immutable.HashMap * @note To use with a Spark cluster see the --master option, if you run out of heap space check * the --sparkExecutorMemory option. */ -object RowSimilarityDriver extends MahoutDriver { +object RowSimilarityDriver extends MahoutSparkDriver { // define only the options specific to RowSimilarity private final val RowSimilarityOptions = HashMap[String, Any]( "maxObservations" -> 500, "maxSimilaritiesPerRow" -> 100, "appName" -> "RowSimilarityDriver") - private var readerWriter: TextDelimitedIndexedDatasetReaderWriter = _ private var readWriteSchema: Schema = _ /** @@ -110,10 +112,13 @@ object RowSimilarityDriver extends MahoutDriver { // todo: the HashBiMap used in the TextDelimited Reader is hard coded into // MahoutKryoRegistrator, it should be added to the register list here so it - // will be only spcific to this job. + // will be only specific to this job. sparkConf.set("spark.kryo.referenceTracking", "false") - .set("spark.kryoserializer.buffer.mb", "200") - .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) + .set("spark.kryoserializer.buffer.mb", "200")// todo: should we take this out? 
+ + if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "") + sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) + //else leave as set in Spark config super.start(masterUrl, appName) @@ -123,20 +128,18 @@ object RowSimilarityDriver extends MahoutDriver { "omitScore" -> parser.opts("omitStrength").asInstanceOf[Boolean], "elementDelim" -> parser.opts("elementDelim").asInstanceOf[String]) - readerWriter = new TextDelimitedIndexedDatasetReaderWriter(readWriteSchema, readWriteSchema) - } private def readIndexedDataset: IndexedDataset = { - val inFiles = FileSysUtils(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String], + val inFiles = HDFSPathSearch(parser.opts("input").asInstanceOf[String], parser.opts("filenamePattern").asInstanceOf[String], parser.opts("recursive").asInstanceOf[Boolean]).uris if (inFiles.isEmpty) { null.asInstanceOf[IndexedDataset] } else { - val datasetA = IndexedDataset(readerWriter.readDRMFrom(inFiles)) + val datasetA = indexedDatasetDFSRead(inFiles, readWriteSchema) datasetA } } @@ -146,12 +149,12 @@ object RowSimilarityDriver extends MahoutDriver { val indexedDataset = readIndexedDataset - val rowSimilarityDrm = SimilarityAnalysis.rowSimilarity(indexedDataset.matrix, parser.opts("randomSeed").asInstanceOf[Int], - parser.opts("maxSimilaritiesPerRow").asInstanceOf[Int], parser.opts("maxObservations").asInstanceOf[Int]) + val rowSimilarityIDS = SimilarityAnalysis.rowSimilarityIDS(indexedDataset, + parser.opts("randomSeed").asInstanceOf[Int], + parser.opts("maxSimilaritiesPerRow").asInstanceOf[Int], + parser.opts("maxObservations").asInstanceOf[Int]) - val rowSimilarityDataset = new IndexedDatasetTextDelimitedWriteable(rowSimilarityDrm, - indexedDataset.rowIDs, indexedDataset.rowIDs, readWriteSchema) - rowSimilarityDataset.writeTo(dest = parser.opts("output").asInstanceOf[String]) + rowSimilarityIDS.dfsWrite(parser.opts("output").asInstanceOf[String], 
readWriteSchema) stop } http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala b/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala deleted file mode 100644 index 92163be..0000000 --- a/spark/src/main/scala/org/apache/mahout/drivers/Schema.scala +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.drivers - -import scala.collection.mutable -import scala.collection.mutable.HashMap - -/** Syntactic sugar for mutable.HashMap[String, Any] - * - * @param params list of mappings for instantiation {{{val mySchema = new Schema("one" -> 1, "two" -> "2", ...)}}} - */ -class Schema(params: Tuple2[String, Any]*) extends HashMap[String, Any] { - // note: this require a mutable HashMap, do we care? 
- this ++= params - - /** Constructor for copying an existing Schema - * - * @param schemaToClone return a copy of this Schema - */ - def this(schemaToClone: Schema){ - this() - this ++= schemaToClone - } -} - -// These can be used to keep the text in and out fairly standard to Mahout, where an application specific -// format is not required. - -/** Simple default Schema for typical text delimited element file input - * This tells the reader to input elements of the default (rowID<comma, tab, or space>columnID - * <comma, tab, or space>here may be other ignored text...) - */ -class DefaultElementReadSchema extends Schema( - "delim" -> "[,\t ]", //comma, tab or space - "filter" -> "", - "rowIDColumn" -> 0, - "columnIDPosition" -> 1, - "filterColumn" -> -1) - -/** Default Schema for text delimited drm file output - * This tells the writer to write a DRM of the default form: - * (rowID<tab>columnID1:score1<space>columnID2:score2...) - */ -class DefaultDRMWriteSchema extends Schema( - "rowKeyDelim" -> "\t", - "columnIdStrengthDelim" -> ":", - "elementDelim" -> " ", - "omitScore" -> false) - -/** Default Schema for typical text delimited drm file input - * This tells the reader to input text lines of the form: - * (rowID<tab>columnID1:score1,columnID2:score2,...) - */ -class DefaultDRMReadSchema extends Schema( - "rowKeyDelim" -> "\t", - "columnIdStrengthDelim" -> ":", - "elementDelim" -> " ") - -/** Default Schema for reading a text delimited drm file where the score of any element is ignored, - * all non-zeros are replaced with 1. - * This tells the reader to input DRM lines of the form - * (rowID<tab>columnID1:score1<space>columnID2:score2...) remember the score is ignored. - * Alternatively the format can be - * (rowID<tab>columnID1<space>columnID2 ...) where presence indicates a score of 1. 
This is the default - * output format for [[org.apache.mahout.drivers.DRMWriteBooleanSchema]] - */ -class DRMReadBooleanSchema extends Schema( - "rowKeyDelim" -> "\t", - "columnIdStrengthDelim" -> ":", - "elementDelim" -> " ", - "omitScore" -> true) - -/** Default Schema for typical text delimited drm file write where the score of a element is omitted. - * The presence of a element means the score = 1, the absence means a score of 0. - * This tells the writer to output DRM lines of the form - * (rowID<tab>columnID1<space>columnID2...) - */ -class DRMWriteBooleanSchema extends Schema( - "rowKeyDelim" -> "\t", - "columnIdStrengthDelim" -> ":", - "elementDelim" -> " ", - "omitScore" -> true) - http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala index 274ad98..ba8f7d1 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala @@ -17,6 +17,8 @@ package org.apache.mahout.drivers +import org.apache.mahout.math.indexeddataset.{Writer, Reader, Schema, IndexedDataset} +import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import org.apache.spark.SparkContext._ import org.apache.mahout.math.RandomAccessSparseVector import com.google.common.collect.{BiMap, HashBiMap} @@ -24,23 +26,23 @@ import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm} import org.apache.mahout.sparkbindings._ import scala.collection.JavaConversions._ -/** Extends Reader trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader function for reading text delimited files as described in the 
[[org.apache.mahout.drivers.Schema]] +/** Extends Reader trait to supply the [[IndexedDatasetSpark]] as the type read and a reader function for reading text delimited files as described in the [[Schema]] */ -trait TDIndexedDatasetReader extends Reader[IndexedDataset]{ +trait TDIndexedDatasetReader extends Reader[IndexedDatasetSpark]{ /** Read in text delimited elements from all URIs in the comma delimited source String and return * the DRM of all elements updating the dictionaries for row and column dictionaries. If there is * no strength value in the element, assume it's presence means a strength of 1. * * @param mc context for the Spark job * @param readSchema describes the delimiters and positions of values in the text delimited file. - * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]] + * @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]] * @return */ protected def elementReader( mc: DistributedContext, readSchema: Schema, source: String, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDataset = { + existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDatasetSpark = { try { val delimiter = readSchema("delim").asInstanceOf[String] val rowIDColumn = readSchema("rowIDColumn").asInstanceOf[Int] @@ -105,7 +107,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{ // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns) - IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary) + new IndexedDatasetSpark(drmInteractions, rowIDDictionary, columnIDDictionary) } catch { case cce: ClassCastException => { @@ -120,14 +122,14 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{ * * @param mc context for the Spark job * @param readSchema describes the delimiters and positions of values in the 
text delimited file. - * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]] + * @param source comma delimited URIs of text files to be read into the [[IndexedDatasetSpark]] * @return */ protected def drmReader( mc: DistributedContext, readSchema: Schema, source: String, - existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDataset = { + existingRowIDs: BiMap[String, Int] = HashBiMap.create()): IndexedDatasetSpark = { try { val rowKeyDelim = readSchema("rowKeyDelim").asInstanceOf[String] val columnIdStrengthDelim = readSchema("columnIdStrengthDelim").asInstanceOf[String] @@ -193,7 +195,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{ // wrap the DrmRdd and a CheckpointedDrm, which can be used anywhere a DrmLike[Int] is needed val drmInteractions = drmWrap[Int](indexedInteractions, numRows, numColumns) - IndexedDataset(drmInteractions, rowIDDictionary, columnIDDictionary) + new IndexedDatasetSpark(drmInteractions, rowIDDictionary, columnIDDictionary) } catch { case cce: ClassCastException => { @@ -203,7 +205,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{ } } - // this creates a BiMap from an ID collection. The ID points to an ordinal int + // this creates a BiMap from an ID collection. 
The ID points to an ordinal int // which is used internal to Mahout as the row or column ID // todo: this is a non-distributed process in an otherwise distributed reader and the BiMap is a // non-rdd based object--this will limit the size of the dataset to ones where the dictionaries fit @@ -218,7 +220,7 @@ trait TDIndexedDatasetReader extends Reader[IndexedDataset]{ } } -trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{ +trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{ private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2} @@ -226,13 +228,13 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDataset]{ * * @param mc context for the Spark job * @param writeSchema describes the delimiters and positions of values in the output text delimited file. - * @param dest directory to write text delimited version of [[org.apache.mahout.drivers.IndexedDataset]] + * @param dest directory to write text delimited version of [[IndexedDatasetSpark]] */ protected def writer( mc: DistributedContext, writeSchema: Schema, dest: String, - indexedDataset: IndexedDataset, + indexedDataset: IndexedDatasetSpark, sort: Boolean = true): Unit = { try { val rowKeyDelim = writeSchema("rowKeyDelim").asInstanceOf[String] @@ -295,7 +297,7 @@ trait TDIndexedDatasetReaderWriter extends TDIndexedDatasetReader with TDIndexed /** Reads text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor. * @param readSchema describes the delimiters and position of values in the text delimited file to be read. * @param mc Spark context for reading files - * @note The source is supplied by Reader#readElementsFrom . + * @note The source is supplied to Reader#readElementsFrom . 
* */ class TextDelimitedIndexedDatasetReader(val readSchema: Schema) (implicit val mc: DistributedContext) extends TDIndexedDatasetReader @@ -303,7 +305,7 @@ class TextDelimitedIndexedDatasetReader(val readSchema: Schema) /** Writes text delimited files into an IndexedDataset. Classes are needed to supply trait params in their constructor. * @param writeSchema describes the delimiters and position of values in the text delimited file(s) written. * @param mc Spark context for reading files - * @note the destination is supplied by Writer#writeTo trait method + * @note the destination is supplied to Writer#writeTo * */ class TextDelimitedIndexedDatasetWriter(val writeSchema: Schema, val sort: Boolean = true)(implicit val mc: DistributedContext) extends TDIndexedDatasetWriter @@ -316,41 +318,3 @@ class TextDelimitedIndexedDatasetReaderWriter(val readSchema: Schema, val writeS (implicit val mc: DistributedContext) extends TDIndexedDatasetReaderWriter -/** A version of IndexedDataset that has it's own writeTo method from a Writer trait. This is an alternative to creating - * a Writer based stand-alone class for writing. Consider it experimental allowing similar semantics to drm.writeDrm(). - * Experimental because it's not clear that it is simpler or more intuitive and since IndexedDatasetTextDelimitedWriteables - * are probably short lived in terms of lines of code so complexity may be moot. - * @param matrix the data - * @param rowIDs bi-directional dictionary for rows of external IDs to internal ordinal Mahout IDs. - * @param columnIDs bi-directional dictionary for columns of external IDs to internal ordinal Mahout IDs. - * @param writeSchema contains params for the schema/format or the written text delimited file. - * @param mc mahout distributed context (DistributedContext) may be implicitly defined. 
- * */ -class IndexedDatasetTextDelimitedWriteable( - matrix: CheckpointedDrm[Int], - rowIDs: BiMap[String,Int], - columnIDs: BiMap[String,Int], - val writeSchema: Schema, - val sort: Boolean = true) - (implicit val mc: DistributedContext) - extends IndexedDataset(matrix, rowIDs, columnIDs) with TDIndexedDatasetWriter { - - override def writeTo(collection: IndexedDataset = this, dest: String): Unit = { - super.writeTo(this, dest) - } -} - -/** - * Companion object for the case class [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] primarily - * used to get a secondary constructor for - * making one [[org.apache.mahout.drivers.IndexedDatasetTextDelimitedWriteable]] from another. Used when you have a - * factory like [[org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader]] - * {{{ - * val id = IndexedDatasetTextDelimitedWriteable(indexedDatasetReader.readElementsFrom(source)) - * }}} - */ - -object IndexedDatasetTextDelimitedWriteable { - /** Secondary constructor for [[org.apache.mahout.drivers.IndexedDataset]] */ - def apply(id2: IndexedDatasetTextDelimitedWriteable, sort: Boolean = true) = new IndexedDatasetTextDelimitedWriteable(id2.matrix, id2.rowIDs, id2.columnIDs, id2.writeSchema, id2.sort)(id2.mc) -} http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala index 08b2c34..c0d36c6 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala @@ -17,10 +17,11 @@ package org.apache.mahout.sparkbindings -import java.io.IOException - +import com.google.common.collect.{BiMap, HashBiMap} +import 
org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader import org.apache.mahout.math._ -import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetReadSchema, Schema, DefaultIndexedDatasetElementReadSchema} +import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import scalabindings._ import RLikeOps._ import org.apache.mahout.math.drm.logical._ @@ -250,6 +251,37 @@ object SparkEngine extends DistributedEngine { } } + /** + * reads an IndexedDatasetSpark from default text delimited files + * @param src a comma separated list of URIs to read from + * @param schema how the text file is formatted + * @return + */ + def indexedDatasetDFSRead(src: String, + schema: Schema = DefaultIndexedDatasetReadSchema, + existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + (implicit sc: DistributedContext): + IndexedDatasetSpark = { + val reader = new TextDelimitedIndexedDatasetReader(schema)(sc) + val ids = reader.readDRMFrom(src, existingRowIDs) + ids + } + + /** + * reads an IndexedDatasetSpark from default text delimited files + * @param src a comma separated list of URIs to read from + * @param schema how the text file is formatted + * @return + */ + def indexedDatasetDFSReadElements(src: String, + schema: Schema = DefaultIndexedDatasetElementReadSchema, + existingRowIDs: BiMap[String, Int] = HashBiMap.create()) + (implicit sc: DistributedContext): + IndexedDatasetSpark = { + val reader = new TextDelimitedIndexedDatasetReader(schema)(sc) + val ids = reader.readElementsFrom(src, existingRowIDs) + ids + } } http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala index b753f6f..e5a2b2a 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala @@ -197,4 +197,16 @@ class CheckpointedDrmSpark[K: ClassTag]( protected def computeNNonZero = cache().rdd.map(_._2.getNumNonZeroElements.toLong).sum().toLong + /** Changes the number of rows in the DRM without actually touching the underlying data. Used to + * redimension a DRM after it has been created, which implies some blank, non-existent rows. + * @param n new row dimension + * @return + */ + override def newRowCardinality(n: Int): CheckpointedDrm[K] = { + assert(n > -1) + assert( n >= nrow) + val newCheckpointedDrm = drmWrap[K](rdd, n, ncol) + newCheckpointedDrm + } + } http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala new file mode 100644 index 0000000..d3aa0a8 --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.sparkbindings.indexeddataset + +import com.google.common.collect.BiMap +import org.apache.mahout.drivers.TextDelimitedIndexedDatasetWriter +import org.apache.mahout.math.drm.{DistributedContext, CheckpointedDrm} +import org.apache.mahout.math.indexeddataset +import org.apache.mahout.math.indexeddataset.{DefaultIndexedDatasetWriteSchema, Reader, Schema, IndexedDataset} + +/** + * Spark implementation of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] providing the Spark specific + * dfsWrite method + */ +class IndexedDatasetSpark(val matrix: CheckpointedDrm[Int], val rowIDs: BiMap[String,Int], val columnIDs: BiMap[String,Int]) + extends IndexedDataset { + + /** Secondary constructor enabling immutability */ + def this(id2: IndexedDatasetSpark){ + this(id2.matrix, id2.rowIDs, id2.columnIDs) + } + + /** Factory method used to create this extending class when the interface of + * [[org.apache.mahout.math.indexeddataset.IndexedDataset]] is all that is known. 
*/ + override def create(matrix: CheckpointedDrm[Int], rowIDs: BiMap[String,Int], columnIDs: BiMap[String,Int]): + IndexedDatasetSpark = { + new IndexedDatasetSpark(matrix, rowIDs, columnIDs) + } + + /** implements the core method [[indexeddataset.IndexedDataset#dfsWrite]]*/ + override def dfsWrite(dest: String, schema: Schema = DefaultIndexedDatasetWriteSchema) + (implicit sc: DistributedContext): + Unit = { + val writer = new TextDelimitedIndexedDatasetWriter(schema)(sc) + writer.writeTo(this, dest) + } +} + http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala index 6ca2a14..c441716 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala @@ -17,6 +17,9 @@ package org.apache.mahout +import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader +import org.apache.mahout.math.indexeddataset.Schema +import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import org.apache.spark.{SparkConf, SparkContext} import java.io._ import scala.collection.mutable.ArrayBuffer @@ -224,5 +227,4 @@ package object sparkbindings { mcjars } - } http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala deleted file mode 100644 index 29c7b84..0000000 --- a/spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed 
to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.cf - -import org.apache.mahout.math.cf.SimilarityAnalysis -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.scalabindings.{MatrixOps, _} -import org.apache.mahout.sparkbindings.test.DistributedSparkSuite -import org.apache.mahout.test.MahoutSuite -import org.scalatest.FunSuite - -/* values -A = -1 1 0 0 0 -0 0 1 1 0 -0 0 0 0 1 -1 0 0 1 0 - -B = -1 1 1 1 0 -1 1 1 1 0 -0 0 1 0 1 -1 1 0 1 0 - */ - -class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with DistributedSparkSuite { - - // correct cooccurrence with LLR - final val matrixLLRCoocAtAControl = dense( - (0.0, 1.7260924347106847, 0.0, 0.0, 0.0), - (1.7260924347106847, 0.0, 0.0, 0.0, 0.0), - (0.0, 0.0, 0.0, 1.7260924347106847, 0.0), - (0.0, 0.0, 1.7260924347106847, 0.0, 0.0), - (0.0, 0.0, 0.0, 0.0, 0.0)) - - // correct cross-cooccurrence with LLR - final val m = dense( - (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0), - (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0), - (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897), - (1.7260924347106847, 
0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0), - (0.0, 0.0, 0.0, 0.0, 4.498681156950466)) - - final val matrixLLRCoocBtAControl = dense( - (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0), - (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0), - (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0), - (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0), - (0.0, 0.0, 0.6795961471815897, 0.0, 4.498681156950466)) - - - test("cooccurrence [A'A], [B'A] boolbean data using LLR") { - val a = dense( - (1, 1, 0, 0, 0), - (0, 0, 1, 1, 0), - (0, 0, 0, 0, 1), - (1, 0, 0, 1, 0)) - - val b = dense( - (1, 1, 1, 1, 0), - (1, 1, 1, 1, 0), - (0, 0, 1, 0, 1), - (1, 1, 0, 1, 0)) - - val drmA = drmParallelize(m = a, numPartitions = 2) - val drmB = drmParallelize(m = b, numPartitions = 2) - - //self similarity - val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, randomSeed = 1, drmBs = Array(drmB)) - val matrixSelfCooc = drmCooc(0).checkpoint().collect - val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl) - var n = (new MatrixOps(m = diffMatrix)).norm - n should be < 1E-10 - - //cross similarity - val matrixCrossCooc = drmCooc(1).checkpoint().collect - val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl) - n = (new MatrixOps(m = diff2Matrix)).norm - n should be < 1E-10 - - } - - test("cooccurrence [A'A], [B'A] double data using LLR") { - val a = dense( - (100000.0D, 1.0D, 0.0D, 0.0D, 0.0D), - ( 0.0D, 0.0D, 10.0D, 1.0D, 0.0D), - ( 0.0D, 0.0D, 0.0D, 0.0D, 1000.0D), - ( 1.0D, 0.0D, 0.0D, 10.0D, 0.0D)) - - val b = dense( - (10000.0D, 100.0D, 1000.0D, 1.0D, 0.0D), - ( 10.0D, 1.0D, 10000000.0D, 10.0D, 0.0D), - ( 0.0D, 0.0D, 1000.0D, 0.0D, 100.0D), - ( 100.0D, 1.0D, 0.0D, 100000.0D, 0.0D)) - - val drmA = drmParallelize(m = a, numPartitions = 2) - val drmB = drmParallelize(m = b, numPartitions = 2) - - 
//self similarity - val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB)) - val matrixSelfCooc = drmCooc(0).checkpoint().collect - val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl) - var n = (new MatrixOps(m = diffMatrix)).norm - n should be < 1E-10 - - //cross similarity - val matrixCrossCooc = drmCooc(1).checkpoint().collect - val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl) - n = (new MatrixOps(m = diff2Matrix)).norm - n should be < 1E-10 - } - - test("cooccurrence [A'A], [B'A] integer data using LLR") { - val a = dense( - ( 1000, 10, 0, 0, 0), - ( 0, 0, -10000, 10, 0), - ( 0, 0, 0, 0, 100), - (10000, 0, 0, 1000, 0)) - - val b = dense( - ( 100, 1000, -10000, 10000, 0), - (10000, 1000, 100, 10, 0), - ( 0, 0, 10, 0, -100), - ( 10, 100, 0, 1000, 0)) - - val drmA = drmParallelize(m = a, numPartitions = 2) - val drmB = drmParallelize(m = b, numPartitions = 2) - - //self similarity - val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB)) - //var cp = drmSelfCooc(0).checkpoint() - //cp.writeDRM("/tmp/cooc-spark/")//to get values written - val matrixSelfCooc = drmCooc(0).checkpoint().collect - val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl) - var n = (new MatrixOps(m = diffMatrix)).norm - n should be < 1E-10 - - //cross similarity - val matrixCrossCooc = drmCooc(1).checkpoint().collect - val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtAControl) - n = (new MatrixOps(m = diff2Matrix)).norm - n should be < 1E-10 - } - - test("cooccurrence two matrices with different number of columns"){ - val a = dense( - (1, 1, 0, 0, 0), - (0, 0, 1, 1, 0), - (0, 0, 0, 0, 1), - (1, 0, 0, 1, 0)) - - val b = dense( - (0, 1, 1, 0), - (1, 1, 1, 0), - (0, 0, 1, 0), - (1, 1, 0, 1)) - - val matrixLLRCoocBtANonSymmetric = dense( - (0.0, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847), - (0.0, 0.6795961471815897, 0.6795961471815897, 0.0), - (1.7260924347106847, 
0.6795961471815897, 0.6795961471815897, 0.0), - (5.545177444479561, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847), - (0.0, 0.0, 0.6795961471815897, 0.0)) - - val drmA = drmParallelize(m = a, numPartitions = 2) - val drmB = drmParallelize(m = b, numPartitions = 2) - - //self similarity - val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB)) - val matrixSelfCooc = drmCooc(0).checkpoint().collect - val diffMatrix = matrixSelfCooc.minus(matrixLLRCoocAtAControl) - var n = (new MatrixOps(m = diffMatrix)).norm - n should be < 1E-10 - - //cross similarity - val matrixCrossCooc = drmCooc(1).checkpoint().collect - val diff2Matrix = matrixCrossCooc.minus(matrixLLRCoocBtANonSymmetric) - n = (new MatrixOps(m = diff2Matrix)).norm - - //cooccurrence without LLR is just a A'B - //val inCoreAtB = a.transpose().times(b) - //val bp = 0 - } - - test("LLR calc") { - val A = dense( - (1, 1, 0, 0, 0), - (0, 0, 1, 1, 0), - (0, 0, 0, 0, 1), - (1, 0, 0, 1, 0)) - - val AtA = A.transpose().times(A) - - /* AtA is: - 0 => {0:2.0,1:1.0,3:1.0} - 1 => {0:1.0,1:1.0} - 2 => {2:1.0,3:1.0} - 3 => {0:1.0,2:1.0,3:2.0} - 4 => {4:1.0} - - val AtAd = dense( - (2, 1, 0, 1, 0), - (1, 1, 0, 0, 0), - (0, 0, 1, 1, 0), - (1, 0, 1, 2, 0), - (0, 0, 0, 0, 1)) - - val AtAdNoSelfCooc = dense( - (0, 1, 0, 1, 0), - (1, 0, 0, 0, 0), - (0, 0, 0, 1, 0), - (1, 0, 1, 0, 0), - (0, 0, 0, 0, 0)) - - */ - - //item (1,0) - val numInteractionsWithAandB = 1L - val numInteractionsWithA = 1L - val numInteractionsWithB = 2L - val numInteractions = 6l - - val llr = SimilarityAnalysis.logLikelihoodRatio(numInteractionsWithA, numInteractionsWithB, numInteractionsWithAandB, numInteractions) - - assert(llr == 2.6341457841558764) // value calculated by hadoop itemsimilairty - } - - test("downsampling by number per row") { - val a = dense( - (1, 1, 1, 1, 0), - (1, 1, 1, 1, 1), - (0, 0, 0, 0, 1), - (1, 1, 0, 1, 0)) - val drmA: DrmLike[Int] = drmParallelize(m = a, numPartitions = 2) - - val 
downSampledDrm = SimilarityAnalysis.sampleDownAndBinarize(drmA, 0xdeadbeef, 4) - //count non-zero values, should be == 7 - var numValues = 0 - val m = downSampledDrm.collect - val it = m.iterator() - while (it.hasNext) { - val v = it.next().vector() - val nonZeroIt = v.nonZeroes().iterator() - while (nonZeroIt.hasNext) { - numValues += 1 - nonZeroIt.next() - } - } - - assert(numValues == 8) //Don't change the random seed or this may fail. - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/666d314f/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala new file mode 100644 index 0000000..0b3b3eb --- /dev/null +++ b/spark/src/test/scala/org/apache/mahout/cf/SimilarityAnalysisSuite.scala @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.mahout.cf
+
+import org.apache.mahout.math.cf.SimilarityAnalysis
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings.{MatrixOps, _}
+import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
+import org.apache.mahout.test.MahoutSuite
+import org.scalatest.FunSuite
+
+/* values
+A =
+1 1 0 0 0
+0 0 1 1 0
+0 0 0 0 1
+1 0 0 1 0
+
+B =
+1 1 1 1 0
+1 1 1 1 0
+0 0 1 0 1
+1 1 0 1 0
+ */
+
+// todo: add tests for the IndexedDataset cooccurrence methods
+
+/**
+ * Checks [[SimilarityAnalysis]] against hard-coded control values:
+ *  - `cooccurrences` for the self (index 0) and cross (index 1) results,
+ *  - `logLikelihoodRatio` for a single cell,
+ *  - `sampleDownAndBinarize` by counting the surviving non-zero elements.
+ *
+ * Input matrices are built in-core with `dense` and distributed with `drmParallelize`
+ * over two partitions; results are brought back with `collect` before comparison.
+ */
+class SimilarityAnalysisSuite extends FunSuite with MahoutSuite with DistributedSparkSuite {
+
+  // correct cooccurrence with LLR
+  final val matrixLLRCoocAtAControl = dense(
+    (0.0,                1.7260924347106847, 0.0,                0.0,                0.0),
+    (1.7260924347106847, 0.0,                0.0,                0.0,                0.0),
+    (0.0,                0.0,                0.0,                1.7260924347106847, 0.0),
+    (0.0,                0.0,                1.7260924347106847, 0.0,                0.0),
+    (0.0,                0.0,                0.0,                0.0,                0.0))
+
+  // correct cross-cooccurrence with LLR
+  // NOTE(review): no test in this suite reads `m`; it is kept only so the members of the
+  // class stay unchanged.
+  final val m = dense(
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.6795961471815897),
+    (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 1.7260924347106847, 0.0),
+    (0.0,                0.0,                0.0,                0.0,                4.498681156950466))
+
+  // control for the cross result (drmCooc(1)) of the 5-column A / 5-column B tests
+  final val matrixLLRCoocBtAControl = dense(
+    (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+    (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+    (0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.6795961471815897, 0.0),
+    (1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 1.7260924347106847, 0.0),
+    (0.0,                0.0,                0.6795961471815897, 0.0,                4.498681156950466))
+
+  /**
+   * Asserts that `actual` equals `expected` up to a norm of 1E-10.
+   *
+   * The element-wise difference is taken with `minus` and measured with `MatrixOps.norm`,
+   * which is the comparison every cooccurrence test in this suite performs.
+   *
+   * @param actual   in-core matrix produced by the code under test
+   * @param expected control matrix with the same dimensions
+   */
+  private def assertMatchesControl(
+      actual: org.apache.mahout.math.Matrix,
+      expected: org.apache.mahout.math.Matrix): Unit = {
+    val diffMatrix = actual.minus(expected)
+    val norm = new MatrixOps(m = diffMatrix).norm
+    norm should be < 1E-10
+  }
+
+  test("cooccurrence [A'A], [B'A] boolbean data using LLR") {
+    val a = dense(
+      (1, 1, 0, 0, 0),
+      (0, 0, 1, 1, 0),
+      (0, 0, 0, 0, 1),
+      (1, 0, 0, 1, 0))
+
+    val b = dense(
+      (1, 1, 1, 1, 0),
+      (1, 1, 1, 1, 0),
+      (0, 0, 1, 0, 1),
+      (1, 1, 0, 1, 0))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, randomSeed = 1, drmBs = Array(drmB))
+
+    //self similarity
+    assertMatchesControl(drmCooc(0).checkpoint().collect, matrixLLRCoocAtAControl)
+
+    //cross similarity
+    assertMatchesControl(drmCooc(1).checkpoint().collect, matrixLLRCoocBtAControl)
+  }
+
+  // Same non-zero pattern as the boolean test but with arbitrary magnitudes; the same
+  // control matrices are expected.
+  test("cooccurrence [A'A], [B'A] double data using LLR") {
+    val a = dense(
+      (100000.0D, 1.0D,  0.0D,  0.0D,     0.0D),
+      (     0.0D, 0.0D, 10.0D,  1.0D,     0.0D),
+      (     0.0D, 0.0D,  0.0D,  0.0D,  1000.0D),
+      (     1.0D, 0.0D,  0.0D, 10.0D,     0.0D))
+
+    val b = dense(
+      (10000.0D, 100.0D,     1000.0D,      1.0D,   0.0D),
+      (   10.0D,   1.0D, 10000000.0D,     10.0D,   0.0D),
+      (    0.0D,   0.0D,     1000.0D,      0.0D, 100.0D),
+      (  100.0D,   1.0D,        0.0D, 100000.0D,   0.0D))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+
+    //self similarity
+    assertMatchesControl(drmCooc(0).checkpoint().collect, matrixLLRCoocAtAControl)
+
+    //cross similarity
+    assertMatchesControl(drmCooc(1).checkpoint().collect, matrixLLRCoocBtAControl)
+  }
+
+  // Same non-zero pattern again, this time with integer magnitudes including negatives;
+  // the same control matrices are expected.
+  test("cooccurrence [A'A], [B'A] integer data using LLR") {
+    val a = dense(
+      ( 1000, 10,      0,    0,   0),
+      (    0,  0, -10000,   10,   0),
+      (    0,  0,      0,    0, 100),
+      (10000,  0,      0, 1000,   0))
+
+    val b = dense(
+      (  100, 1000, -10000, 10000,    0),
+      (10000, 1000,    100,    10,    0),
+      (    0,    0,     10,     0, -100),
+      (   10,  100,      0,  1000,    0))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+
+    //self similarity
+    //var cp = drmSelfCooc(0).checkpoint()
+    //cp.writeDRM("/tmp/cooc-spark/")//to get values written
+    assertMatchesControl(drmCooc(0).checkpoint().collect, matrixLLRCoocAtAControl)
+
+    //cross similarity
+    assertMatchesControl(drmCooc(1).checkpoint().collect, matrixLLRCoocBtAControl)
+  }
+
+  test("cooccurrence two matrices with different number of columns"){
+    val a = dense(
+      (1, 1, 0, 0, 0),
+      (0, 0, 1, 1, 0),
+      (0, 0, 0, 0, 1),
+      (1, 0, 0, 1, 0))
+
+    val b = dense(
+      (0, 1, 1, 0),
+      (1, 1, 1, 0),
+      (0, 0, 1, 0),
+      (1, 1, 0, 1))
+
+    // 5 rows x 4 columns: one row per column of a, one column per column of b
+    val matrixLLRCoocBtANonSymmetric = dense(
+      (0.0,                1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
+      (0.0,                0.6795961471815897, 0.6795961471815897, 0.0),
+      (1.7260924347106847, 0.6795961471815897, 0.6795961471815897, 0.0),
+      (5.545177444479561,  1.7260924347106847, 1.7260924347106847, 1.7260924347106847),
+      (0.0,                0.0,                0.6795961471815897, 0.0))
+
+    val drmA = drmParallelize(m = a, numPartitions = 2)
+    val drmB = drmParallelize(m = b, numPartitions = 2)
+
+    val drmCooc = SimilarityAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
+
+    //self similarity
+    assertMatchesControl(drmCooc(0).checkpoint().collect, matrixLLRCoocAtAControl)
+
+    //cross similarity
+    // The norm of this difference has to be asserted as well, otherwise the non-symmetric
+    // control above is never actually checked.
+    assertMatchesControl(drmCooc(1).checkpoint().collect, matrixLLRCoocBtANonSymmetric)
+
+    //cooccurrence without LLR is just a A'B
+    //val inCoreAtB = a.transpose().times(b)
+    //val bp = 0
+  }
+
+  test("LLR calc") {
+    /* Worked example the counts below are taken from.
+
+    val A = dense(
+      (1, 1, 0, 0, 0),
+      (0, 0, 1, 1, 0),
+      (0, 0, 0, 0, 1),
+      (1, 0, 0, 1, 0))
+
+    val AtA = A.transpose().times(A)
+
+    AtA is:
+      0 =>	{0:2.0,1:1.0,3:1.0}
+      1 =>	{0:1.0,1:1.0}
+      2 =>	{2:1.0,3:1.0}
+      3 =>	{0:1.0,2:1.0,3:2.0}
+      4 =>	{4:1.0}
+
+          val AtAd = dense(
+            (2, 1, 0, 1, 0),
+            (1, 1, 0, 0, 0),
+            (0, 0, 1, 1, 0),
+            (1, 0, 1, 2, 0),
+            (0, 0, 0, 0, 1))
+
+          val AtAdNoSelfCooc = dense(
+            (0, 1, 0, 1, 0),
+            (1, 0, 0, 0, 0),
+            (0, 0, 0, 1, 0),
+            (1, 0, 1, 0, 0),
+            (0, 0, 0, 0, 0))
+
+    */
+
+    //item (1,0)
+    val numInteractionsWithAandB = 1L
+    val numInteractionsWithA = 1L
+    val numInteractionsWithB = 2L
+    val numInteractions = 6L
+
+    val llr = SimilarityAnalysis.logLikelihoodRatio(numInteractionsWithA, numInteractionsWithB, numInteractionsWithAandB, numInteractions)
+
+    // value calculated by hadoop itemsimilarity; compared with the tolerance used by the
+    // other tests instead of exact floating-point equality
+    math.abs(llr - 2.6341457841558764) should be < 1E-10
+  }
+
+  test("downsampling by number per row") {
+    val a = dense(
+      (1, 1, 1, 1, 0),
+      (1, 1, 1, 1, 1),
+      (0, 0, 0, 0, 1),
+      (1, 1, 0, 1, 0))
+    val drmA: DrmLike[Int] = drmParallelize(m = a, numPartitions = 2)
+
+    val downSampledDrm = SimilarityAnalysis.sampleDownAndBinarize(drmA, 0xdeadbeef, 4)
+
+    //count non-zero values, should be == 8
+    var numValues = 0
+    // named so that it does not shadow the class-level control matrix `m`
+    val inCoreDownSampled = downSampledDrm.collect
+    val rowIt = inCoreDownSampled.iterator()
+    while (rowIt.hasNext) {
+      val nonZeroIt = rowIt.next().vector().nonZeroes().iterator()
+      while (nonZeroIt.hasNext) {
+        numValues += 1
+        nonZeroIt.next()
+      }
+    }
+
+    assert(numValues == 8) //Don't change the random seed or this may fail.
+  }
+}
