Did you change "spark.streaming.concurrentJobs" to more than 1? The Kafka 0.10
connector requires it to be 1.

On Wed, Jan 11, 2017 at 3:25 PM, Kalvin Chau <kalvinnc...@gmail.com> wrote:

> I'm not re-using any InputDStreams, actually; this is one InputDStream that
> has a window applied to it.
> Then, when Spark creates and assigns tasks to read from the topic, one
> executor gets assigned two tasks to read from the same TopicPartition, and
> both use the same CachedKafkaConsumer to read from it, causing
> the ConcurrentModificationException in one of the worker threads.
>
> On Wed, Jan 11, 2017 at 2:53 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
> I think you may be reusing the Kafka DStream (the DStream returned by
> createDirectStream). If you need to read from the same Kafka source, you
> need to create another DStream.
>
> On Wed, Jan 11, 2017 at 2:38 PM, Kalvin Chau <kalvinnc...@gmail.com>
> wrote:
>
> Hi,
>
> We've been running into ConcurrentModificationExceptions ("KafkaConsumer
> is not safe for multi-threaded access") with the CachedKafkaConsumer. I've
> been debugging this issue, and after looking through some of
> the Spark source code I think this is a bug.
>
> Our setup is:
> Spark 2.0.2, running on Mesos 0.28.0-2 in client mode, using
> Spark-Streaming-Kafka-010
> spark.executor.cores 1
> spark.mesos.extra.cores 1
>
> Batch interval: 10s, window interval: 180s, and slide interval: 30s
>
> We see the exception when two task worker threads in one executor are
> assigned the same topic and partition but different offset ranges.
>
> They would both get the same CachedKafkaConsumer, and whichever task
> thread went first would seek and poll for all the records, and at the same
> time the second thread would try to seek to its offset but fail because it
> is unable to acquire the lock.
>
> Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
> Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z
>
> Time1 E0 Task0 - Seeks and starts to poll
> Time1 E0 Task1 - Attempts to seek, but fails
>
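The timeline above can be reproduced without Kafka. What follows is a minimal sketch, assuming the consumer guards itself with a simple owner-thread check; GuardedConsumer and GuardDemo are hypothetical names and this is a simplified model, not the real KafkaConsumer code. Task0 claims the shared consumer and keeps polling, and Task1 then tries to seek on the same instance.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a consumer that only one thread may use at a time. */
final class GuardedConsumer {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);

    /** Claims the consumer for the calling thread; fails fast if another thread holds it. */
    void acquire() {
        long me = Thread.currentThread().getId();
        if (owner.get() != me && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                "KafkaConsumer is not safe for multi-threaded access");
        }
    }

    /** Gives the consumer up so another thread may claim it. */
    void release() {
        owner.set(NO_OWNER);
    }
}

final class GuardDemo {
    /** Replays the timeline above; returns what Task1 hit, or null if it succeeded. */
    static Throwable runScenario() {
        GuardedConsumer shared = new GuardedConsumer();
        CountDownLatch task0Polling = new CountDownLatch(1);
        CountDownLatch task1Done = new CountDownLatch(1);
        AtomicReference<Throwable> task1Failure = new AtomicReference<>();

        Thread task0 = new Thread(() -> {
            shared.acquire();                 // Task0 seeks and starts to poll
            task0Polling.countDown();
            try {
                task1Done.await();            // Task0 is still polling here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shared.release();
            }
        });
        Thread task1 = new Thread(() -> {
            try {
                task0Polling.await();
                shared.acquire();             // Task1 attempts to seek
                shared.release();
            } catch (ConcurrentModificationException | InterruptedException e) {
                task1Failure.set(e);
            } finally {
                task1Done.countDown();
            }
        });

        task0.start();
        task1.start();
        try {
            task0.join();
            task1.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return task1Failure.get();
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```

In this model the second task fails immediately rather than waiting, which matches the behaviour described above: the guard is a fail-fast check, not a blocking lock.
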
> Here are some relevant logs:
> 17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing
> topic test-topic, partition 2 offsets 4394204414 -> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing
> topic test-topic, partition 2 offsets 4394238058 -> 4394257712
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer:
> Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested
> 4394204414
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer:
> Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested
> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer:
> Initial fetch for spark-executor-consumer test-topic 2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer:
> Seeking to test-topic-2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting
> block rdd_199_2 failed due to an exception
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block
> rdd_199_2 could not be removed as it was not found on disk or in memory
> 17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception
> in task 49.0 in stage 45.0 (TID 3201)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access
> at org.apache.kafka.clients.consumer.KafkaConsumer.
> acquire(KafkaConsumer.java:1431)
> at org.apache.kafka.clients.consumer.KafkaConsumer.seek(
> KafkaConsumer.java:1132)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> seek(CachedKafkaConsumer.scala:95)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> get(CachedKafkaConsumer.scala:69)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:227)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(
> MemoryStore.scala:360)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(
> BlockManager.scala:951)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(
> BlockManager.scala:926)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
> at org.apache.spark.storage.BlockManager.doPutIterator(
> BlockManager.scala:926)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(
> BlockManager.scala:670)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:79)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:47)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer:
> Polled [test-topic-2]  8237
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer:
> Get spark-executor-consumer test-topic 2 nextOffset 4394204415 requested
> 4394204415
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer:
> Get spark-executor-consumer test-topic 2 nextOffset 4394204416 requested
> 4394204416
> ...
>
> It looks like when WindowedDStream does the getOrCompute call, it's
> computing all the sets of offsets it needs and tries to farm out the
> work in parallel. So each available task worker thread gets one of the
> offset ranges that need to be read.
>
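To put numbers on this, here is a small sketch using the intervals from the original post (batch 10s, window 180s, slide 30s). WindowBatches is a hypothetical helper, and the arithmetic assumes the window and slide durations are whole multiples of the batch interval. Each batch in the window carries its own offset range for the same topic partition, so any batches that are not already cached are candidates for being read in parallel.

```java
final class WindowBatches {
    /** Number of batch RDDs that a single window spans. */
    static int batchesPerWindow(int windowSeconds, int batchSeconds) {
        requireMultiple(windowSeconds, batchSeconds);
        return windowSeconds / batchSeconds;
    }

    /** Number of batches that enter the window each time it slides. */
    static int newBatchesPerSlide(int slideSeconds, int batchSeconds) {
        requireMultiple(slideSeconds, batchSeconds);
        return slideSeconds / batchSeconds;
    }

    private static void requireMultiple(int duration, int batchSeconds) {
        if (batchSeconds <= 0 || duration <= 0 || duration % batchSeconds != 0) {
            throw new IllegalArgumentException(
                duration + "s is not a positive multiple of the " + batchSeconds + "s batch interval");
        }
    }

    public static void main(String[] args) {
        // Settings from the original post: batch 10s, window 180s, slide 30s.
        System.out.println("batches per window: " + batchesPerWindow(180, 10));
        System.out.println("new batches per slide: " + newBatchesPerSlide(30, 10));
    }
}
```

With these settings a window covers 18 batches and 3 new ones arrive per slide, so more than one offset range for the same partition is outstanding whenever the window is computed.
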
> After realizing what was going on I tested four states:
>
>    - spark.executor.cores 1 and spark.mesos.extra.cores 0
>       - No Exceptions
>    - spark.executor.cores 1 and spark.mesos.extra.cores 1
>       - ConcurrentModificationException
>    - spark.executor.cores 2 and spark.mesos.extra.cores 0
>       - ConcurrentModificationException
>    - spark.executor.cores 2 and spark.mesos.extra.cores 1
>       - ConcurrentModificationException
>
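The four results above line up with how many tasks one executor can run at once. This is a rough sketch, assuming that the cores set by spark.mesos.extra.cores are advertised as additional task slots and that spark.task.cpus is left at its default of 1. TaskSlots is a hypothetical helper, not a Spark API.

```java
final class TaskSlots {
    /** Tasks one executor may run concurrently under the stated assumptions. */
    static int slots(int executorCores, int mesosExtraCores, int taskCpus) {
        if (taskCpus <= 0) {
            throw new IllegalArgumentException("taskCpus must be positive");
        }
        return (executorCores + mesosExtraCores) / taskCpus;
    }

    /** Two tasks can only share a cached consumer if more than one slot exists. */
    static boolean canCollide(int executorCores, int mesosExtraCores) {
        return slots(executorCores, mesosExtraCores, 1) > 1;
    }

    public static void main(String[] args) {
        int[][] tested = {{1, 0}, {1, 1}, {2, 0}, {2, 1}};
        for (int[] t : tested) {
            System.out.println("executor.cores=" + t[0] + " extra.cores=" + t[1]
                + " slots=" + slots(t[0], t[1], 1)
                + " canCollide=" + canCollide(t[0], t[1]));
        }
    }
}
```

Only the first configuration leaves a single slot, which is consistent with it being the only one that ran without the exception.
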
>
> I'm not sure what the best solution to this is if we want to be able to
> have N task threads read from the same TopicPartition to increase
> parallelism. You could possibly allow N CachedKafkaConsumers for the
> same TopicPartition.
>
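One way to picture the "N consumers per TopicPartition" idea is to key the cache by thread as well as by topic and partition. This is only an illustrative sketch: PerThreadConsumerCache is a hypothetical class, the consumer type is left generic, and nothing here reflects how Spark's CachedKafkaConsumer is actually implemented or how this should be fixed.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical cache: one consumer per (topic, partition, thread). */
final class PerThreadConsumerCache<C> {
    private static final class Key {
        final String topic;
        final int partition;
        final long threadId;

        Key(String topic, int partition, long threadId) {
            this.topic = topic;
            this.partition = partition;
            this.threadId = threadId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key k = (Key) o;
            return partition == k.partition && threadId == k.threadId && topic.equals(k.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition, threadId);
        }
    }

    private final Map<Key, C> consumers = new ConcurrentHashMap<>();
    private final Supplier<C> factory;

    PerThreadConsumerCache(Supplier<C> factory) {
        this.factory = Objects.requireNonNull(factory);
    }

    /** Returns the calling thread's own consumer for this topic partition. */
    C get(String topic, int partition) {
        Key key = new Key(topic, partition, Thread.currentThread().getId());
        return consumers.computeIfAbsent(key, unused -> factory.get());
    }

    int size() {
        return consumers.size();
    }

    /** Two threads ask for the same topic partition; returns how many consumers exist afterwards. */
    static int consumersForTwoThreads() {
        PerThreadConsumerCache<Object> cache = new PerThreadConsumerCache<>(Object::new);
        Runnable task = () -> {
            cache.get("abc", 0);
            cache.get("abc", 0); // second call reuses this thread's consumer
        };
        Thread t0 = new Thread(task);
        Thread t1 = new Thread(task);
        t0.start();
        t1.start();
        try {
            t0.join();
            t1.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return cache.size();
    }
}
```

The trade-off is that every extra consumer is another client with its own buffered records, and entries keyed by thread would need some eviction policy, so this avoids the shared-instance conflict at the cost of more resources per partition.
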
> Any thoughts on this?
>
> Thanks,
> Kalvin
>
>
>
