Mike,

We tried that. This map task is actually part of a larger set of
operations. I pointed out this map task since it involves partitionBy() and
we always use partitionBy() whenever partition-unaware shuffle operations
are performed (such as distinct). We in fact do not notice a change in the
distribution after several unrelated stages are executed and a significant
time has passed (nearly 10-15 minutes).

I agree. We are not looking for partitions to go to specific nodes and nor
do we expect a uniform distribution of keys across the cluster. There will
be a skew. But it cannot be that all the data is on one node and nothing on
the other and no, the keys are not the same. They vary from 1 to around
55000 (integers). What makes this strange is that it seems to work fine on
the spark shell (REPL).

Regards,
Raghava.


On Mon, Apr 18, 2016 at 1:14 AM, Mike Hynes <91m...@gmail.com> wrote:

> A HashPartitioner will indeed partition based on the key, but you
> cannot know on *which* node that key will appear. Again, the RDD
> partitions will not necessarily be distributed evenly across your
> nodes because of the greedy scheduling of the first wave of tasks,
> particularly if those tasks have durations less than the initial
> executor delay. I recommend you look at your logs to verify if this is
> happening to you.
>
> Mike
>
> On 4/18/16, Anuj Kumar <anujs...@gmail.com> wrote:
> > Good point Mike +1
> >
> > On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:
> >
> >> When submitting a job with spark-submit, I've observed delays (up to
> >> 1--2 seconds) for the executors to respond to the driver in order to
> >> receive tasks in the first stage. The delay does not persist once the
> >> executors have been synchronized.
> >>
> >> When the tasks are very short, as may be your case (relatively small
> >> data and a simple map task like you have described), the 8 tasks in
> >> your stage may be allocated to only 1 executor in 2 waves of 4, since
> >> the second executor won't have responded to the master before the
> >> first 4 tasks on the first executor have completed.
> >>
> >> To see if this is the cause in your particular case, you could try the
> >> following to confirm:
> >>         1. Examine the starting times of the tasks alongside their
> >> executor
> >>         2. Make a "dummy" stage execute before your real stages to
> >> synchronize the executors by creating and materializing any random RDD
> >>         3. Make the tasks longer, i.e. with some silly computational
> >> work.
> >>
> >> Mike
> >>
> >>
> >> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
> >> > Yes its the same data.
> >> >
> >> > 1) The number of partitions are the same (8, which is an argument to
> >> > the
> >> > HashPartitioner). In the first case, these partitions are spread
> across
> >> > both the worker nodes. In the second case, all the partitions are on
> >> > the
> >> > same node.
> >> > 2) What resources would be of interest here? Scala shell takes the
> >> default
> >> > parameters since we use "bin/spark-shell --master <master-URL>" to run
> >> the
> >> > scala-shell. For the scala program, we do set some configuration
> >> > options
> >> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
> >> > serializer.
> >> >
> >> > We are running this on Azure D3-v2 machines which have 4 cores and
> 14GB
> >> > RAM.1 executor runs on each worker node. Following configuration
> >> > options
> >> > are set for the scala program -- perhaps we should move it to the
> spark
> >> > config file.
> >> >
> >> > Driver memory and executor memory are set to 12GB
> >> > parallelism is set to 8
> >> > Kryo serializer is used
> >> > Number of retainedJobs and retainedStages has been increased to check
> >> them
> >> > in the UI.
> >> >
> >> > What information regarding Spark Context would be of interest here?
> >> >
> >> > Regards,
> >> > Raghava.
> >> >
> >> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com>
> >> > wrote:
> >> >
> >> >> If the data file is same then it should have similar distribution of
> >> >> keys.
> >> >> Few queries-
> >> >>
> >> >> 1. Did you compare the number of partitions in both the cases?
> >> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
> >> >> Program being submitted?
> >> >>
> >> >> Also, can you please share the details of Spark Context, Environment
> >> >> and
> >> >> Executors when you run via Scala program?
> >> >>
> >> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
> >> >> m.vijayaragh...@gmail.com> wrote:
> >> >>
> >> >>> Hello All,
> >> >>>
> >> >>> We are using HashPartitioner in the following way on a 3 node
> cluster
> >> (1
> >> >>> master and 2 worker nodes).
> >> >>>
> >> >>> val u =
> >> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
> >> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
> >> >>> (y.toInt,
> >> >>> x.toInt) } }).partitionBy(new
> >> HashPartitioner(8)).setName("u").persist()
> >> >>>
> >> >>> u.count()
> >> >>>
> >> >>> If we run this from the spark shell, the data (52 MB) is split
> across
> >> >>> the
> >> >>> two worker nodes. But if we put this in a scala program and run it,
> >> then
> >> >>> all the data goes to only one node. We have run it multiple times,
> >> >>> but
> >> >>> this
> >> >>> behavior does not change. This seems strange.
> >> >>>
> >> >>> Is there some problem with the way we use HashPartitioner?
> >> >>>
> >> >>> Thanks in advance.
> >> >>>
> >> >>> Regards,
> >> >>> Raghava.
> >> >>>
> >> >>
> >> >>
> >> >
> >> >
> >> > --
> >> > Regards,
> >> > Raghava
> >> > http://raghavam.github.io
> >> >
> >>
> >>
> >> --
> >> Thanks,
> >> Mike
> >>
> >
>
>
> --
> Thanks,
> Mike
>



-- 
Regards,
Raghava
http://raghavam.github.io

Reply via email to