Re: Spark + Kafka all messages being used in 1 batch

2016-03-06 Thread Shahbaz
   - Do you happen to see how busy the nodes are in terms of CPU, and how
   much heap each executor is allocated?
   - If there is enough capacity, you may want to increase the number of
   cores per executor to 2 and tune the heap accordingly (see the sketch
   after this list).
   - How much time did it take to process the 4M+ events? (In the Spark UI
   you can look at the Duration column.)
   - I believe the reader is quite fast, but the processing could be slower.
   If you click on the job, it gives you a breakdown of execution time,
   result serialization, etc.; you may want to look at that and drive from
   there.
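
A minimal sketch of what the second point might look like (the app name and
the 8g heap are assumptions, not measured recommendations; the same settings
can also be passed to spark-submit via --conf):

import org.apache.spark.SparkConf

// Sketch only: two cores per executor with a correspondingly larger heap.
// Size the heap against the 128 GB nodes and whatever else runs on them.
val conf = new SparkConf()
  .setAppName("kafka-streaming-job")   // hypothetical name
  .set("spark.executor.cores", "2")    // was 1 core per executor
  .set("spark.executor.memory", "8g")  // assumed value; tune as needed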


Regards,
Shahbaz



Re: Spark + Kafka all messages being used in 1 batch

2016-03-06 Thread Vinti Maheshwari
I have 2 machines in my cluster, each with the below specifications:
128 GB RAM and 8 cores.

Regards,
~Vinti



Re: Spark + Kafka all messages being used in 1 batch

2016-03-06 Thread Vinti Maheshwari
Thanks Supreeth and Shahbaz. I will try adding
spark.streaming.kafka.maxRatePerPartition.

Hi Shahbaz,

Please see my comments inline:


   - Which version of Spark are you using? ==> *1.5.2*
   - How big is the Kafka cluster? ==> *2 brokers*
   - What is the message size and type? ==>
*String, around 9,550 bytes*
   - How big is the Spark cluster (how many executors, how many cores per
   executor)? ==> *2 nodes, 16 executors, 1 core per executor*
   - What does your Spark job look like? ==>


// Imports assumed by this fragment (Spark 1.5 Kafka direct API, Breeze):
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import breeze.linalg.{DenseVector => BDV}
import scala.util.Try

// ssc, kafkaParams, topicsSet and Blaher are defined elsewhere in the job.
// Direct Kafka stream: one RDD partition per Kafka topic partition, no receiver.
val messages = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val inputStream = messages.map(_._2) // keep only the message value

// Parse each CSV line into (key, array of numeric fields).
val parsedStream = inputStream.map { line =>
  val splitLines = line.split(",")
  (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
}

// Keep a running element-wise sum of the arrays per key.
val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
  (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
    prev.map(_ +: current).orElse(Some(current))
      .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
  })
state.checkpoint(Duration(25000))
state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) // saving to HBase (see note below)
ssc
}
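
One thing worth checking in the foreachRDD above, since HBase writes are
involved: rdd.foreach calls Blaher.blah once per record, so if that helper
sets up its own HBase connection each time, the per-record overhead can
dominate the processing time. A common alternative is foreachPartition,
sketched below; whether it applies depends on what Blaher.blah actually does,
which is not shown in the thread.

// Sketch only: amortize per-record setup by working a partition at a time.
// Blaher.blah is the existing helper; its internals are not shown here.
state.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // Open any HBase connection/table once here, reuse it for the partition...
    records.foreach(Blaher.blah)
    // ...and close it here.
  }
}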


Set spark.streaming.backpressure.enabled to true and try? ==>

*Yes, I had already enabled it.*
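
One caveat that may explain why backpressure alone did not help here (as I
understand the 1.5 behaviour): the backpressure rate estimator only kicks in
after at least one batch has completed, so the very first batch after a
(re)start can still pull in the entire Kafka backlog. Combining it with
spark.streaming.kafka.maxRatePerPartition puts a hard ceiling on that first
batch as well. A minimal sketch, where the 10000 records/sec/partition figure
is an assumed value that would need tuning:

import org.apache.spark.SparkConf

// Sketch only: adaptive rate control plus a hard per-partition cap.
val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")       // adapts from completed batches
  .set("spark.streaming.kafka.maxRatePerPartition", "10000") // assumed cap; applies to the first batch too
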
Regards,
~Vinti

On Sat, Mar 5, 2016 at 11:16 PM, Shahbaz  wrote:

> Hello,
>
>    - Which version of Spark are you using?
>    - How big is the Kafka cluster?
>    - What is the message size and type?
>    - How big is the Spark cluster (how many executors, how many cores per
>    executor)?
>    - What does your Spark job look like?
>
> Set spark.streaming.backpressure.enabled to true and try?
>
>
> Regards,
> Shahbaz
> +91-9986850670
>
> On Sun, Mar 6, 2016 at 12:19 PM, Supreeth  wrote:
>
>> Try setting spark.streaming.kafka.maxRatePerPartition, this can help
>> control the number of messages read from Kafka per partition on the spark
>> streaming consumer.
>>
>> -S
>>
>>
>> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari 
>> wrote:
>>
>> Hello,
>>
>> I am trying to figure out why my kafka+spark job is running slow. I found
>> that spark is consuming all the messages out of kafka into a single batch
>> itself and not sending any messages to the other batches.
>>
>> 2016/03/05 21:57:05
>> 
>> 0 events - - queued 2016/03/05 21:57:00
>> 
>> 0 events - - queued 2016/03/05 21:56:55
>> 
>> 0 events - - queued 2016/03/05 21:56:50
>> 
>> 0 events - - queued 2016/03/05 21:56:45
>> 
>> 0 events - - queued 2016/03/05 21:56:40
>> 
>> 4039573 events 6 ms - processing
>>
>> Does anyone know how this behavior can be changed so that the number of
>> messages are load balanced across all the batches?
>>
>> Thanks,
>> Vinti
>>
>>
>


Re: Spark + Kafka all messages being used in 1 batch

2016-03-05 Thread Supreeth
Try setting spark.streaming.kafka.maxRatePerPartition; this can help control
the number of messages read from Kafka per partition by the Spark Streaming
consumer.
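
For reference, a rough sketch of the arithmetic behind that cap for a direct
stream (the rate, partition count, and batch interval below are hypothetical,
since none of them are stated in the thread):

// Hypothetical numbers, purely to illustrate how the per-batch cap works out.
val maxRatePerPartition = 10000L // spark.streaming.kafka.maxRatePerPartition (records/sec/partition)
val kafkaPartitions     = 8L     // partitions across the subscribed topics
val batchIntervalSec    = 5L     // streaming batch interval in seconds

// Upper bound on records pulled into any single batch:
val maxRecordsPerBatch = maxRatePerPartition * kafkaPartitions * batchIntervalSec
// = 400000 in this example, instead of the whole backlog at once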

-S




Spark + Kafka all messages being used in 1 batch

2016-03-05 Thread Vinti Maheshwari
Hello,

I am trying to figure out why my Kafka + Spark job is running slowly. I found
that Spark is pulling all of the messages out of Kafka into a single batch
and not assigning any messages to the subsequent batches.

2016/03/05 21:57:05   0 events         -      -   queued
2016/03/05 21:57:00   0 events         -      -   queued
2016/03/05 21:56:55   0 events         -      -   queued
2016/03/05 21:56:50   0 events         -      -   queued
2016/03/05 21:56:45   0 events         -      -   queued
2016/03/05 21:56:40   4039573 events   6 ms   -   processing

Does anyone know how this behavior can be changed so that the messages are
load-balanced across all the batches?

Thanks,
Vinti