That is expected, and it is due to data locality. You should still see
some tasks running on the other executors, since the received data gets
replicated to other nodes and tasks can then run there based on
locality. You have two options:

1. Call kafkaStream.repartition(numPartitions) to explicitly
repartition the received data across the cluster.
2. Create multiple Kafka streams and union them together.

See 
http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch
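
For reference, here is a minimal sketch of both options combined. It
assumes the receiver-based KafkaUtils.createStream API from the
spark-streaming-kafka module of Spark 1.x. The object name, numReceivers
and numPartitions are made up for illustration, and the mapping of your
program arguments to ZooKeeper quorum / group id / topic is only my guess
from the spark-submit command quoted below, so adjust it to your app.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical driver program, not the CmsgK2H class from this thread.
object KafkaParallelismSketch {
  def main(args: Array[String]): Unit = {
    // Assumed values, guessed from the quoted spark-submit arguments.
    val zkQuorum = "vm.cloud.com:2181/kafka"
    val groupId  = "spark-yarn"
    val topics   = Map("avro" -> 1) // topic -> consumer threads per receiver

    // Illustrative tuning knobs (assumptions, pick values for your cluster).
    val numReceivers  = 3
    val numPartitions = 10

    val conf = new SparkConf().setAppName("KafkaParallelismSketch")
    val ssc  = new StreamingContext(conf, Seconds(5)) // 5 sec batches

    // Option 2: several receivers, so data is received on several executors.
    val streams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, groupId, topics,
        StorageLevel.MEMORY_AND_DISK_SER_2)
    }
    val unified = ssc.union(streams)

    // Option 1: shuffle the received blocks across the cluster before
    // the expensive processing, so tasks are not tied to receiver nodes.
    val balanced = unified.repartition(numPartitions)

    // Placeholder processing: count the message values in each batch.
    balanced.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Keep in mind that every receiver permanently occupies one core, so the
application needs more cores in total than it has receivers, otherwise
nothing is left to process the data.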

On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha <me.mukesh....@gmail.com> wrote:
> Thanks Sandy, it was an issue with the number of cores.
>
> Another issue I was facing is that tasks are not getting distributed evenly
> among all executors and are running on the NODE_LOCAL locality level i.e.
> all the tasks are running on the same executor where my kafkareceiver(s) are
> running even though other executors are idle.
>
> I configured spark.locality.wait=50 instead of the default 3000 ms, which
> forced the tasks to be rebalanced among nodes. Let me know if there is a
> better way to deal with this.
>
>
> On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha <me.mukesh....@gmail.com>
> wrote:
>>
>> Makes sense, I've also tried it in standalone mode where all 3 workers &
>> driver were running on the same 8 core box and the results were similar.
>>
>> Anyways I will share the results in YARN mode with 8 core yarn containers.
>>
>> On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza <sandy.r...@cloudera.com>
>> wrote:
>>>
>>> When running in standalone mode, each executor will be able to use all 8
>>> cores on the box.  When running on YARN, each executor will only have access
>>> to 2 cores.  So the comparison doesn't seem fair, no?
>>>
>>> -Sandy
>>>
>>> On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha <me.mukesh....@gmail.com>
>>> wrote:
>>>>
>>>> Nope, I am setting 5 executors with 2 cores each. Below is the command
>>>> that I'm using to submit in YARN mode. This starts up 5 executor nodes
>>>> and a driver, as per the Spark application master UI.
>>>>
>>>> spark-submit --master yarn-cluster --num-executors 5 --driver-memory
>>>> 1024m --executor-memory 1024m --executor-cores 2 --class
>>>> com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka
>>>> spark-yarn avro 1 5000
>>>>
>>>> On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza <sandy.r...@cloudera.com>
>>>> wrote:
>>>>>
>>>>> *oops, I mean are you setting --executor-cores to 8
>>>>>
>>>>> On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza <sandy.r...@cloudera.com>
>>>>> wrote:
>>>>>>
>>>>>> Are you setting --num-executors to 8?
>>>>>>
>>>>>> On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha <me.mukesh....@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>> Sorry Sandy, The command is just for reference but I can confirm that
>>>>>>> there are 4 executors and a driver as shown in the spark UI page.
>>>>>>>
>>>>>>> Each of these machines is an 8-core box with ~15G of RAM.
>>>>>>>
>>>>>>> On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza
>>>>>>> <sandy.r...@cloudera.com> wrote:
>>>>>>>>
>>>>>>>> Hi Mukesh,
>>>>>>>>
>>>>>>>> Based on your spark-submit command, it looks like you're only
>>>>>>>> running with 2 executors on YARN.  Also, how many cores does each
>>>>>>>> machine have?
>>>>>>>>
>>>>>>>> -Sandy
>>>>>>>>
>>>>>>>> On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha
>>>>>>>> <me.mukesh....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hello Experts,
>>>>>>>>> I'm benchmarking Spark on YARN
>>>>>>>>> (https://spark.apache.org/docs/latest/running-on-yarn.html) vs a
>>>>>>>>> standalone Spark cluster
>>>>>>>>> (https://spark.apache.org/docs/latest/spark-standalone.html).
>>>>>>>>> I have a standalone cluster with 3 executors, and a Spark app
>>>>>>>>> running on YARN with 4 executors as shown below.
>>>>>>>>>
>>>>>>>>> The Spark job running on YARN is 10x slower than the one running
>>>>>>>>> on the standalone cluster (even though YARN has more workers).
>>>>>>>>> In both cases all the executors are in the same datacenter, so
>>>>>>>>> there shouldn't be any latency. On YARN each 5 sec batch reads
>>>>>>>>> data from Kafka and is processed in 5 sec, while on the standalone
>>>>>>>>> cluster each 5 sec batch is processed in 0.4 sec.
>>>>>>>>> Also, in YARN mode the executors are not being used evenly:
>>>>>>>>> vm-13 & vm-14 are running most of the tasks, whereas in standalone
>>>>>>>>> mode all the executors are running tasks.
>>>>>>>>>
>>>>>>>>> Do I need to set up some configuration to evenly distribute the
>>>>>>>>> tasks? Also, do you have any pointers on why the YARN job is 10x
>>>>>>>>> slower than the standalone job?
>>>>>>>>> Any suggestion is greatly appreciated. Thanks in advance.
>>>>>>>>>
>>>>>>>>> YARN(5 workers + driver)
>>>>>>>>> ========================
>>>>>>>>> ID        Address                RDD Blocks  Memory Used   Disk Used  Active  Failed  Complete  Total  Task Time  Input  Shuffle Read  Shuffle Write
>>>>>>>>> 1         vm-18.cloud.com:51796  0           0.0B/530.3MB  0.0 B      1       0       16        17     634 ms     0.0 B  2047.0 B      1710.0 B
>>>>>>>>> 2         vm-13.cloud.com:57264  0           0.0B/530.3MB  0.0 B      0       0       1427      1427   5.5 m      0.0 B  0.0 B         0.0 B
>>>>>>>>> 3         vm-14.cloud.com:54570  0           0.0B/530.3MB  0.0 B      0       0       1379      1379   5.2 m      0.0 B  1368.0 B      2.8 KB
>>>>>>>>> 4         vm-11.cloud.com:56201  0           0.0B/530.3MB  0.0 B      0       0       10        10     625 ms     0.0 B  1368.0 B      1026.0 B
>>>>>>>>> 5         vm-5.cloud.com:42958   0           0.0B/530.3MB  0.0 B      0       0       22        22     632 ms     0.0 B  1881.0 B      2.8 KB
>>>>>>>>> <driver>  vm.cloud.com:51847     0           0.0B/530.0MB  0.0 B      0       0       0         0      0 ms       0.0 B  0.0 B         0.0 B
>>>>>>>>>
>>>>>>>>> /homext/spark/bin/spark-submit
>>>>>>>>> --master yarn-cluster --num-executors 2 --driver-memory 512m
>>>>>>>>> --executor-memory 512m --executor-cores 2
>>>>>>>>> --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
>>>>>>>>> vm.cloud.com:2181/kafka spark-yarn avro 1 5000
>>>>>>>>>
>>>>>>>>> STANDALONE(3 workers + driver)
>>>>>>>>> ==============================
>>>>>>>>> ID        Address                RDD Blocks  Memory Used   Disk Used  Active  Failed  Complete  Total  Task Time  Input  Shuffle Read  Shuffle Write
>>>>>>>>> 0         vm-71.cloud.com:55912  0           0.0B/265.0MB  0.0 B      0       0       1069      1069   6.0 m      0.0 B  1534.0 B      3.0 KB
>>>>>>>>> 1         vm-72.cloud.com:40897  0           0.0B/265.0MB  0.0 B      0       0       1057      1057   5.9 m      0.0 B  1368.0 B      4.0 KB
>>>>>>>>> 2         vm-73.cloud.com:37621  0           0.0B/265.0MB  0.0 B      1       0       1059      1060   5.9 m      0.0 B  2.0 KB        1368.0 B
>>>>>>>>> <driver>  vm.cloud.com:58299     0           0.0B/265.0MB  0.0 B      0       0       0         0      0 ms       0.0 B  0.0 B         0.0 B
>>>>>>>>>
>>>>>>>>> /homext/spark/bin/spark-submit
>>>>>>>>> --master spark://chsnmvproc71vm3.usdc2.oraclecloud.com:7077
>>>>>>>>> --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
>>>>>>>>> vm.cloud.com:2181/kafka spark-standalone avro 1 5000
>>>>>>>>>
>>>>>>>>> PS: I did go through the Spark website and
>>>>>>>>> http://www.virdata.com/tuning-spark/, but had no luck.
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Cheers,
>>>>>>>>> Mukesh Jha
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>>
>>>>>>> Thanks & Regards,
>>>>>>>
>>>>>>> Mukesh Jha
