I don't see anything obvious. If the slowness is correlated with the
errors you're seeing, I'd start looking at what's going on with Kafka or
your network.
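
One thing that might still be worth ruling out: the
spark.streaming.kafka.consumer.* keys are Spark settings, so as far as I
know they are read from the SparkConf and not from the kafkaParams map
passed to the consumer strategy. The 120000 in the assertion message also
looks like the default poll timeout rather than a value you set. A minimal
sketch, assuming a placeholder application name and the 1024 ms poll value
from your earlier message (neither is a recommendation):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingConfSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: "kafka-direct-example" is a placeholder application name.
    val conf = new SparkConf()
      .setAppName("kafka-direct-example")
      // Spark-side switch for the cached Kafka consumers.
      .set("spark.streaming.kafka.consumer.cache.enabled", "false")
      // Spark-side poll timeout in milliseconds (illustrative value).
      .set("spark.streaming.kafka.consumer.poll.ms", "1024")

    // 60 second batches, as described in the thread.
    val ssc = new StreamingContext(conf, Seconds(60))
  }
}
```

The same keys can also be passed with --conf on spark-submit, which avoids
a rebuild while experimenting.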

On Mon, Aug 28, 2017 at 7:06 PM, swetha kasireddy <swethakasire...@gmail.com> wrote:

> Hi Cody,
>
> Following is the way that I am consuming data for a 60 second batch. Do
> you see anything that is wrong with the way the data is getting consumed
> that can cause slowness in performance?
>
>
> val kafkaParams = Map[String, Object](
>       "bootstrap.servers" -> kafkaBrokers,
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[StringDeserializer],
>       "auto.offset.reset" -> "latest",
>       "heartbeat.interval.ms" -> Integer.valueOf(20000),
>       "session.timeout.ms" -> Integer.valueOf(60000),
>       "request.timeout.ms" -> Integer.valueOf(90000),
>       "enable.auto.commit" -> (false: java.lang.Boolean),
>       "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>       "group.id" -> "test1"
>     )
>
>       val hubbleStream = KafkaUtils.createDirectStream[String, String](
>         ssc,
>         LocationStrategies.PreferConsistent,
>         ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
>       )
>
> val kafkaStreamRdd = hubbleStream.transform { rdd =>
>   rdd.map(consumerRecord => (consumerRecord.key(), consumerRecord.value()))
> }
>
> On Mon, Aug 28, 2017 at 11:56 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> There is no difference in performance even with Cache being enabled.
>>
>> On Mon, Aug 28, 2017 at 11:27 AM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> There is no difference in performance even with Cache being disabled.
>>>
>>> On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> So if you can run with cache enabled for some time, does that
>>>> significantly affect the performance issue you were seeing?
>>>>
>>>> Those settings seem reasonable enough. If preferred locations are
>>>> behaving correctly, you shouldn't need cached consumers for all 96
>>>> partitions on any one executor, so that maxCapacity setting is
>>>> probably unnecessary.
>>>>
>>>> On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
>>>> <swethakasire...@gmail.com> wrote:
>>>> > Because I saw some posts saying that enabling the consumer cache causes a
>>>> > ConcurrentModificationException with reduceByKeyAndWindow. I see those
>>>> > errors as well after running for some time with the cache enabled, so I
>>>> > had to disable it. Please see the tickets below. We have 96 partitions.
>>>> > If I enable the cache, would the following settings help to improve
>>>> > performance?
>>>> >
>>>> > "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
>>>> >
>>>> > "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>>>> >
>>>> >
>>>> > http://markmail.org/message/n4cdxwurlhf44q5x
>>>> >
>>>> > https://issues.apache.org/jira/browse/SPARK-19185
>>>> >
>>>> > On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>> >>
>>>> >> Why are you setting consumer.cache.enabled to false?
>>>> >>
>>>> >> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com> wrote:
>>>> >> > Hi,
>>>> >> >
>>>> >> > What would be the appropriate settings to run Spark with Kafka 10? My
>>>> >> > job works fine with Spark with Kafka 8 and a Kafka 8 cluster, but it's
>>>> >> > very slow with Kafka 10 when using the experimental Kafka Direct APIs
>>>> >> > for Kafka 10. I sometimes see the following error. Please see the Kafka
>>>> >> > parameters and the consumer strategy for creating the stream below. Any
>>>> >> > suggestions on how to run this with better performance would be of
>>>> >> > great help.
>>>> >> >
>>>> >> > java.lang.AssertionError: assertion failed: Failed to get records for test stream1 72 324027964 after polling for 120000
>>>> >> >
>>>> >> > val kafkaParams = Map[String, Object](
>>>> >> >       "bootstrap.servers" -> kafkaBrokers,
>>>> >> >       "key.deserializer" -> classOf[StringDeserializer],
>>>> >> >       "value.deserializer" -> classOf[StringDeserializer],
>>>> >> >       "auto.offset.reset" -> "latest",
>>>> >> >       "heartbeat.interval.ms" -> Integer.valueOf(20000),
>>>> >> >       "session.timeout.ms" -> Integer.valueOf(60000),
>>>> >> >       "request.timeout.ms" -> Integer.valueOf(90000),
>>>> >> >       "enable.auto.commit" -> (false: java.lang.Boolean),
>>>> >> >       "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>>>> >> >       "group.id" -> "test1"
>>>> >> >     )
>>>> >> >
>>>> >> >       val hubbleStream = KafkaUtils.createDirectStream[String, String](
>>>> >> >         ssc,
>>>> >> >         LocationStrategies.PreferConsistent,
>>>> >> >         ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
>>>> >> >       )
>>>> >> >
>>>> >
>>>> >
>>>>
>>>
>>>
>>
>
