[
https://issues.apache.org/jira/browse/MAHOUT-1615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14159319#comment-14159319
]
ASF GitHub Bot commented on MAHOUT-1615:
----------------------------------------
Github user dlyubimov commented on a diff in the pull request:
https://github.com/apache/mahout/pull/52#discussion_r18432276
--- Diff: spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala
---
@@ -0,0 +1,56 @@
+package org.apache.mahout.common
+
+import scala.reflect.ClassTag
+import org.apache.hadoop.io._
+import java.util.Arrays
+
+class DrmMetadata(
+ /*
+  * Holds the Writable key and value classes of a DRM and, from the key class,
+  * derives (a) a ClassTag for the unwrapped key type and (b) a function that
+  * converts a key Writable into that unwrapped type.
+  *
+  * Construction throws IllegalArgumentException when keyTypeWritable is not one
+  * of the Writable classes listed in the match below. valueTypeWritable is only
+  * stored; nothing in this class inspects it.
+  */
+ /** Writable key type as a sub-type of Writable */
+ val keyTypeWritable: Class[_],
+
+ /** Value writable type, as a sub-type of Writable */
+ val valueTypeWritable: Class[_]
+
+ ) {
+ /* Brings the w2* converter functions of the companion object into scope. */
+ import DrmMetadata._
+ /*
+  * Both members are initialised together from a single match on keyTypeWritable.
+  * Classes are compared with ==, so only these exact Writable classes match;
+  * a subclass of one of them falls through to the failing default case.
+  */
+ val (
+
+ /** Actual drm key class tag once converted out of writable */
+ keyClassTag: ClassTag[_],
+
+ /** Conversion from Writable to value type of the DRM key */
+ keyW2ValFunc: ((Writable) => Any)
+
+ ) = keyTypeWritable match {
+ case cz if (cz == classOf[IntWritable]) => ClassTag.Int -> w2int _
+ case cz if (cz == classOf[LongWritable]) => ClassTag.Long -> w2long _
+ case cz if (cz == classOf[DoubleWritable]) => ClassTag.Double ->
w2double _
+ case cz if (cz == classOf[FloatWritable]) => ClassTag.Float -> w2float
_
+ case cz if (cz == classOf[Text]) => ClassTag(classOf[String]) ->
w2string _
+ case cz if (cz == classOf[BooleanWritable]) =>
ClassTag(classOf[Boolean]) -> w2bool _
+ case cz if (cz == classOf[BytesWritable]) =>
ClassTag(classOf[Array[Byte]]) -> w2bytes _
+ case _ => throw new IllegalArgumentException(s"Unsupported DRM key
type:${keyTypeWritable.getName}")
+ }
+
+}
+
+object DrmMetadata {
+ /*
+  * Converters from a key Writable to a plain value, one per supported key class.
+  * Each casts its argument to one specific Writable class with asInstanceOf, so
+  * passing any other Writable fails with ClassCastException. All of them are
+  * private[common], i.e. visible only inside the enclosing `common` package.
+  */
+ private[common] def w2int(w: Writable) =
w.asInstanceOf[IntWritable].get()
+
+ private[common] def w2long(w: Writable) =
w.asInstanceOf[LongWritable].get()
+
+ private[common] def w2double(w: Writable) =
w.asInstanceOf[DoubleWritable].get()
+
+ private[common] def w2float(w: Writable) =
w.asInstanceOf[FloatWritable].get()
+ /*
+  * Returns the key as a String via Text.toString().
+  * NOTE(review): presumably producing a String per record is what stops all
+  * rows from sharing one reused Text instance (MAHOUT-1615) - confirm.
+  */
+ private[common] def w2string(w: Writable) =
w.asInstanceOf[Text].toString()
+
+ private[common] def w2bool(w: Writable) =
w.asInstanceOf[BooleanWritable].get()
+ /*
+  * Returns a new array holding only the first getLength() bytes of getBytes().
+  * NOTE(review): presumably the array returned by getBytes() can be longer than
+  * the valid data - confirm against the Hadoop BytesWritable documentation.
+  */
+ private[common] def w2bytes(w: Writable) =
Arrays.copyOf(w.asInstanceOf[BytesWritable].getBytes(),
+
w.asInstanceOf[BytesWritable].getLength())
+}
--- End diff --
yes this looks right now
> SparkEngine drmFromHDFS returning the same Key for all Key,Vec Pairs for
> Text-Keyed SequenceFiles
> -------------------------------------------------------------------------------------------------
>
> Key: MAHOUT-1615
> URL: https://issues.apache.org/jira/browse/MAHOUT-1615
> Project: Mahout
> Issue Type: Bug
> Reporter: Andrew Palumbo
> Assignee: Andrew Palumbo
> Fix For: 1.0
>
>
> When reading in seq2sparse output from HDFS in the spark-shell of the form
> <Text,VectorWritable>, SparkEngine's drmFromHDFS method creates RDDs
> with the same key for all pairs:
> {code}
> mahout> val drmTFIDF= drmFromHDFS( path =
> "/tmp/mahout-work-andy/20news-test-vectors/part-r-00000")
> {code}
> Has keys:
> {...}
> key: /talk.religion.misc/84570
> key: /talk.religion.misc/84570
> key: /talk.religion.misc/84570
> {...}
> for the entire set. This is the last Key in the set.
> The problem can be traced to the first line of drmFromHDFS(...) in
> SparkEngine.scala:
> {code}
> val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable],
> minPartitions = parMin)
> // Get rid of VectorWritable
> .map(t => (t._1, t._2.get()))
> {code}
> which gives the same key for all t._1.
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)