[
https://issues.apache.org/jira/browse/MAHOUT-1541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14037711#comment-14037711
]
ASF GitHub Bot commented on MAHOUT-1541:
----------------------------------------
Github user dlyubimov commented on a diff in the pull request:
https://github.com/apache/mahout/pull/22#discussion_r13987730
--- Diff: spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala
---
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.drivers
+
+import org.apache.spark.SparkContext._
+import org.apache.mahout.math.RandomAccessSparseVector
+import org.apache.spark.SparkContext
+import com.google.common.collect.{BiMap, HashBiMap}
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.drm.{CheckpointedDrm, DrmLike}
+import org.apache.mahout.sparkbindings._
+
+
+/** Reader trait is abstract in the sense that the reader function must be
defined by an extending trait, which also defines the type to be read.
+ * @tparam T type of object read, usually supplied by an extending trait.
+ * @todo the reader need not create both dictionaries but does at
present. There are cases where one or the other dictionary is never used so
saving the memory for a very large dictionary may be worth the optimization to
specify which dictionaries are created.
+ */
+trait Reader[T]{
+ val mc: SparkContext
+ val readSchema: Schema
+ protected def reader(mc: SparkContext, readSchema: Schema, source:
String): T
+ def readFrom(source: String): T = reader(mc, readSchema, source)
+}
+
+/** Writer trait is abstract in the sense that the writer method must be
supplied by an extending trait, which also defines the type to be written.
+ * @tparam T
+ */
+trait Writer[T]{
+ val mc: SparkContext
+ val writeSchema: Schema
+ protected def writer(mc: SparkContext, writeSchema: Schema, dest:
String, collection: T): Unit
+ def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest,
collection)
+}
+
+/** Extends Reader trait to supply the
[[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader
function for reading text delimited files as described in the
[[org.apache.mahout.drivers.Schema]]
+ */
+trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
+ /** Read in text delimited tuples from all URIs in this comma delimited
source String.
+ *
+ * @param mc context for the Spark job
+ * @param readSchema describes the delimiters and positions of values
in the text delimited file.
+ * @param source comma delimited URIs of text files to be read into the
[[org.apache.mahout.drivers.IndexedDataset]]
+ * @return
+ */
+ protected def reader(mc: SparkContext, readSchema: Schema, source:
String): IndexedDataset = {
+ try {
+ val delimiter = readSchema("delim").asInstanceOf[String]
+ val rowIDPosition = readSchema("rowIDPosition").asInstanceOf[Int]
+ val columnIDPosition =
readSchema("columnIDPosition").asInstanceOf[Int]
+ val filterPosition = readSchema("filterPosition").asInstanceOf[Int]
+ val filterBy = readSchema("filter").asInstanceOf[String]
+ //instance vars must be put into locally scoped vals when used in
closures that are
+ //executed but Spark
+
+ assert(!source.isEmpty, {
+ println(this.getClass.toString + ": has no files to read")
+ throw new IllegalArgumentException
+ })
+
+ var columns = mc.textFile(source).map({ line =>
line.split(delimiter)})
+
+ columns = columns.filter({ tokens => tokens(filterPosition) ==
filterBy})
+
+ val interactions = columns.map({ tokens => tokens(rowIDPosition) ->
tokens(columnIDPosition)})
+
+ interactions.cache()
+
+ val rowIDs = interactions.map({ case (rowID, _) =>
rowID}).distinct().collect()
+ val columnIDs = interactions.map({ case (_, columnID) =>
columnID}).distinct().collect()
+
+ val numRows = rowIDs.size
+ val numColumns = columnIDs.size
+
+ val rowIDDictionary = asOrderedDictionary(rowIDs)
+ val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)
+
+ val columnIDDictionary = asOrderedDictionary(columnIDs)
+ val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
+
+ val indexedInteractions =
+ interactions.map({ case (rowID, columnID) =>
+ val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
+ val columnIndex =
columnIDDictionary_bcast.value.get(columnID).get
+
+ rowIndex -> columnIndex
+ }).groupByKey().map({ case (rowIndex, columnIndexes) =>
--- End diff --
Here and elsewhere. If compound closure, parenthesis could be ommitted
interactions.map {
case (rowID, columnID) =>
val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
...
> Create CLI Driver for Spark Cooccurrence Analysis
> -------------------------------------------------
>
> Key: MAHOUT-1541
> URL: https://issues.apache.org/jira/browse/MAHOUT-1541
> Project: Mahout
> Issue Type: Bug
> Components: CLI
> Reporter: Pat Ferrel
> Assignee: Pat Ferrel
>
> Create a CLI driver to import data in a flexible manner, create an
> IndexedDataset with BiMap ID translation dictionaries, call the Spark
> CooccurrenceAnalysis with the appropriate params, then write output with
> external IDs optionally reattached.
> Ultimately it should be able to read input as the legacy mr does but will
> support reading externally defined IDs and flexible formats. Output will be
> of the legacy format or text files of the user's specification with
> reattached Item IDs.
> Support for legacy formats is a question, users can always use the legacy
> code if they want this. Internal to the IndexedDataset is a Spark DRM so
> pipelining can be accomplished without any writing to an actual file so the
> legacy sequence file output may not be needed.
> Opinions?
--
This message was sent by Atlassian JIRA
(v6.2#6252)