[ https://issues.apache.org/jira/browse/MAHOUT-1615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147387#comment-14147387 ]
ASF GitHub Bot commented on MAHOUT-1615:
----------------------------------------

Github user dlyubimov commented on a diff in the pull request:

https://github.com/apache/mahout/pull/52#discussion_r18015155

--- Diff: spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---
@@ -127,33 +131,41 @@ object SparkEngine extends DistributedEngine {
    */
   def drmFromHDFS (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
-    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)
-        // Get rid of VectorWritable
-        .map(t => (t._1, t._2.get()))
+    // HDFS Paramaters
+    val hConf= new Configuration()
+    val hPath= new Path(path)
+    val fs= FileSystem.get(hConf)
 
-    def getKeyClassTag[K: ClassTag, V](rdd: RDD[(K, V)]) = implicitly[ClassTag[K]]
+    /** Get the Key Class For the Sequence File */
+    def getKeyClassTag[K:ClassTag] = ClassTag(new SequenceFile.Reader(fs, hPath, hConf).getKeyClass)
+    /** Get the Value Class For the Sequence File */
+//    def getValueClassTag[V:ClassTag] = ClassTag(new SequenceFile.Reader(fs, hPath, hConf).getValueClass)
 
-    // Spark should've loaded the type info from the header, right?
-    val keyTag = getKeyClassTag(rdd)
+    // Spark doesn't check the Sequence File Header so we have to.
+    val keyTag = getKeyClassTag
+//    val ct= ClassTag(keyTag.getClass)
+
+    // ClassTag to match on not lost by erasure
+    val ct= ClassTag(classOf[Writable])
 
     val (key2valFunc, val2keyFunc, unwrappedKeyTag) = keyTag match {
-      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[IntWritable]]) => (
+      case ct if (keyTag == implicitly[ClassTag[IntWritable]]) => (
          (v: AnyRef) => v.asInstanceOf[IntWritable].get,
-         (x: Any) => new IntWritable(x.asInstanceOf[Int]),
+         (x: Any) => new Integer(x.asInstanceOf[IntWritable].get),
          implicitly[ClassTag[Int]])
-      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[Text]]) => (
+      case ct if (keyTag == implicitly[ClassTag[Text]]) => (
          (v: AnyRef) => v.asInstanceOf[Text].toString,
          (x: Any) => new Text(x.toString),
          implicitly[ClassTag[String]])
-      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[LongWritable]]) => (
+      case ct if (keyTag == implicitly[ClassTag[LongWritable]]) => (
          (v: AnyRef) => v.asInstanceOf[LongWritable].get,
-         (x: Any) => new LongWritable(x.asInstanceOf[Int]),
+         (x: Any) => new LongWritable(x.asInstanceOf[LongWritable].get),
          implicitly[ClassTag[Long]])
-      case xx: ClassTag[Writable] => (
+      case ct => (
          (v: AnyRef) => v,
--- End diff --

... and since we know that raw Writables are not usable here, because they are reused by the reader, we should probably block this case out completely with an error? There is another piece of information to consider: Spark itself defines implicit conversions from some well-known Writables to their payload types. Perhaps we should support everything that's there, and maybe even figure out a way to automatically apply everything Spark exports, without enumerating cases. I tried to work out how to do that (I remember that much) but never managed it; it may not be possible. Still, we are at least 1.5 years past that point, so perhaps we can revisit this from a fresh perspective. It would require eyeballing Spark's implicit Writable conversions again.
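To make the suggestion above concrete, here is a rough sketch of what eagerly converting the well-known Writable key types, and refusing anything else, could look like. This is not the patch under review; the object and method names are made up for illustration, and only the Hadoop Writable classes are real:

{code}
import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}

// Hypothetical helper, not the Mahout implementation.
object WritableKeyConversions {

  /** Convert a (possibly reused) Writable key into an immutable JVM value. */
  def unwrapKey(w: Writable): Any = w match {
    case i: IntWritable  => i.get          // Int payload
    case l: LongWritable => l.get          // Long payload
    case t: Text         => t.toString     // String payload; copies the bytes
    case other =>
      // Raw Writables are reused by the SequenceFile reader, so keeping them
      // in the RDD would hand every row the last key read; fail fast instead.
      throw new IllegalArgumentException(
        s"Unsupported SequenceFile key type: ${other.getClass.getName}")
  }
}
{code}

Mapping keys through something like this before the RDD is cached or collected would also sidestep the object-reuse symptom described in the issue below, since the converted keys are plain immutable values rather than references to the reader's single Writable instance.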
> SparkEngine drmFromHDFS returning the same Key for all Key,Vec Pairs for
> Text-Keyed SequenceFiles
> -------------------------------------------------------------------------------------------------
>
>                 Key: MAHOUT-1615
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1615
>             Project: Mahout
>          Issue Type: Bug
>            Reporter: Andrew Palumbo
>             Fix For: 1.0
>
>
> When reading seq2sparse output of the form <Text,VectorWritable> from HDFS in the spark-shell,
> SparkEngine's drmFromHDFS method creates RDDs with the same Key for all Pairs:
> {code}
> mahout> val drmTFIDF = drmFromHDFS( path =
>   "/tmp/mahout-work-andy/20news-test-vectors/part-r-00000")
> {code}
> Has keys:
> {...}
> key: /talk.religion.misc/84570
> key: /talk.religion.misc/84570
> key: /talk.religion.misc/84570
> {...}
> for the entire set. This is the last Key in the set.
> The problem can be traced to the first line of drmFromHDFS(...) in SparkEngine.scala:
> {code}
> val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable],
>     minPartitions = parMin)
>     // Get rid of VectorWritable
>     .map(t => (t._1, t._2.get()))
> {code}
> which gives the same key for all t._1.
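For anyone who needs the data loaded correctly in the meantime, a minimal shell-style workaround sketch, under the assumption that the keys really are Text (the helper name and path handling are hypothetical, and the value may also need cloning depending on how VectorWritable is reused): convert each key inside the first map, before anything is cached or collected, so each tuple keeps its own String key instead of a reference to the one Writable the reader keeps mutating.

{code}
import org.apache.hadoop.io.Text
import org.apache.mahout.math.VectorWritable
import org.apache.spark.SparkContext

// Hypothetical workaround, not the committed Mahout fix.
def loadTextKeyedVectors(sc: SparkContext, path: String) =
  sc.sequenceFile(path, classOf[Text], classOf[VectorWritable])
    // Text#toString copies the underlying bytes, so the key survives the
    // reader's reuse of the single Text instance it hands back.
    .map { case (k, v) => (k.toString, v.get()) }
{code}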