Ah, I see where I went wrong here. What's reused here are the shuffle map output files themselves, rather than the file paths. No new shuffle map output files are generated for the 2nd job. Thanks! I really need to walk through the Spark core code again :)

Cheng

On 3/25/15 9:31 PM, Shao, Saisai wrote:
Hi Cheng,

I think your scenario is handled correctly by Spark's shuffle mechanism and will not cause shuffle file name conflicts.

From my understanding, the code snippet you mentioned builds a single RDD graph and just runs it twice. These two jobs will generate 3 stages: a map stage and a collect stage for the first job, but only a collect stage for the second job (its map stage is the same as in the previous job, so it is skipped). The two jobs therefore produce only one copy of the shuffle files, written during the first job, and each job fetches that same shuffle data. So name conflicts will not occur, since both jobs rely on the same ShuffledRDD.

I think only a shuffle write, which generates shuffle files, has a chance of hitting name conflicts; reading the same shuffle data multiple times is fine, as the code snippet shows.
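
To make that concrete, here is a minimal sketch (assuming a live SparkContext sc; the build() helper is purely illustrative) contrasting reuse of one DAG with rebuilding an equivalent DAG:

    // Reusing one DAG: the shuffle output is written once and read twice.
    def build() = sc.parallelize(Array(1, 2, 3)).map(i => i -> i).reduceByKey(_ + _)

    val dag = build()
    dag.collect()     // job 1: map stage runs, shuffle files are written
    dag.collect()     // job 2: map stage skipped, the same shuffle output is fetched

    // Rebuilding the DAG creates a new ShuffleDependency, so a new shuffle ID
    // is allocated and a second, independent set of shuffle files is written.
    build().collect()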

Thanks
Jerry



-----Original Message-----
From: Cheng Lian [mailto:lian.cs....@gmail.com]
Sent: Wednesday, March 25, 2015 7:40 PM
To: Saisai Shao; Kannan Rajah
Cc: dev@spark.apache.org
Subject: Re: Understanding shuffle file name conflicts

Hi Jerry & Josh

It has been a while since I last looked into the Spark core shuffle code, so maybe I'm wrong here. But the shuffle ID is created along with the ShuffleDependency, which is part of the RDD DAG. So if we submit multiple jobs over the same RDD DAG, I think the shuffle IDs in these jobs should be identical. For example:

    val dag = sc.parallelize(Array(1, 2, 3)).map(i => i -> i).reduceByKey(_ + _)
    dag.collect()
    dag.collect()

From the debug log output, I did see duplicated shuffle IDs in both jobs. Something like this:

    # Job 1
    15/03/25 19:26:34 DEBUG BlockStoreShuffleFetcher: Fetching outputs for shuffle 0, reduce 2

    # Job 2
    15/03/25 19:26:36 DEBUG BlockStoreShuffleFetcher: Fetching outputs for shuffle 0, reduce 5

So it’s also possible that some shuffle output files get reused in different 
jobs. But Kannan, did you submit separate jobs over the same RDD DAG as I did 
above? If not, I’d agree with Jerry and Josh.

(Did I miss something here?)

Cheng

On 3/25/15 10:35 AM, Saisai Shao wrote:

Hi Kannan,

As far as I know, the shuffle ID in ShuffleDependency is monotonically increasing. So even if you run the same job twice, the shuffle dependencies, and hence the shuffle IDs, are different. The shuffle file name, which is composed of (shuffleId + mapId + reduceId), therefore changes, so there is no name conflict even within the same directory.
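
Here is a standalone sketch of that ID allocation (the real counter lives inside SparkContext and is consumed by ShuffleDependency's constructor; this model only mimics the behavior):

    import java.util.concurrent.atomic.AtomicInteger

    // Each new ShuffleDependency asks for the next shuffle ID, so two
    // dependency instances can never share an ID within one application.
    object ShuffleIdAllocator {
      private val nextShuffleId = new AtomicInteger(0)
      def newShuffleId(): Int = nextShuffleId.getAndIncrement()
    }

    val id1 = ShuffleIdAllocator.newShuffleId() // 0
    val id2 = ShuffleIdAllocator.newShuffleId() // 1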

Thanks
Jerry


2015-03-25 1:56 GMT+08:00 Kannan Rajah <kra...@maprtech.com>:

I am working on SPARK-1529. I ran into an issue with my change, where the same shuffle file was being reused across 2 jobs. Please note this only happens when I use a hard-coded location for shuffle files, say "/tmp". It does not happen with the normal code path, which uses DiskBlockManager to pick different directories for each run. So I want to understand how DiskBlockManager guarantees that such a conflict will never happen.

Let's say the shuffle block id has a value of shuffle_0_0_0. Then the data file name is shuffle_0_0_0.data and the index file name is shuffle_0_0_0.index. If I run a Spark job twice, one after another, these files get created under different directories because of the hashing logic in DiskBlockManager. But the hash is based on the file name, so how can we be sure that there won't ever be a conflict?
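
For reference, a rough sketch of the placement logic in question (paraphrased from DiskBlockManager.getFile in Spark core; the hash helper and directory layout are simplified, so treat the details as approximate):

    import java.io.File

    // The subdirectory is chosen purely from the file name's hash, so the same
    // name always maps to the same subdirectory given the same local dirs.
    // Isolation between runs comes from the local dirs themselves being created
    // fresh per application, not from this hash.
    def getFile(localDirs: Array[File], subDirsPerLocalDir: Int, filename: String): File = {
      val hash = filename.hashCode & Integer.MAX_VALUE   // non-negative hash
      val dirId = hash % localDirs.length
      val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
      new File(new File(localDirs(dirId), "%02x".format(subDirId)), filename)
    }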

--
Kannan


