[ 
https://issues.apache.org/jira/browse/MAHOUT-1541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14037711#comment-14037711
 ] 

ASF GitHub Bot commented on MAHOUT-1541:
----------------------------------------

Github user dlyubimov commented on a diff in the pull request:

    https://github.com/apache/mahout/pull/22#discussion_r13987730
  
    --- Diff: spark/src/main/scala/org/apache/mahout/drivers/ReaderWriter.scala ---
    @@ -0,0 +1,222 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.mahout.drivers
    +
    +import org.apache.spark.SparkContext._
    +import org.apache.mahout.math.RandomAccessSparseVector
    +import org.apache.spark.SparkContext
    +import com.google.common.collect.{BiMap, HashBiMap}
    +import scala.collection.JavaConversions._
    +import org.apache.mahout.math.drm.{CheckpointedDrm, DrmLike}
    +import org.apache.mahout.sparkbindings._
    +
    +
    +/** Reader trait is abstract in the sense that the reader function must be defined by an extending trait, which also defines the type to be read.
    +  * @tparam T type of object read, usually supplied by an extending trait.
    +  * @todo the reader need not create both dictionaries but does at present. There are cases where one or the other dictionary is never used so saving the memory for a very large dictionary may be worth the optimization to specify which dictionaries are created.
    +  */
    +trait Reader[T]{
    +  val mc: SparkContext
    +  val readSchema: Schema
    +  protected def reader(mc: SparkContext, readSchema: Schema, source: String): T
    +  def readFrom(source: String): T = reader(mc, readSchema, source)
    +}
    +
    +/** Writer trait is abstract in the sense that the writer method must be supplied by an extending trait, which also defines the type to be written.
    +  * @tparam T type of object written, usually supplied by an extending trait.
    +  */
    +trait Writer[T]{
    +  val mc: SparkContext
    +  val writeSchema: Schema
    +  protected def writer(mc: SparkContext, writeSchema: Schema, dest: String, collection: T): Unit
    +  def writeTo(collection: T, dest: String) = writer(mc, writeSchema, dest, collection)
    +}
    +
    +/** Extends Reader trait to supply the [[org.apache.mahout.drivers.IndexedDataset]] as the type read and a reader function for reading text delimited files as described in the [[org.apache.mahout.drivers.Schema]]
    +  */
    +trait TDIndexedDatasetReader extends Reader[IndexedDataset]{
    +  /** Read in text delimited tuples from all URIs in this comma delimited source String.
    +    * 
    +    * @param mc context for the Spark job
    +    * @param readSchema describes the delimiters and positions of values in the text delimited file.
    +    * @param source comma delimited URIs of text files to be read into the [[org.apache.mahout.drivers.IndexedDataset]]
    +    * @return an [[org.apache.mahout.drivers.IndexedDataset]] containing the data read from the source files
    +    */
    +  protected def reader(mc: SparkContext, readSchema: Schema, source: String): IndexedDataset = {
    +    try {
    +      val delimiter = readSchema("delim").asInstanceOf[String]
    +      val rowIDPosition = readSchema("rowIDPosition").asInstanceOf[Int]
    +      val columnIDPosition = readSchema("columnIDPosition").asInstanceOf[Int]
    +      val filterPosition = readSchema("filterPosition").asInstanceOf[Int]
    +      val filterBy = readSchema("filter").asInstanceOf[String]
    +      //instance vars must be put into locally scoped vals when used in closures that are
    +      //executed by Spark
    +
    +      assert(!source.isEmpty, {
    +        println(this.getClass.toString + ": has no files to read")
    +        throw new IllegalArgumentException
    +      })
    +
    +      var columns = mc.textFile(source).map({ line => line.split(delimiter)})
    +
    +      columns = columns.filter({ tokens => tokens(filterPosition) == filterBy})
    +
    +      val interactions = columns.map({ tokens => tokens(rowIDPosition) -> tokens(columnIDPosition)})
    +
    +      interactions.cache()
    +
    +      val rowIDs = interactions.map({ case (rowID, _) => rowID}).distinct().collect()
    +      val columnIDs = interactions.map({ case (_, columnID) => columnID}).distinct().collect()
    +
    +      val numRows = rowIDs.size
    +      val numColumns = columnIDs.size
    +
    +      val rowIDDictionary = asOrderedDictionary(rowIDs)
    +      val rowIDDictionary_bcast = mc.broadcast(rowIDDictionary)
    +
    +      val columnIDDictionary = asOrderedDictionary(columnIDs)
    +      val columnIDDictionary_bcast = mc.broadcast(columnIDDictionary)
    +
    +      val indexedInteractions =
    +        interactions.map({ case (rowID, columnID) =>
    +          val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
    +          val columnIndex = columnIDDictionary_bcast.value.get(columnID).get
    +
    +          rowIndex -> columnIndex
    +        }).groupByKey().map({ case (rowIndex, columnIndexes) =>
    --- End diff --
    
    Here and elsewhere. For a compound closure, the parentheses could be omitted:
    
    
         interactions.map { 
              case (rowID, columnID) =>
                 val rowIndex = rowIDDictionary_bcast.value.get(rowID).get
    ...
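
    For illustration only, here is a minimal, self-contained sketch of the two equivalent forms, using a plain Scala Seq in place of the Spark RDD so it compiles on its own (the object and value names are illustrative, not from the patch):

         object ClosureStyleExample extends App {
           val interactions = Seq("u1" -> "item1", "u2" -> "item2")

           // parentheses wrapping the compound closure, as in the patch
           val withParens = interactions.map({ case (rowID, columnID) =>
             val pair = rowID -> columnID
             pair
           })

           // equivalent form suggested above: braces only, no parentheses
           val withoutParens = interactions.map { case (rowID, columnID) =>
             val pair = rowID -> columnID
             pair
           }

           assert(withParens == withoutParens)
         }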



> Create CLI Driver for Spark Cooccurrence Analysis
> -------------------------------------------------
>
>                 Key: MAHOUT-1541
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1541
>             Project: Mahout
>          Issue Type: Bug
>          Components: CLI
>            Reporter: Pat Ferrel
>            Assignee: Pat Ferrel
>
> Create a CLI driver to import data in a flexible manner, create an 
> IndexedDataset with BiMap ID translation dictionaries, call the Spark 
> CooccurrenceAnalysis with the appropriate params, then write output with 
> external IDs optionally reattached.
> Ultimately it should be able to read input as the legacy MapReduce code does, 
> but it will also support externally defined IDs and flexible formats. Output 
> will be in the legacy format or in text files of the user's specification with 
> reattached item IDs.
> Support for legacy formats is an open question; users can always use the 
> legacy code if they want it. Internal to the IndexedDataset is a Spark DRM, so 
> pipelining can be accomplished without writing to an actual file and the 
> legacy sequence file output may not be needed.
> Opinions?
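
For context, a minimal sketch of the BiMap ID translation idea described above, using the Guava HashBiMap that the patch already imports. The asOrderedDictionary helper shown here is only a guess at how such a dictionary might be built, not the actual IndexedDataset API:

    import com.google.common.collect.{BiMap, HashBiMap}

    object BiMapDictionaryExample extends App {
      // hypothetical helper: map each external string ID to an ordinal integer index
      def asOrderedDictionary(ids: Seq[String]): BiMap[String, Int] = {
        val dictionary = HashBiMap.create[String, Int]()
        ids.zipWithIndex.foreach { case (id, index) => dictionary.put(id, index) }
        dictionary
      }

      val rowIDDictionary = asOrderedDictionary(Seq("user1", "user2", "user3"))

      // forward lookup: external ID -> internal matrix index
      println(rowIDDictionary.get("user2"))        // 1
      // inverse lookup: internal index -> external ID, used to reattach IDs on output
      println(rowIDDictionary.inverse().get(1))    // user2
    }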



--
This message was sent by Atlassian JIRA
(v6.2#6252)
