- Do you happen to see how busy are the nodes in terms of CPU and how
   much heap each executor is allocated with.
   - If there is enough capacity ,you may want to increase number of cores
   per executor to 2 and do the needed heap tweaking.
   - How much time did it take to process 4M+ events (In Spark UI,you can
   look at Duration) column.
   - I believe Reader is Quite fast ,however Processing could be slower ,if
   you click on the Job,it gives you break down of execution,Result
   Serialization etc ,you may want to look at that and drive from there.


Regards,
Shahbaz

On Sun, Mar 6, 2016 at 9:26 PM, Vinti Maheshwari <vinti.u...@gmail.com>
wrote:

> I have 2 machines in my cluster with the below specifications:
> 128 GB RAM and 8 cores machine
>
> Regards,
> ~Vinti
>
> On Sun, Mar 6, 2016 at 7:54 AM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> Thanks Supreeth and Shahbaz. I will try adding
>> spark.streaming.kafka.maxRatePerPartition.
>>
>> Hi Shahbaz,
>>
>> Please see comments, inline:
>>
>>
>>    - Which version of Spark you are using. ==> *1.5.2*
>>    - How big is the Kafka Cluster ==> *2 brokers*
>>    - What is the Message Size and type.==>
>> *String, 9,550 bytes (around) *
>>    - How big is the spark cluster (How many executors ,How many cores
>>    Per Executor)==>* 2 Nodes, 16 executors, 1 core per executor*
>>    - What does your Spark Job looks like ==>
>>
>>
>>    val messages = KafkaUtils.createDirectStream[String, String, 
>> StringDecoder, StringDecoder](
>>      ssc, kafkaParams, topicsSet)val inputStream = messages.map(_._2)
>>
>>
>>          val parsedStream = inputStream
>>            .map(line => {
>>              val splitLines = line.split(",")
>>              (splitLines(1), splitLines.slice(2, 
>> splitLines.length).map((_.trim.toLong)))
>>            })
>>
>>          val state: DStream[(String, Array[Long])] = 
>> parsedStream.updateStateByKey(
>>            (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
>>              prev.map(_ +: current).orElse(Some(current))
>>                .flatMap(as => Try(as.map(BDV(_)).reduce(_ + 
>> _).toArray).toOption)
>>            })
>>          state.checkpoint(Duration(25000))
>>          state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) //saving to Hbase
>>          ssc
>>        }
>>
>>
>> spark.streaming.backpressure.enabled set it to true and try?
>>  ==>
>>
>>
>> *yes, i had enabled it.*
>> Regards,
>> ~Vinti
>>
>> On Sat, Mar 5, 2016 at 11:16 PM, Shahbaz <shahzadh...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>>    - Which version of Spark you are using.
>>>    - How big is the Kafka Cluster
>>>    - What is the Message Size and type.
>>>    - How big is the spark cluster (How many executors ,How many cores
>>>    Per Executor)
>>>    - What does your Spark Job looks like .
>>>
>>> spark.streaming.backpressure.enabled set it to true and try?
>>>
>>>
>>> Regards,
>>> Shahbaz
>>> +91-9986850670
>>>
>>> On Sun, Mar 6, 2016 at 12:19 PM, Supreeth <supreeth....@gmail.com>
>>> wrote:
>>>
>>>> Try setting spark.streaming.kafka.maxRatePerPartition, this can help
>>>> control the number of messages read from Kafka per partition on the spark
>>>> streaming consumer.
>>>>
>>>> -S
>>>>
>>>>
>>>> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>>>> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I am trying to figure out why my kafka+spark job is running slow. I
>>>> found that spark is consuming all the messages out of kafka into a single
>>>> batch itself and not sending any messages to the other batches.
>>>>
>>>> 2016/03/05 21:57:05
>>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243825000>
>>>> 0 events - - queued 2016/03/05 21:57:00
>>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243820000>
>>>> 0 events - - queued 2016/03/05 21:56:55
>>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243815000>
>>>> 0 events - - queued 2016/03/05 21:56:50
>>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243810000>
>>>> 0 events - - queued 2016/03/05 21:56:45
>>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243805000>
>>>> 0 events - - queued 2016/03/05 21:56:40
>>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243800000>
>>>> 4039573 events 6 ms - processing
>>>>
>>>> Does anyone know how this behavior can be changed so that the number of
>>>> messages are load balanced across all the batches?
>>>>
>>>> Thanks,
>>>> Vinti
>>>>
>>>>
>>>
>>
>

Reply via email to