[ 
https://issues.apache.org/jira/browse/MAHOUT-1615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14159319#comment-14159319
 ] 

ASF GitHub Bot commented on MAHOUT-1615:
----------------------------------------

Github user dlyubimov commented on a diff in the pull request:

    https://github.com/apache/mahout/pull/52#discussion_r18432276
  
    --- Diff: spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala 
---
    @@ -0,0 +1,56 @@
    +package org.apache.mahout.common
    +
    +import scala.reflect.ClassTag
    +import org.apache.hadoop.io._
    +import java.util.Arrays
    +
    +class DrmMetadata(
    +
    +    /** Writable  key type as a sub-type of Writable */
    +    val keyTypeWritable: Class[_],
    +
    +    /** Value writable type, as a sub-type of Writable */
    +    val valueTypeWritable: Class[_]
    +
    +    ) {
    +
    +  import DrmMetadata._
    +
    +  val (
    +
    +      /** Actual drm key class tag once converted out of writable */
    +      keyClassTag: ClassTag[_],
    +
    +      /** Conversion from Writable to value type of the DRM key */
    +      keyW2ValFunc: ((Writable) => Any)
    +
    +      ) = keyTypeWritable match {
    +    case cz if (cz == classOf[IntWritable]) => ClassTag.Int -> w2int _
    +    case cz if (cz == classOf[LongWritable]) => ClassTag.Long -> w2long _
    +    case cz if (cz == classOf[DoubleWritable]) => ClassTag.Double -> 
w2double _
    +    case cz if (cz == classOf[FloatWritable]) => ClassTag.Float -> w2float 
_
    +    case cz if (cz == classOf[Text]) => ClassTag(classOf[String]) -> 
w2string _
    +    case cz if (cz == classOf[BooleanWritable]) => 
ClassTag(classOf[Boolean]) -> w2bool _
    +    case cz if (cz == classOf[BytesWritable]) => 
ClassTag(classOf[Array[Byte]]) -> w2bytes _
    +    case _ => throw new IllegalArgumentException(s"Unsupported DRM key 
type:${keyTypeWritable.getName}")
    +  }
    +
    +}
    +
    +object DrmMetadata {
    +
    +  private[common] def w2int(w: Writable) = 
w.asInstanceOf[IntWritable].get()
    +
    +  private[common] def w2long(w: Writable) = 
w.asInstanceOf[LongWritable].get()
    +
    +  private[common] def w2double(w: Writable) = 
w.asInstanceOf[DoubleWritable].get()
    +
    +  private[common] def w2float(w: Writable) = 
w.asInstanceOf[FloatWritable].get()
    +
    +  private[common] def w2string(w: Writable) = 
w.asInstanceOf[Text].toString()
    +
    +  private[common] def w2bool(w: Writable) = 
w.asInstanceOf[BooleanWritable].get()
    +
    +  private[common] def w2bytes(w: Writable) = 
Arrays.copyOf(w.asInstanceOf[BytesWritable].getBytes(),
    +                                                           
w.asInstanceOf[BytesWritable].getLength())
    +}
    --- End diff --
    
    yes this looks right now


> SparkEngine drmFromHDFS returning the same Key for all Key,Vec Pairs for 
> Text-Keyed SequenceFiles
> -------------------------------------------------------------------------------------------------
>
>                 Key: MAHOUT-1615
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1615
>             Project: Mahout
>          Issue Type: Bug
>            Reporter: Andrew Palumbo
>            Assignee: Andrew Palumbo
>             Fix For: 1.0
>
>
> When reading in seq2sparse output from HDFS in the spark-shell of form 
> <Text,VectorWriteable>  SparkEngine's drmFromHDFS method is creating rdds 
> with the same Key for all Pairs:  
> {code}
> mahout> val drmTFIDF= drmFromHDFS( path = 
> "/tmp/mahout-work-andy/20news-test-vectors/part-r-00000")
> {code}
> Has keys:
> {...} 
>     key: /talk.religion.misc/84570
>     key: /talk.religion.misc/84570
>     key: /talk.religion.misc/84570
> {...}
> for the entire set.  This is the last Key in the set.
> The problem can be traced to the first line of drmFromHDFS(...) in 
> SparkEngine.scala: 
> {code}
>  val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], 
> minPartitions = parMin)
>         // Get rid of VectorWritable
>         .map(t => (t._1, t._2.get()))
> {code}
> which gives the same key for all t._1.
>   



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to