Christophe Bismuth commented on SPARK-1018:


I spent a few hours trying to understand why I got *only duplicates of my last 
RDD item* after calling the {{collect}} API.

I'm using Apache Spark 1.6.0 with Avro files stored in HDFS. Here is my 
workaround; I hope it helps:

public JavaRDD<GenericRecord> readJavaRDD(final JavaSparkContext sparkContext,
                                          final Schema schema,
                                          final String path) throws IOException {
    final Configuration configuration = new Configuration();
    configuration.set("avro.schema.input.key", schema.toString());

    // classHelper is a helper from the surrounding class (not shown here)
    final JavaPairRDD<AvroKey<GenericRecord>, NullWritable> rdd =
        sparkContext.newAPIHadoopFile(
            path,
            classHelper.classOf(new AvroKeyInputFormat<GenericRecord>()),
            classHelper.classOf(new AvroKey<GenericRecord>()),
            NullWritable.class,
            configuration);

    return rdd.map(tuple -> tuple._1().datum())
              // see the trick below - a deep copy ain't required
              .map(record -> new GenericData.Record((GenericData.Record) record, false));
}
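
For readers wondering why the copy helps: Hadoop record readers reuse one mutable object for every record, so buffering the returned references (as {{collect}} or {{take}} may do) can leave every slot pointing at the last value read. The sketch below reproduces that effect in plain Java, without Spark or Avro. The names ReusedRecordDemo, MutableRecord and reusingReader are hypothetical and exist only for this illustration; it is an analogy for the behaviour, not Spark's actual code path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ReusedRecordDemo {

    /** Mutable holder, standing in for a Hadoop Writable or an Avro record. */
    static final class MutableRecord {
        String value;

        MutableRecord() { }

        /** Copy constructor: takes a snapshot of another record. */
        MutableRecord(MutableRecord other) {
            this.value = other.value;
        }

        @Override
        public String toString() {
            return value;
        }
    }

    /** Iterator that hands back the same instance on every call, overwritten in place. */
    static Iterator<MutableRecord> reusingReader(List<String> lines) {
        final MutableRecord shared = new MutableRecord();
        final Iterator<String> source = lines.iterator();
        return new Iterator<MutableRecord>() {
            @Override public boolean hasNext() { return source.hasNext(); }
            @Override public MutableRecord next() {
                shared.value = source.next(); // overwrite the shared instance
                return shared;
            }
        };
    }

    /** Buffers references only: every element ends up being the shared instance. */
    public static List<String> collectWithoutCopy(List<String> lines) {
        List<MutableRecord> buffered = new ArrayList<>();
        reusingReader(lines).forEachRemaining(buffered::add);
        return render(buffered);
    }

    /** Copies each record before buffering it, as the workaround above does. */
    public static List<String> collectWithCopy(List<String> lines) {
        List<MutableRecord> buffered = new ArrayList<>();
        reusingReader(lines).forEachRemaining(r -> buffered.add(new MutableRecord(r)));
        return render(buffered);
    }

    private static List<String> render(List<MutableRecord> records) {
        List<String> out = new ArrayList<>();
        for (MutableRecord r : records) {
            out.add(r.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "b", "c");
        System.out.println(collectWithoutCopy(lines)); // [c, c, c]
        System.out.println(collectWithCopy(lines));    // [a, b, c]
    }
}
```

A shallow copy is enough in this sketch because the reader replaces the field rather than mutating the object it refers to, which is presumably why the workaround passes {{false}} for the deep-copy flag.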


> take and collect don't work on HadoopRDD
> ----------------------------------------
>                 Key: SPARK-1018
>                 URL: https://issues.apache.org/jira/browse/SPARK-1018
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 0.8.1
>            Reporter: Diana Carroll
>              Labels: hadoop
> I am reading a simple text file using hadoopFile as follows:
> var hrdd1 = 
> sc.hadoopFile("/home/training/testdata.txt",classOf[TextInputFormat], 
> classOf[LongWritable], classOf[Text])
> Testing using this simple text file:
> 001 this is line 1
> 002 this is line two
> 003 yet another line
> the data read is correct, as I can tell using println 
> scala> hrdd1.foreach(println):
> (0,001 this is line 1)
> (19,002 this is line two)
> (40,003 yet another line)
> But neither collect nor take work properly.  Take prints out the key (byte 
> offset) of the last (non-existent) line repeatedly:
> scala> hrdd1.take(4):
> res146: Array[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] 
> = Array((61,), (61,), (61,))
> Collect is even worse: it complains:
> java.io.NotSerializableException: org.apache.hadoop.io.LongWritable at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> The problem appears to be the LongWritable in both cases, because if I map to 
> a new RDD, converting the values from Text objects to strings, it works:
> scala> hrdd1.map(pair => (pair._1.toString,pair._2.toString)).take(4)
> res148: Array[(java.lang.String, java.lang.String)] = Array((0,001 this is 
> line 1), (19,002 this is line two), (40,003 yet another line))
> Seems to me either rdd.collect and rdd.take ought to handle non-serializable 
> types gracefully, or hadoopFile should return a mapped RDD that converts the 
> hadoop types into the appropriate serializable Java objects.  (Or at very 
> least the docs for the API should indicate that the usual RDD methods don't 
> work on HadoopRDDs).
> BTW, this behavior is the same for both the old and new API versions of 
> hadoopFile.  It also is the same whether the file is from HDFS or a plain old 
> text file.
