Re: strange HashPartitioner behavior in Spark

2016-04-18 Thread Raghava Mutharaju
No. We specify it as a configuration option to spark-submit. Does that
make a difference?
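
For reference, a rough sketch of what we mean (the object and app names below
are placeholders, not our actual code). We follow option A; option B, if
present, takes precedence over the spark-submit flag:

import org.apache.spark.{SparkConf, SparkContext}

object PartitionTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partition-test")
    // Option A: leave the master unset here and pass it on the command line:
    //   spark-submit --class PartitionTest --master spark://<master-host>:7077 app.jar
    // Option B: hard-code it in the program; this overrides the --master flag:
    // conf.setMaster("spark://<master-host>:7077")
    val sc = new SparkContext(conf)
    // ... the partitionBy job discussed in this thread ...
    sc.stop()
  }
}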

Regards,
Raghava.


On Mon, Apr 18, 2016 at 9:56 AM, Sonal Goyal  wrote:

> Are you specifying your spark master in the scala program?
>
> Best Regards,
> Sonal
> Founder, Nube Technologies 
> Reifier at Strata Hadoop World
> 
> Reifier at Spark Summit 2015
>
> On Mon, Apr 18, 2016 at 6:26 PM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Mike,
>>
>> We tried that. This map task is actually part of a larger set of
>> operations. I pointed out this map task since it involves partitionBy() and
>> we always use partitionBy() whenever partition-unaware shuffle operations
>> are performed (such as distinct). We in fact do not notice a change in the
>> distribution after several unrelated stages are executed and a significant
>> time has passed (nearly 10-15 minutes).
>>
>> I agree. We are not looking for partitions to go to specific nodes and
>> nor do we expect a uniform distribution of keys across the cluster. There
>> will be a skew. But it cannot be that all the data is on one node and
>> nothing on the other and no, the keys are not the same. They vary from 1 to
>> around 55000 (integers). What makes this strange is that it seems to work
>> fine on the spark shell (REPL).
>>
>> Regards,
>> Raghava.
>>
>>
>> On Mon, Apr 18, 2016 at 1:14 AM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> A HashPartitioner will indeed partition based on the key, but you
>>> cannot know on *which* node that key will appear. Again, the RDD
>>> partitions will not necessarily be distributed evenly across your
>>> nodes because of the greedy scheduling of the first wave of tasks,
>>> particularly if those tasks have durations less than the initial
>>> executor delay. I recommend you look at your logs to verify if this is
>>> happening to you.
>>>
>>> Mike
>>>
>>> On 4/18/16, Anuj Kumar  wrote:
>>> > Good point Mike +1
>>> >
>>> > On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:
>>> >
>>> >> When submitting a job with spark-submit, I've observed delays (up to
>>> >> 1--2 seconds) for the executors to respond to the driver in order to
>>> >> receive tasks in the first stage. The delay does not persist once the
>>> >> executors have been synchronized.
>>> >>
>>> >> When the tasks are very short, as may be your case (relatively small
>>> >> data and a simple map task like you have described), the 8 tasks in
>>> >> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>> >> the second executor won't have responded to the master before the
>>> >> first 4 tasks on the first executor have completed.
>>> >>
>>> >> To see if this is the cause in your particular case, you could try the
>>> >> following to confirm:
>>> >> 1. Examine the starting times of the tasks alongside their
>>> >> executor
>>> >> 2. Make a "dummy" stage execute before your real stages to
>>> >> synchronize the executors by creating and materializing any random RDD
>>> >> 3. Make the tasks longer, i.e. with some silly computational
>>> >> work.
>>> >>
>>> >> Mike
>>> >>
>>> >>
>>> >> On 4/17/16, Raghava Mutharaju  wrote:
>>> >> > Yes its the same data.
>>> >> >
>>> >> > 1) The number of partitions are the same (8, which is an argument to
>>> >> > the
>>> >> > HashPartitioner). In the first case, these partitions are spread
>>> across
>>> >> > both the worker nodes. In the second case, all the partitions are on
>>> >> > the
>>> >> > same node.
>>> >> > 2) What resources would be of interest here? Scala shell takes the
>>> >> default
>>> >> > parameters since we use "bin/spark-shell --master " to
>>> run
>>> >> the
>>> >> > scala-shell. For the scala program, we do set some configuration
>>> >> > options
>>> >> > such as driver memory (12GB), parallelism is set to 8 and we use
>>> Kryo
>>> >> > serializer.
>>> >> >
>>> >> > We are running this on Azure D3-v2 machines which have 4 cores and
>>> 14GB
>>> >> > RAM.1 executor runs on each worker node. Following configuration
>>> >> > options
>>> >> > are set for the scala program -- perhaps we should move it to the
>>> spark
>>> >> > config file.
>>> >> >
>>> >> > Driver memory and executor memory are set to 12GB
>>> >> > parallelism is set to 8
>>> >> > Kryo serializer is used
>>> >> > Number of retainedJobs and retainedStages has been increased to
>>> check
>>> >> them
>>> >> > in the UI.
>>> >> >
>>> >> > What information regarding Spark Context would be of interest here?
>>> >> >
>>> >> > Regards,
>>> >> > Raghava.
>>> >> >
>>> >> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar 
>>> >> > wrote:
>>> >> >
>>> >> >> If the data file is same then it should have 

Re: strange HashPartitioner behavior in Spark

2016-04-18 Thread Sonal Goyal
Are you specifying your Spark master in the Scala program?
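
One quick way to check what the submitted program actually ended up with (a
sketch; `sc` is your SparkContext):

println(s"master = ${sc.master}")              // the effective master URL
println(sc.getConf.getOption("spark.master"))  // what the conf resolved, if anything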

Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015

On Mon, Apr 18, 2016 at 6:26 PM, Raghava Mutharaju <
m.vijayaragh...@gmail.com> wrote:

> Mike,
>
> We tried that. This map task is actually part of a larger set of
> operations. I pointed out this map task since it involves partitionBy() and
> we always use partitionBy() whenever partition-unaware shuffle operations
> are performed (such as distinct). We in fact do not notice a change in the
> distribution after several unrelated stages are executed and a significant
> time has passed (nearly 10-15 minutes).
>
> I agree. We are not looking for partitions to go to specific nodes and nor
> do we expect a uniform distribution of keys across the cluster. There will
> be a skew. But it cannot be that all the data is on one node and nothing on
> the other and no, the keys are not the same. They vary from 1 to around
> 55000 (integers). What makes this strange is that it seems to work fine on
> the spark shell (REPL).
>
> Regards,
> Raghava.
>
>
> On Mon, Apr 18, 2016 at 1:14 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> A HashPartitioner will indeed partition based on the key, but you
>> cannot know on *which* node that key will appear. Again, the RDD
>> partitions will not necessarily be distributed evenly across your
>> nodes because of the greedy scheduling of the first wave of tasks,
>> particularly if those tasks have durations less than the initial
>> executor delay. I recommend you look at your logs to verify if this is
>> happening to you.
>>
>> Mike
>>
>> On 4/18/16, Anuj Kumar  wrote:
>> > Good point Mike +1
>> >
>> > On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:
>> >
>> >> When submitting a job with spark-submit, I've observed delays (up to
>> >> 1--2 seconds) for the executors to respond to the driver in order to
>> >> receive tasks in the first stage. The delay does not persist once the
>> >> executors have been synchronized.
>> >>
>> >> When the tasks are very short, as may be your case (relatively small
>> >> data and a simple map task like you have described), the 8 tasks in
>> >> your stage may be allocated to only 1 executor in 2 waves of 4, since
>> >> the second executor won't have responded to the master before the
>> >> first 4 tasks on the first executor have completed.
>> >>
>> >> To see if this is the cause in your particular case, you could try the
>> >> following to confirm:
>> >> 1. Examine the starting times of the tasks alongside their
>> >> executor
>> >> 2. Make a "dummy" stage execute before your real stages to
>> >> synchronize the executors by creating and materializing any random RDD
>> >> 3. Make the tasks longer, i.e. with some silly computational
>> >> work.
>> >>
>> >> Mike
>> >>
>> >>
>> >> On 4/17/16, Raghava Mutharaju  wrote:
>> >> > Yes its the same data.
>> >> >
>> >> > 1) The number of partitions are the same (8, which is an argument to
>> >> > the
>> >> > HashPartitioner). In the first case, these partitions are spread
>> across
>> >> > both the worker nodes. In the second case, all the partitions are on
>> >> > the
>> >> > same node.
>> >> > 2) What resources would be of interest here? Scala shell takes the
>> >> default
>> >> > parameters since we use "bin/spark-shell --master " to
>> run
>> >> the
>> >> > scala-shell. For the scala program, we do set some configuration
>> >> > options
>> >> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>> >> > serializer.
>> >> >
>> >> > We are running this on Azure D3-v2 machines which have 4 cores and
>> 14GB
>> >> > RAM.1 executor runs on each worker node. Following configuration
>> >> > options
>> >> > are set for the scala program -- perhaps we should move it to the
>> spark
>> >> > config file.
>> >> >
>> >> > Driver memory and executor memory are set to 12GB
>> >> > parallelism is set to 8
>> >> > Kryo serializer is used
>> >> > Number of retainedJobs and retainedStages has been increased to check
>> >> them
>> >> > in the UI.
>> >> >
>> >> > What information regarding Spark Context would be of interest here?
>> >> >
>> >> > Regards,
>> >> > Raghava.
>> >> >
>> >> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar 
>> >> > wrote:
>> >> >
>> >> >> If the data file is same then it should have similar distribution of
>> >> >> keys.
>> >> >> Few queries-
>> >> >>
>> >> >> 1. Did you compare the number of partitions in both the cases?
>> >> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>> >> >> Program being submitted?
>> >> >>
>> >> >> Also, can you please share the details of Spark Context, 

Re: strange HashPartitioner behavior in Spark

2016-04-18 Thread Raghava Mutharaju
Mike,

We tried that. This map task is actually part of a larger set of
operations. I pointed out this map task since it involves partitionBy(), and
we always use partitionBy() whenever partition-unaware shuffle operations
are performed (such as distinct()). In fact, we do not notice a change in the
distribution even after several unrelated stages have executed and a
significant amount of time has passed (nearly 10-15 minutes).

I agree. We are not looking for partitions to go to specific nodes, nor
do we expect a uniform distribution of keys across the cluster; there will
be a skew. But it cannot be that all the data is on one node and nothing on
the other. And no, the keys are not the same; they vary from 1 to around
55000 (integers). What makes this strange is that it seems to work fine in
the spark shell (REPL).
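
For concreteness, the pattern we use looks roughly like this, sketched against
the RDD u from the original snippet:

import org.apache.spark.HashPartitioner

// distinct() shuffles without preserving the existing partitioner, so we
// re-apply partitionBy() to restore the 8-way hash layout on the result.
val hp = new HashPartitioner(8)
val distinctU = u.distinct().partitionBy(hp).persist()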

Regards,
Raghava.


On Mon, Apr 18, 2016 at 1:14 AM, Mike Hynes <91m...@gmail.com> wrote:

> A HashPartitioner will indeed partition based on the key, but you
> cannot know on *which* node that key will appear. Again, the RDD
> partitions will not necessarily be distributed evenly across your
> nodes because of the greedy scheduling of the first wave of tasks,
> particularly if those tasks have durations less than the initial
> executor delay. I recommend you look at your logs to verify if this is
> happening to you.
>
> Mike
>
> On 4/18/16, Anuj Kumar  wrote:
> > Good point Mike +1
> >
> > On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:
> >
> >> When submitting a job with spark-submit, I've observed delays (up to
> >> 1--2 seconds) for the executors to respond to the driver in order to
> >> receive tasks in the first stage. The delay does not persist once the
> >> executors have been synchronized.
> >>
> >> When the tasks are very short, as may be your case (relatively small
> >> data and a simple map task like you have described), the 8 tasks in
> >> your stage may be allocated to only 1 executor in 2 waves of 4, since
> >> the second executor won't have responded to the master before the
> >> first 4 tasks on the first executor have completed.
> >>
> >> To see if this is the cause in your particular case, you could try the
> >> following to confirm:
> >> 1. Examine the starting times of the tasks alongside their
> >> executor
> >> 2. Make a "dummy" stage execute before your real stages to
> >> synchronize the executors by creating and materializing any random RDD
> >> 3. Make the tasks longer, i.e. with some silly computational
> >> work.
> >>
> >> Mike
> >>
> >>
> >> On 4/17/16, Raghava Mutharaju  wrote:
> >> > Yes its the same data.
> >> >
> >> > 1) The number of partitions are the same (8, which is an argument to
> >> > the
> >> > HashPartitioner). In the first case, these partitions are spread
> across
> >> > both the worker nodes. In the second case, all the partitions are on
> >> > the
> >> > same node.
> >> > 2) What resources would be of interest here? Scala shell takes the
> >> default
> >> > parameters since we use "bin/spark-shell --master " to run
> >> the
> >> > scala-shell. For the scala program, we do set some configuration
> >> > options
> >> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
> >> > serializer.
> >> >
> >> > We are running this on Azure D3-v2 machines which have 4 cores and
> 14GB
> >> > RAM.1 executor runs on each worker node. Following configuration
> >> > options
> >> > are set for the scala program -- perhaps we should move it to the
> spark
> >> > config file.
> >> >
> >> > Driver memory and executor memory are set to 12GB
> >> > parallelism is set to 8
> >> > Kryo serializer is used
> >> > Number of retainedJobs and retainedStages has been increased to check
> >> them
> >> > in the UI.
> >> >
> >> > What information regarding Spark Context would be of interest here?
> >> >
> >> > Regards,
> >> > Raghava.
> >> >
> >> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar 
> >> > wrote:
> >> >
> >> >> If the data file is same then it should have similar distribution of
> >> >> keys.
> >> >> Few queries-
> >> >>
> >> >> 1. Did you compare the number of partitions in both the cases?
> >> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
> >> >> Program being submitted?
> >> >>
> >> >> Also, can you please share the details of Spark Context, Environment
> >> >> and
> >> >> Executors when you run via Scala program?
> >> >>
> >> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
> >> >> m.vijayaragh...@gmail.com> wrote:
> >> >>
> >> >>> Hello All,
> >> >>>
> >> >>> We are using HashPartitioner in the following way on a 3 node
> cluster
> >> (1
> >> >>> master and 2 worker nodes).
> >> >>>
> >> >>> val u =
> >> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
> >> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
> >> >>> (y.toInt,
> >> >>> x.toInt) } }).partitionBy(new
> >> 

Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Mike Hynes
A HashPartitioner will indeed partition based on the key, but you
cannot know on *which* node that key will appear. Again, the RDD
partitions will not necessarily be distributed evenly across your
nodes because of the greedy scheduling of the first wave of tasks,
particularly if those tasks have durations less than the initial
executor delay. I recommend you look at your logs to verify if this is
happening to you.
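
A tiny illustration of the distinction (the key fixes the partition index, not
the node):

import org.apache.spark.HashPartitioner

val p = new HashPartitioner(8)
println(p.getPartition(42))   // 42.hashCode % 8 = 2, always the same index
// Which executor ends up holding partition 2, however, is decided by the
// scheduler at runtime -- e.g. by whichever executors have registered when
// the first wave of tasks is launched.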

Mike

On 4/18/16, Anuj Kumar  wrote:
> Good point Mike +1
>
> On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> When submitting a job with spark-submit, I've observed delays (up to
>> 1--2 seconds) for the executors to respond to the driver in order to
>> receive tasks in the first stage. The delay does not persist once the
>> executors have been synchronized.
>>
>> When the tasks are very short, as may be your case (relatively small
>> data and a simple map task like you have described), the 8 tasks in
>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>> the second executor won't have responded to the master before the
>> first 4 tasks on the first executor have completed.
>>
>> To see if this is the cause in your particular case, you could try the
>> following to confirm:
>> 1. Examine the starting times of the tasks alongside their
>> executor
>> 2. Make a "dummy" stage execute before your real stages to
>> synchronize the executors by creating and materializing any random RDD
>> 3. Make the tasks longer, i.e. with some silly computational
>> work.
>>
>> Mike
>>
>>
>> On 4/17/16, Raghava Mutharaju  wrote:
>> > Yes its the same data.
>> >
>> > 1) The number of partitions are the same (8, which is an argument to
>> > the
>> > HashPartitioner). In the first case, these partitions are spread across
>> > both the worker nodes. In the second case, all the partitions are on
>> > the
>> > same node.
>> > 2) What resources would be of interest here? Scala shell takes the
>> default
>> > parameters since we use "bin/spark-shell --master " to run
>> the
>> > scala-shell. For the scala program, we do set some configuration
>> > options
>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>> > serializer.
>> >
>> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
>> > RAM.1 executor runs on each worker node. Following configuration
>> > options
>> > are set for the scala program -- perhaps we should move it to the spark
>> > config file.
>> >
>> > Driver memory and executor memory are set to 12GB
>> > parallelism is set to 8
>> > Kryo serializer is used
>> > Number of retainedJobs and retainedStages has been increased to check
>> them
>> > in the UI.
>> >
>> > What information regarding Spark Context would be of interest here?
>> >
>> > Regards,
>> > Raghava.
>> >
>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar 
>> > wrote:
>> >
>> >> If the data file is same then it should have similar distribution of
>> >> keys.
>> >> Few queries-
>> >>
>> >> 1. Did you compare the number of partitions in both the cases?
>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>> >> Program being submitted?
>> >>
>> >> Also, can you please share the details of Spark Context, Environment
>> >> and
>> >> Executors when you run via Scala program?
>> >>
>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>> >> m.vijayaragh...@gmail.com> wrote:
>> >>
>> >>> Hello All,
>> >>>
>> >>> We are using HashPartitioner in the following way on a 3 node cluster
>> (1
>> >>> master and 2 worker nodes).
>> >>>
>> >>> val u =
>> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
>> >>> (y.toInt,
>> >>> x.toInt) } }).partitionBy(new
>> HashPartitioner(8)).setName("u").persist()
>> >>>
>> >>> u.count()
>> >>>
>> >>> If we run this from the spark shell, the data (52 MB) is split across
>> >>> the
>> >>> two worker nodes. But if we put this in a scala program and run it,
>> then
>> >>> all the data goes to only one node. We have run it multiple times,
>> >>> but
>> >>> this
>> >>> behavior does not change. This seems strange.
>> >>>
>> >>> Is there some problem with the way we use HashPartitioner?
>> >>>
>> >>> Thanks in advance.
>> >>>
>> >>> Regards,
>> >>> Raghava.
>> >>>
>> >>
>> >>
>> >
>> >
>> > --
>> > Regards,
>> > Raghava
>> > http://raghavam.github.io
>> >
>>
>>
>> --
>> Thanks,
>> Mike
>>
>


-- 
Thanks,
Mike

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Anuj Kumar
Good point Mike +1

On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:

> When submitting a job with spark-submit, I've observed delays (up to
> 1--2 seconds) for the executors to respond to the driver in order to
> receive tasks in the first stage. The delay does not persist once the
> executors have been synchronized.
>
> When the tasks are very short, as may be your case (relatively small
> data and a simple map task like you have described), the 8 tasks in
> your stage may be allocated to only 1 executor in 2 waves of 4, since
> the second executor won't have responded to the master before the
> first 4 tasks on the first executor have completed.
>
> To see if this is the cause in your particular case, you could try the
> following to confirm:
> 1. Examine the starting times of the tasks alongside their executor
> 2. Make a "dummy" stage execute before your real stages to
> synchronize the executors by creating and materializing any random RDD
> 3. Make the tasks longer, i.e. with some silly computational work.
>
> Mike
>
>
> On 4/17/16, Raghava Mutharaju  wrote:
> > Yes its the same data.
> >
> > 1) The number of partitions are the same (8, which is an argument to the
> > HashPartitioner). In the first case, these partitions are spread across
> > both the worker nodes. In the second case, all the partitions are on the
> > same node.
> > 2) What resources would be of interest here? Scala shell takes the
> default
> > parameters since we use "bin/spark-shell --master " to run
> the
> > scala-shell. For the scala program, we do set some configuration options
> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
> > serializer.
> >
> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
> > RAM.1 executor runs on each worker node. Following configuration options
> > are set for the scala program -- perhaps we should move it to the spark
> > config file.
> >
> > Driver memory and executor memory are set to 12GB
> > parallelism is set to 8
> > Kryo serializer is used
> > Number of retainedJobs and retainedStages has been increased to check
> them
> > in the UI.
> >
> > What information regarding Spark Context would be of interest here?
> >
> > Regards,
> > Raghava.
> >
> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar  wrote:
> >
> >> If the data file is same then it should have similar distribution of
> >> keys.
> >> Few queries-
> >>
> >> 1. Did you compare the number of partitions in both the cases?
> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
> >> Program being submitted?
> >>
> >> Also, can you please share the details of Spark Context, Environment and
> >> Executors when you run via Scala program?
> >>
> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
> >> m.vijayaragh...@gmail.com> wrote:
> >>
> >>> Hello All,
> >>>
> >>> We are using HashPartitioner in the following way on a 3 node cluster
> (1
> >>> master and 2 worker nodes).
> >>>
> >>> val u =
> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
> >>> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
> >>> x.toInt) } }).partitionBy(new
> HashPartitioner(8)).setName("u").persist()
> >>>
> >>> u.count()
> >>>
> >>> If we run this from the spark shell, the data (52 MB) is split across
> >>> the
> >>> two worker nodes. But if we put this in a scala program and run it,
> then
> >>> all the data goes to only one node. We have run it multiple times, but
> >>> this
> >>> behavior does not change. This seems strange.
> >>>
> >>> Is there some problem with the way we use HashPartitioner?
> >>>
> >>> Thanks in advance.
> >>>
> >>> Regards,
> >>> Raghava.
> >>>
> >>
> >>
> >
> >
> > --
> > Regards,
> > Raghava
> > http://raghavam.github.io
> >
>
>
> --
> Thanks,
> Mike
>


Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Raghava Mutharaju
We are testing with 52MB, but it will grow to 20GB and more later on. The
cluster size is also not static; we will be growing it. But the issue here
is the behavior of HashPartitioner -- from what I understand, it should
partition the data based on the hash of the key irrespective of the RAM
size (which is more than adequate for now). This behavior differs between
the spark-shell and the Scala program.

We are not using YARN; it is the standalone version of Spark.
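
To check the actual layout, we can count records per partition (a sketch
against the persisted RDD u from the original snippet; the Storage tab of the
web UI shows which executors hold the cached blocks):

val counts = u
  .mapPartitionsWithIndex { (idx, iter) => Iterator((idx, iter.size)) }
  .collect()
  .sortBy(_._1)
counts.foreach { case (idx, n) => println(s"partition $idx: $n records") }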

Regards,
Raghava.


On Mon, Apr 18, 2016 at 12:09 AM, Anuj Kumar  wrote:

> Few params like- spark.task.cpus, spark.cores.max will help. Also, for
> 52MB of data you need not have 12GB allocated to executors. Better to
> assign 512MB or so and increase the number of executors per worker node.
> Try reducing that executor memory to 512MB or so for this case.
>
> On Mon, Apr 18, 2016 at 9:07 AM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Yes its the same data.
>>
>> 1) The number of partitions are the same (8, which is an argument to the
>> HashPartitioner). In the first case, these partitions are spread across
>> both the worker nodes. In the second case, all the partitions are on the
>> same node.
>> 2) What resources would be of interest here? Scala shell takes the
>> default parameters since we use "bin/spark-shell --master " to
>> run the scala-shell. For the scala program, we do set some configuration
>> options such as driver memory (12GB), parallelism is set to 8 and we use
>> Kryo serializer.
>>
>> We are running this on Azure D3-v2 machines which have 4 cores and 14GB
>> RAM.1 executor runs on each worker node. Following configuration options
>> are set for the scala program -- perhaps we should move it to the spark
>> config file.
>>
>> Driver memory and executor memory are set to 12GB
>> parallelism is set to 8
>> Kryo serializer is used
>> Number of retainedJobs and retainedStages has been increased to check
>> them in the UI.
>>
>> What information regarding Spark Context would be of interest here?
>>
>> Regards,
>> Raghava.
>>
>> On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar  wrote:
>>
>>> If the data file is same then it should have similar distribution of
>>> keys. Few queries-
>>>
>>> 1. Did you compare the number of partitions in both the cases?
>>> 2. Did you compare the resource allocation for Spark Shell vs Scala
>>> Program being submitted?
>>>
>>> Also, can you please share the details of Spark Context, Environment and
>>> Executors when you run via Scala program?
>>>
>>> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>> m.vijayaragh...@gmail.com> wrote:
>>>
 Hello All,

 We are using HashPartitioner in the following way on a 3 node cluster
 (1 master and 2 worker nodes).

 val u =
 sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
 Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
 x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist()

 u.count()

 If we run this from the spark shell, the data (52 MB) is split across
 the two worker nodes. But if we put this in a scala program and run it,
 then all the data goes to only one node. We have run it multiple times, but
 this behavior does not change. This seems strange.

 Is there some problem with the way we use HashPartitioner?

 Thanks in advance.

 Regards,
 Raghava.

>>>
>>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>
>


-- 
Regards,
Raghava
http://raghavam.github.io


Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Mike Hynes
When submitting a job with spark-submit, I've observed delays (up to
1--2 seconds) for the executors to respond to the driver in order to
receive tasks in the first stage. The delay does not persist once the
executors have been synchronized.

When the tasks are very short, as may be your case (relatively small
data and a simple map task like you have described), the 8 tasks in
your stage may be allocated to only 1 executor in 2 waves of 4, since
the second executor won't have responded to the master before the
first 4 tasks on the first executor have completed.

To see if this is the cause in your particular case, you could try the
following to confirm:
1. Examine the starting times of the tasks alongside their executor
2. Make a "dummy" stage execute before your real stages to
synchronize the executors by creating and materializing any random RDD (see
the sketch after this list)
3. Make the tasks longer, i.e. with some silly computational work.
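
A minimal sketch of such a warm-up stage (sizes are arbitrary; the point is to
give every executor at least one task before the real work starts):

// Create and materialize a throwaway RDD with more tasks than total cores so
// that both executors have registered and run something before the real
// stages are scheduled. The result is discarded.
val warmup = sc.parallelize(1 to 100000, 16)
warmup.map(_ * 2).count()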

Mike


On 4/17/16, Raghava Mutharaju  wrote:
> Yes its the same data.
>
> 1) The number of partitions are the same (8, which is an argument to the
> HashPartitioner). In the first case, these partitions are spread across
> both the worker nodes. In the second case, all the partitions are on the
> same node.
> 2) What resources would be of interest here? Scala shell takes the default
> parameters since we use "bin/spark-shell --master " to run the
> scala-shell. For the scala program, we do set some configuration options
> such as driver memory (12GB), parallelism is set to 8 and we use Kryo
> serializer.
>
> We are running this on Azure D3-v2 machines which have 4 cores and 14GB
> RAM.1 executor runs on each worker node. Following configuration options
> are set for the scala program -- perhaps we should move it to the spark
> config file.
>
> Driver memory and executor memory are set to 12GB
> parallelism is set to 8
> Kryo serializer is used
> Number of retainedJobs and retainedStages has been increased to check them
> in the UI.
>
> What information regarding Spark Context would be of interest here?
>
> Regards,
> Raghava.
>
> On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar  wrote:
>
>> If the data file is same then it should have similar distribution of
>> keys.
>> Few queries-
>>
>> 1. Did you compare the number of partitions in both the cases?
>> 2. Did you compare the resource allocation for Spark Shell vs Scala
>> Program being submitted?
>>
>> Also, can you please share the details of Spark Context, Environment and
>> Executors when you run via Scala program?
>>
>> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>> m.vijayaragh...@gmail.com> wrote:
>>
>>> Hello All,
>>>
>>> We are using HashPartitioner in the following way on a 3 node cluster (1
>>> master and 2 worker nodes).
>>>
>>> val u =
>>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
>>> x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist()
>>>
>>> u.count()
>>>
>>> If we run this from the spark shell, the data (52 MB) is split across
>>> the
>>> two worker nodes. But if we put this in a scala program and run it, then
>>> all the data goes to only one node. We have run it multiple times, but
>>> this
>>> behavior does not change. This seems strange.
>>>
>>> Is there some problem with the way we use HashPartitioner?
>>>
>>> Thanks in advance.
>>>
>>> Regards,
>>> Raghava.
>>>
>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>


-- 
Thanks,
Mike

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Anuj Kumar
A few params like spark.task.cpus and spark.cores.max will help. Also, for
52MB of data you need not have 12GB allocated to the executors; better to
assign 512MB or so and increase the number of executors per worker node.
Try reducing the executor memory to 512MB or so for this case.
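
A sketch of the kind of settings I mean (values are illustrative, not tuned
recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "512m")  // lighter executors for a 52MB job
  .set("spark.cores.max", "8")           // cap on total cores the app may claim
  .set("spark.task.cpus", "1")           // cores reserved per task (the default)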

On Mon, Apr 18, 2016 at 9:07 AM, Raghava Mutharaju <
m.vijayaragh...@gmail.com> wrote:

> Yes its the same data.
>
> 1) The number of partitions are the same (8, which is an argument to the
> HashPartitioner). In the first case, these partitions are spread across
> both the worker nodes. In the second case, all the partitions are on the
> same node.
> 2) What resources would be of interest here? Scala shell takes the default
> parameters since we use "bin/spark-shell --master " to run the
> scala-shell. For the scala program, we do set some configuration options
> such as driver memory (12GB), parallelism is set to 8 and we use Kryo
> serializer.
>
> We are running this on Azure D3-v2 machines which have 4 cores and 14GB
> RAM.1 executor runs on each worker node. Following configuration options
> are set for the scala program -- perhaps we should move it to the spark
> config file.
>
> Driver memory and executor memory are set to 12GB
> parallelism is set to 8
> Kryo serializer is used
> Number of retainedJobs and retainedStages has been increased to check them
> in the UI.
>
> What information regarding Spark Context would be of interest here?
>
> Regards,
> Raghava.
>
> On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar  wrote:
>
>> If the data file is same then it should have similar distribution of
>> keys. Few queries-
>>
>> 1. Did you compare the number of partitions in both the cases?
>> 2. Did you compare the resource allocation for Spark Shell vs Scala
>> Program being submitted?
>>
>> Also, can you please share the details of Spark Context, Environment and
>> Executors when you run via Scala program?
>>
>> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>> m.vijayaragh...@gmail.com> wrote:
>>
>>> Hello All,
>>>
>>> We are using HashPartitioner in the following way on a 3 node cluster (1
>>> master and 2 worker nodes).
>>>
>>> val u =
>>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
>>> x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist()
>>>
>>> u.count()
>>>
>>> If we run this from the spark shell, the data (52 MB) is split across
>>> the two worker nodes. But if we put this in a scala program and run it,
>>> then all the data goes to only one node. We have run it multiple times, but
>>> this behavior does not change. This seems strange.
>>>
>>> Is there some problem with the way we use HashPartitioner?
>>>
>>> Thanks in advance.
>>>
>>> Regards,
>>> Raghava.
>>>
>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>


Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Raghava Mutharaju
Yes, it's the same data.

1) The number of partitions is the same (8, which is the argument to the
HashPartitioner). In the first case, these partitions are spread across
both worker nodes. In the second case, all the partitions are on the
same node.
2) What resources would be of interest here? The Scala shell takes the default
parameters, since we use "bin/spark-shell --master " to run it. For the
Scala program, we do set some configuration options, such as driver memory
(12GB), parallelism (8), and the Kryo serializer.

We are running this on Azure D3-v2 machines, which have 4 cores and 14GB
RAM. One executor runs on each worker node. The following configuration
options are set for the Scala program -- perhaps we should move them to the
Spark config file (a sketch of that follows the list below).

Driver memory and executor memory are set to 12GB
Parallelism is set to 8
The Kryo serializer is used
The number of retainedJobs and retainedStages has been increased to check them
in the UI.
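
If we do move them, the spark-defaults.conf entries would look roughly like
this (the retained-jobs/stages values below are placeholders):

# Note: driver memory typically only takes effect if set before the driver JVM
# starts (here or via --driver-memory), not from SparkConf inside the program.
spark.driver.memory        12g
spark.executor.memory      12g
spark.default.parallelism  8
spark.serializer           org.apache.spark.serializer.KryoSerializer
spark.ui.retainedJobs      1000
spark.ui.retainedStages    1000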

What information regarding Spark Context would be of interest here?

Regards,
Raghava.

On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar  wrote:

> If the data file is same then it should have similar distribution of keys.
> Few queries-
>
> 1. Did you compare the number of partitions in both the cases?
> 2. Did you compare the resource allocation for Spark Shell vs Scala
> Program being submitted?
>
> Also, can you please share the details of Spark Context, Environment and
> Executors when you run via Scala program?
>
> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Hello All,
>>
>> We are using HashPartitioner in the following way on a 3 node cluster (1
>> master and 2 worker nodes).
>>
>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
>> x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist()
>>
>> u.count()
>>
>> If we run this from the spark shell, the data (52 MB) is split across the
>> two worker nodes. But if we put this in a scala program and run it, then
>> all the data goes to only one node. We have run it multiple times, but this
>> behavior does not change. This seems strange.
>>
>> Is there some problem with the way we use HashPartitioner?
>>
>> Thanks in advance.
>>
>> Regards,
>> Raghava.
>>
>
>


-- 
Regards,
Raghava
http://raghavam.github.io


Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Anuj Kumar
If the data file is the same, then it should have a similar distribution of
keys. A few queries:

1. Did you compare the number of partitions in both cases?
2. Did you compare the resource allocation for the Spark shell vs. the Scala
program being submitted?

Also, can you please share the details of the Spark Context, Environment and
Executors when you run via the Scala program?
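
For queries 1 and 2, something like this run in both the shell and the program
would tell us (assuming u is the partitioned RDD from your snippet):

println(u.partitions.length)   // expect 8 in both cases
println(u.partitioner)         // expect Some(HashPartitioner) in both cases
sc.getConf.getAll.sorted.foreach(println)   // effective config, also on the UI's Environment tab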

On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
m.vijayaragh...@gmail.com> wrote:

> Hello All,
>
> We are using HashPartitioner in the following way on a 3 node cluster (1
> master and 2 worker nodes).
>
> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
> x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist()
>
> u.count()
>
> If we run this from the spark shell, the data (52 MB) is split across the
> two worker nodes. But if we put this in a scala program and run it, then
> all the data goes to only one node. We have run it multiple times, but this
> behavior does not change. This seems strange.
>
> Is there some problem with the way we use HashPartitioner?
>
> Thanks in advance.
>
> Regards,
> Raghava.
>