RE: Problem understanding spark word count execution

2015-10-02 Thread java8964
No problem.

From the mapper side, Spark is very similar to MapReduce; but on the reducer's fetching side, MR uses a sort-merge while Spark uses a HashMap. So keep in mind that data arrives automatically sorted on the reducer side in MR, but not in Spark.

Spark's performance comes from:
- Caching ability, and smart arrangement of tasks into stages.
- Intermediate data between stages is never stored in HDFS, only on local disk. In MR, the intermediate data between one MR job and the next is stored in HDFS.
- Spark runs tasks as threads, instead of heavyweight processes as MR does.

Without caching, in my experience, Spark gets about 2x to 5x better than an MR job, depending on the job logic. If the data volume is small, Spark does even better, because starting a process is far more expensive than starting a thread in that case.

I didn't see your Spark script, so my guess is that you are using "rdd.collect()", which transfers the final result to the driver and dumps it to the console.
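For illustration, a minimal word count sketch (paths are placeholders, not your actual script) showing the difference between collecting to the driver and writing out from the reducers:

  val counts = sc.textFile("/wc/input")   // placeholder path
    .flatMap(_.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)

  counts.collect().foreach(println)   // ships the whole result back to the driver
  counts.saveAsTextFile("/wc/out")    // each reducer writes its partition to HDFS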
Yong

Re: Problem understanding spark word count execution

2015-10-02 Thread Kartik Mathur
Thanks Yong,

my script is pretty straightforward:

sc.textFile("/wc/input").flatMap(line => line.split(" "))
  .map(word => (word,1)).reduceByKey(_+_)
  .saveAsTextFile("/wc/out2")   // both paths are HDFS
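(For what it's worth, with the default partitioning, saveAsTextFile should leave one part file per reduce partition under the output directory; an illustrative layout, not taken from an actual run:)

  /wc/out2/part-00000
  /wc/out2/part-00001
  ...
  /wc/out2/_SUCCESS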

So if every shuffle write always goes to disk, what is the meaning of these properties?

spark.shuffle.memoryFraction
spark.shuffle.spill

Thanks,
Kartik

RE: Problem understanding spark word count execution

2015-10-02 Thread java8964
These parameters in fact control behavior on the reduce side, as in your word count example.

The partitions will be fetched by the reducer assigned to them. The reducer fetches the corresponding partitions from the different mappers' outputs, and it processes the data according to your logic while fetching them. This memory area is a sort-buffer area, and depending on "spark.shuffle.spill" (memory only vs. memory + disk), Spark uses different implementations (AppendOnlyMap and ExternalAppendOnlyMap) to handle it.

spark.shuffle.memoryFraction controls what fraction of the Java heap to use as this sort-buffer area.
You can find more information in this Jira:
https://issues.apache.org/jira/browse/SPARK-2045
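As a sketch of where these knobs are set (Spark 1.x properties; the values shown are just the defaults, for illustration only):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("wordcount")                     // hypothetical app name
    .set("spark.shuffle.spill", "true")          // false keeps the memory-only AppendOnlyMap
    .set("spark.shuffle.memoryFraction", "0.2")  // fraction of the heap for the sort-buffer area
  val sc = new SparkContext(conf)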
Yong


Re: Problem understanding spark word count execution

2015-10-02 Thread Kartik Mathur
Thanks Yong,

That was the explanation I was looking for; however, I have one doubt. You write: "Imagine that you have 2 mappers to read the data; then each mapper will generate its (word, count) tuple output in segments. Spark always writes that output to a local file (in fact, one file with different segments representing the different partitions)." If this is true, then Spark is very similar to Hadoop MapReduce (disk I/O between phases); with so many I/Os after each stage, how does Spark achieve the performance it does compared to MapReduce? Another doubt: you write "The 2000 bytes sent to the driver is the final output aggregated on the reducers' end, and merged back to the driver." Which part of our word count code takes care of this? And yes, there are only 273 distinct words in the text, so that's not a surprise.

Thanks again,

Hope to get a reply.

--Kartik

Re: Problem understanding spark word count execution

2015-10-01 Thread Kartik Mathur
Thanks Nicolae,
So in my case all executors are sending results back to the driver, and "shuffle is just sending out the textFile to distribute the partitions"; could you please elaborate on this? What exactly is in this file?

Re: Problem understanding spark word count execution

2015-10-01 Thread Kartik Mathur
Hi Nicolae,
Thanks for the reply. To further clarify things:

sc.textFile is reading from HDFS; now, shouldn't the file be read in such a way that EACH executor works only on the locally available part of the file? In this case it's a ~4.64 GB file and the block size is 256 MB, so approximately 19 partitions will be created and each task will run on one partition (which is what I am seeing in the stage logs). I also assume it will read the file so that each executor ends up with exactly the same amount of data, so there shouldn't be any shuffling during the read, at least.

During stage 0 (sc.textFile -> flatMap -> Map), for every task this is the output I am seeing:

Index | ID | Attempt | Status  | Locality Level | Executor ID / Host | Launch Time         | Duration | GC Time | Input Size / Records        | Shuffle Write Size / Records
0     | 44 | 0       | SUCCESS | NODE_LOCAL     | 1 / 10.35.244.10   | 2015/09/29 13:57:24 | 14 s     | 0.2 s   | 256.0 MB (hadoop) / 2595161 | 2.7 KB / 273
1     | 45 | 0       | SUCCESS | NODE_LOCAL     | 2 / 10.35.244.11   | 2015/09/29 13:57:24 | 13 s     | 0.2 s   | 256.0 MB (hadoop) / 2595176 | 2.7 KB / 273

(The Write Time and Errors columns were empty.)
I have the following questions:

1) What exactly is the 2.7 KB of shuffle write?
2) Is this 2.7 KB of shuffle write local to that executor?
3) In the executors' logs I am seeing 2000 bytes of results sent to the driver; if instead this number were much, much greater than 2000 bytes, such that it did not fit in the executor's memory, would the shuffle write increase?
4) For word count, 256 MB of data is a substantial amount of text; how come the result for this stage is only 2000 bytes?! It should send every word with its respective count; for a 256 MB input this result should be much bigger.

I hope I am clear this time.

Hope to get a reply,

Thanks
Kartik

Re: Problem understanding spark word count execution

2015-10-01 Thread Nicolae Marasoiu
Hi,

So you say "sc.textFile -> flatMap -> Map".

My understanding is like this:
First, a number of partitions is determined, p of them. You can give a hint on this.
Then the nodes which will load the p partitions are chosen, that is n nodes (where n <= p).

Relatively at the same time or not, the n nodes start opening different sections of the file, the physical equivalent of the partitions: for instance, in HDFS they would do an open and a seek, I guess, and just read from the stream there, converting to whatever the InputFormat dictates.

The shuffle can only be the part where a node opens an HDFS file, for instance, but does not have a local replica of the blocks it needs to read (those pertaining to its assigned partitions). So it needs to pick them up from remote nodes which do have replicas of that data.

After blocks are read into memory, flatMap and map are local computations generating new RDDs, and in the end the result is sent to the driver (whatever the terminating computation does on the RDD, like the result of reduce, or the side effects of rdd.foreach, etc.).
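For example, the stage boundary is visible straight from the RDD lineage (a sketch using the word-count pipeline from this thread; the path is a placeholder):

  val counts = sc.textFile("/wc/input")
    .flatMap(_.split(" "))        // narrow dependency: computed on the local node
    .map(word => (word, 1))       // narrow dependency
    .reduceByKey(_ + _)           // wide dependency: this is where the shuffle happens

  println(counts.toDebugString)   // the printed lineage shows the ShuffledRDD boundary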

Maybe you can share more of your context if still unclear.
I just made assumptions to give clarity on a similar thing.

Nicu

RE: Problem understanding spark word count execution

2015-10-01 Thread java8964
I am not sure about the original explanation of the shuffle write.

In the word count example, the shuffle is needed, as Spark has to group by the word (reduceByKey is more accurate here). Imagine that you have 2 mappers to read the data; then each mapper will generate its (word, count) tuple output in segments. Spark always writes that output to a local file (in fact, one file with different segments representing the different partitions).

As you can imagine, the output of these segments will be small, as it contains only (word, count of word) tuples. After each mapper generates this segmented file for the different partitions, the reducers will fetch the partitions belonging to them.
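To make that concrete, here is roughly what each map task does before the shuffle; a hand-rolled sketch of reduceByKey's map-side combine, not Spark's actual implementation:

  val mapSideCombined = sc.textFile("/wc/input").mapPartitions { lines =>
    // one (word, runningCount) entry per distinct word in this partition, so the
    // shuffle write contains only one small record per unique word
    val counts = scala.collection.mutable.HashMap.empty[String, Long]
    for (line <- lines; word <- line.split(" "))
      counts(word) = counts.getOrElse(word, 0L) + 1L
    counts.iterator
  }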
In your job summary, if your source is a text file, then your data corresponds to 2 HDFS blocks, or 2 x 256 MB. There are 2 tasks concurrently reading these 2 partitions, with about 2.5M lines of data in each partition being processed.

Each partition's output is a shuffle write of 2.7 KB, which is the size of the segment file generated, corresponding to all the unique words and their counts for that partition. So the size is reasonable, at least to me: 2.7 KB over 273 records is roughly 10 bytes per (word, count) record.

The interesting number is the 273 shuffle write records. I am not 100% sure of its meaning. Does it mean that this partition has 273 unique words from these 2.5M lines of data? That is kind of low, but I don't have another explanation for it. If your final output shows hundreds of unique words, then that is it.

The 2000 bytes sent to the driver is the final output aggregated on the reducers' end, and merged back to the driver.
Yong

Re: Problem understanding spark word count execution

2015-09-30 Thread Nicolae Marasoiu

Hi,

2 - The end results are sent back to the driver; the shuffles are transmissions of intermediate results between nodes, at the "->" boundaries, which are all intermediate transformations.

More precisely, since flatMap and map are narrow dependencies, meaning they can usually happen on the local node, I bet the shuffle is just sending out the textFile to a few nodes to distribute the partitions.



From: Kartik Mathur 
Sent: Thursday, October 1, 2015 12:42 AM
To: user
Subject: Problem understanding spark word count execution

Hi All,

I tried running Spark word count and I have a couple of questions.

I am analyzing stage 0, i.e.
 sc.textFile -> flatMap -> Map (word count example)

1) In the stage logs under the Application UI details, for every task I am seeing a shuffle write of 2.7 KB. Question: how can I know where this task wrote to? Like, how many bytes to which executor?

2) In the executor's log, when I look at the same task, it says 2000 bytes of result were sent to the driver. My question is: if the results were sent directly to the driver, what is this shuffle write?

Thanks,
Kartik