As I understand it, it's down to how Hadoop FileInputFormats work, and
questions of mutability. If you were to read a file from Hadoop via an
InputFormat with a simple Java program, the InputFormat's RecordReader
creates a single, mutable instance of the Writable key class and a single,
mutable instance of the Writable value. When you loop through the records,
the RecordReader reuses those Writable instances by deserializing the
underlying bytes from the file into the instances one record at a time. It's
up to the application to copy whatever it needs out of those Writable
instances into something else if it wants to hold on to the data beyond the
current record.
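Outside Spark it looks roughly like this. A minimal sketch (in Scala, to match
the code later in this thread) using Hadoop's SequenceFile.Reader; the path and
the IntWritable/Text types are made up for illustration:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, SequenceFile, Text}

val conf = new Configuration()
val reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path("/tmp/data.seq")))

// One key instance and one value instance serve the whole file.
val key = new IntWritable()
val value = new Text()

try {
  while (reader.next(key, value)) {
    // next() deserializes each record's bytes into the SAME two objects,
    // so copy out whatever you need before the next call overwrites them.
    println(s"${key.get()} -> ${value.toString}")
  }
} finally {
  reader.close()
}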

 

It's exactly the same when using Spark as the application. When you create
an RDD of Writable objects by calling .sequenceFile, the RDD contains many
identical references to the exact same object instance. Therefore, when
Spark does a sort, cache or shuffle, (I believe) it optimizes on the
assumption that objects are immutable. That is why the map step is necessary:
it creates a distinct, immutable copy of each record.
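Concretely, that copy is the extra map (a sketch only; the path is made up):

import org.apache.hadoop.io.{IntWritable, Text}

val rdd = sc.sequenceFile("/tmp/data.seq", classOf[IntWritable], classOf[Text])

// Copy out of the reused Writables into immutable Scala values...
val safe = rdd.map { case (k, v) => (k.get(), v.toString) }
safe.cache()   // each cached record is now a distinct, immutable object

// ...or, if you want to keep the Hadoop types, clone the Writables themselves.
val cloned = rdd.map { case (k, v) => (new IntWritable(k.get()), new Text(v)) }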

 

This is just an issue with the Hadoop InputFormat class. If you can write a
way of reading files from HDFS that doesn't use Hadoop's classes (though I'm
not sure why you would; a simple map is far easier), then the map would
potentially be unnecessary.

 

Andrew

 

From: jeff saremi [mailto:jeffsar...@hotmail.com] 
Sent: 19 November 2015 05:35
To: Jeff Zhang <zjf...@gmail.com>
Cc: dev@spark.apache.org
Subject: RE: SequenceFile and object reuse

 

You're not seeing the issue because you perform one additional "map". 

map { case (k, v) => (k.get(), v.toString) }

Instead of being able to use the Text objects as they were read, you had to
create a new tuple out of the String contents of each Text.
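For illustration, here is roughly how the reuse bites if you hold on to the
records without that map (a sketch; the path is made up):

import org.apache.hadoop.io.{IntWritable, Text}

val raw = sc.sequenceFile("/tmp/data.seq", classOf[IntWritable], classOf[Text])

// Buffer a partition's records without copying them first: the tuples are new,
// but they all point at the same reused Writable instances, so every value
// collapses to whatever was deserialized last.
raw.mapPartitions { iter =>
  val buffered = iter.toArray
  Iterator(buffered.map(_._2.toString).distinct.size)
}.collect().foreach(n => println(s"distinct values seen in a partition: $n"))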

That is exactly why I asked this question.

Why do we have to do this additional processing? What is the rationale behind
it?
Are there other ways of reading a Hadoop file (or any other file) that would
not incur this additional step?

thanks

 

 

  _____  

Date: Thu, 19 Nov 2015 13:26:31 +0800
Subject: Re: FW: SequenceFile and object reuse
From: zjf...@gmail.com
To: jeffsar...@hotmail.com
CC: dev@spark.apache.org

Could this be an issue with the raw data? I use the following simple code and
don't hit the issue you mentioned. Otherwise, it would be better to share your
code.

 

val rdd = sc.sequenceFile("/Users/hadoop/Temp/Seq", classOf[IntWritable], classOf[Text])
rdd.map { case (k, v) => (k.get(), v.toString) }.collect() foreach println

 

On Thu, Nov 19, 2015 at 12:04 PM, jeff saremi <jeffsar...@hotmail.com> wrote:

I sent this to the user forum. I got no responses. Could someone here please
help? thanks
jeff

 


  _____  


From: jeffsar...@hotmail.com
To: u...@spark.apache.org
Subject: SequenceFile and object reuse
Date: Fri, 13 Nov 2015 13:29:58 -0500

 

So we tried reading a SequenceFile in Spark and realized that all our records
ended up being identical.
Then one of us found this:

Note: Because Hadoop's RecordReader class re-uses the same Writable object
for each record, directly caching the returned RDD or directly passing it to
an aggregation or shuffle operation will create many references to the same
object. If you plan to directly cache, sort, or aggregate Hadoop writable
objects, you should first copy them using a map function.

Is there anyone who can shed some light on this bizarre behavior and the
decisions behind it?
I would also like to know whether anyone has been able to read a binary file
without incurring the additional map() suggested above. What format did you
use?

thanks

Jeff





 

-- 

Best Regards

Jeff Zhang
