[ 
https://issues.apache.org/jira/browse/SPARK-19185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16043030#comment-16043030
 ] 

Marcelo Vanzin commented on SPARK-19185:
----------------------------------------

I merged Mark's patch above to master and branch-2.2, but it's just a 
work-around, not a fix, so I'll leave the bug open (and with no "fix version").
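
The patch itself is not reproduced in this message. Assuming it is the change that made the Kafka consumer cache configurable (the switch the Spark Kafka 0-10 integration guide documents as spark.streaming.kafka.consumer.cache.enabled), applying the work-around would look roughly like the sketch below. The application name is a placeholder, and disabling the cache means a consumer is created per task rather than reused, so this trades throughput for avoiding the shared consumer.

```scala
import org.apache.spark.SparkConf

// Sketch only: "windowing-repro" is a hypothetical application name.
val conf = new SparkConf()
  .setAppName("windowing-repro")
  // Assumed to be the switch introduced by the work-around patch:
  // turn off reuse of cached Kafka consumers on the executors.
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")
```

The same setting can also be passed on the command line with --conf instead of being set in code.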

> ConcurrentModificationExceptions with CachedKafkaConsumers when Windowing
> -------------------------------------------------------------------------
>
>                 Key: SPARK-19185
>                 URL: https://issues.apache.org/jira/browse/SPARK-19185
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.2
>         Environment: Spark 2.0.2
> Spark Streaming Kafka 010
> Mesos 0.28.0 - client mode
> spark.executor.cores 1
> spark.mesos.extra.cores 1
>            Reporter: Kalvin Chau
>              Labels: streaming, windowing
>
> We've been running into ConcurrentModificationExceptions ("KafkaConsumer is 
> not safe for multi-threaded access") with the CachedKafkaConsumer. I've been 
> working through debugging this issue, and after looking through some of the 
> Spark source code I think this is a bug.
> Our setup is:
> Spark 2.0.2, running in Mesos 0.28.0-2 in client mode, using 
> Spark-Streaming-Kafka-010
> spark.executor.cores 1
> spark.mesos.extra.cores 1
> Batch interval: 10s, window interval: 180s, and slide interval: 30s
> We see the exception when two task worker threads in one executor are 
> assigned the same topic-partition but different offset ranges.
> Both get the same CachedKafkaConsumer. Whichever task thread goes first 
> seeks and polls for all of its records, and while it is doing so the 
> second thread tries to seek to its own offset and fails because it cannot 
> acquire the consumer's lock.
> Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
> Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z
> Time1 E0 Task0 - Seeks and starts to poll
> Time1 E0 Task1 - Attempts to seek, but fails
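
The timeline above can be reproduced without Kafka. The following is a minimal sketch, assuming a simplified model of the consumer's ownership check: GuardedConsumer and RaceDemo are hypothetical names, not Kafka or Spark classes, and the guard here is a plain compare-and-set on the owning thread id that fails fast instead of waiting.

```scala
import java.util.ConcurrentModificationException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

// Hypothetical stand-in for a cached consumer shared by two task threads.
// It models only the ownership check: one thread at a time, no waiting.
class GuardedConsumer {
  private val noOwner = -1L
  private val owner = new AtomicLong(noOwner)
  @volatile private var position = 0L

  private def acquire(): Unit = {
    val me = Thread.currentThread().getId
    // Fail fast if any thread currently holds the consumer.
    if (!owner.compareAndSet(noOwner, me))
      throw new ConcurrentModificationException(
        "consumer is not safe for multi-threaded access")
  }

  private def release(): Unit = owner.set(noOwner)

  def seek(offset: Long): Unit = {
    acquire()
    try position = offset finally release()
  }

  // Holds the consumer for as long as `body` runs, like a long poll.
  def poll(body: => Unit): Unit = {
    acquire()
    try body finally release()
  }
}

object RaceDemo {
  // Returns true if Task1's seek is rejected while Task0 is polling.
  def secondSeekFails(): Boolean = {
    val consumer = new GuardedConsumer
    val task0Polling = new CountDownLatch(1)
    val task1Finished = new CountDownLatch(1)
    val rejected = new AtomicBoolean(false)

    val task0 = new Thread(new Runnable {
      def run(): Unit = consumer.poll {
        task0Polling.countDown() // Task0 is now inside poll
        task1Finished.await()    // keep holding until Task1 has tried
      }
    })
    val task1 = new Thread(new Runnable {
      def run(): Unit = {
        task0Polling.await()
        try consumer.seek(200L)
        catch { case _: ConcurrentModificationException => rejected.set(true) }
        finally task1Finished.countDown()
      }
    })
    task0.start(); task1.start()
    task0.join(); task1.join()
    rejected.get
  }

  def main(args: Array[String]): Unit =
    println(s"second seek rejected: ${secondSeekFails()}")
}
```

The latches only force the interleaving shown in the timeline; in the real job the overlap depends on scheduling, which is why the exception is intermittent.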
> Here are some relevant logs:
> {code}
> 17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394204414 -> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394238058 -> 4394257712
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394204414
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer: Initial fetch for spark-executor-consumer test-topic 2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Seeking to test-topic-2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting block rdd_199_2 failed due to an exception
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block rdd_199_2 could not be removed as it was not found on disk or in memory
> 17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception in task 49.0 in stage 45.0 (TID 3201)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>       at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>       at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>       at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
>       at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>       at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
>       at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
>       at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
>       at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
>       at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
>       at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
>       at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>       at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>       at org.apache.spark.scheduler.Task.run(Task.scala:86)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Polled [test-topic-2]  8237
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204415 requested 4394204415
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204416 requested 4394204416
> ... 
> {code}
> It looks like when WindowedDStream does the getOrCompute call, it's computing 
> all the sets of offsets it needs and tries to farm out the work in 
> parallel, so each available task worker thread is handed one of the offset 
> ranges that need to be read.
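
To put numbers on that fan-out, here is a small sketch of the window arithmetic for the intervals given earlier (10s batch, 180s window, 30s slide). WindowFanOut and its method names are made up for illustration. It assumes each batch RDD carries its own offset range per topic-partition, which matches the UnionRDD frame in the stack trace; how many of those batches are actually recomputed rather than read from cache is not something this message establishes.

```scala
object WindowFanOut {
  // Number of batch RDDs that are unioned into one windowed RDD.
  def batchesPerWindow(windowSec: Int, batchSec: Int): Int = {
    require(windowSec % batchSec == 0, "window must be a multiple of the batch interval")
    windowSec / batchSec
  }

  // Number of batches that are new since the previous window was emitted.
  def newBatchesPerSlide(slideSec: Int, batchSec: Int): Int = {
    require(slideSec % batchSec == 0, "slide must be a multiple of the batch interval")
    slideSec / batchSec
  }

  def main(args: Array[String]): Unit = {
    val perWindow = batchesPerWindow(windowSec = 180, batchSec = 10)
    val perSlide = newBatchesPerSlide(slideSec = 30, batchSec = 10)
    println(s"offset ranges per topic-partition in one window: $perWindow")
    println(s"of which new since the previous window: $perSlide")
  }
}
```

With more than one offset range for the same topic-partition in a single job, any executor that can run two tasks at once may be handed two of them together.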
> After realizing what was going on I tested four states:
> * spark.executor.cores 1 and spark.mesos.extra.cores 0
> ** No Exceptions
> * spark.executor.cores 1 and spark.mesos.extra.cores 1
> ** ConcurrentModificationException
> * spark.executor.cores 2 and spark.mesos.extra.cores 0
> ** ConcurrentModificationException
> * spark.executor.cores 2 and spark.mesos.extra.cores 1
> ** ConcurrentModificationException
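
One way to read the four results: the exception appears exactly when an executor can run more than one task at a time. The sketch below checks that reading against the reported outcomes. It assumes the number of concurrent task slots is spark.executor.cores plus spark.mesos.extra.cores (with the default of one CPU per task); SlotMatrix and Setup are hypothetical names.

```scala
object SlotMatrix {
  final case class Setup(executorCores: Int, mesosExtraCores: Int, sawException: Boolean) {
    // Assumption: tasks an executor may run at once = cores + advertised extra cores.
    def taskSlots: Int = executorCores + mesosExtraCores
  }

  // The four configurations and outcomes reported above.
  val observed: Seq[Setup] = Seq(
    Setup(executorCores = 1, mesosExtraCores = 0, sawException = false),
    Setup(executorCores = 1, mesosExtraCores = 1, sawException = true),
    Setup(executorCores = 2, mesosExtraCores = 0, sawException = true),
    Setup(executorCores = 2, mesosExtraCores = 1, sawException = true)
  )

  // True if "exception <=> more than one task slot" explains every row.
  def consistentWithSlotHypothesis: Boolean =
    observed.forall(s => s.sawException == (s.taskSlots > 1))

  def main(args: Array[String]): Unit =
    println(s"slot hypothesis matches all rows: $consistentWithSlotHypothesis")
}
```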
> Minimal code I was able to reproduce the issue with:
> The streaming batch interval was set to 2 seconds, which increased the rate 
> of exceptions I saw.
> {code}
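> import io.confluent.kafka.serializers.KafkaAvroDeserializer
> import org.apache.spark.streaming.Seconds
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> // brokers, groupId, schemaRegistryUrl, offset, kafkaTopic, ssc and logger
> // are defined elsewhere in the application.
> val kafkaParams = Map[String, Object](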
>   "bootstrap.servers" -> brokers,
>   "key.deserializer" -> classOf[KafkaAvroDeserializer],
>   "value.deserializer" -> classOf[KafkaAvroDeserializer],
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "group.id" -> groupId,
>   "schema.registry.url" -> schemaRegistryUrl,
>   "auto.offset.reset" -> offset
> )
> val inputStream = KafkaUtils.createDirectStream[Object, Object](
>   ssc,
>   PreferConsistent,
>   Subscribe[Object, Object]
>     (kafkaTopic, kafkaParams)
> )
> val windowStream = inputStream.map(_.toString).window(Seconds(180), Seconds(30))
> windowStream.foreachRDD{
>   rdd => {
>     val filtered = rdd.filter(_.contains("idb"))
>     filtered.foreach(
>       message => {
>         var i = 0
>         if (i == 0) {
>           logger.info(message)
>           i = i + 1
>         }
>       }
>     )
>   }
> }
> {code}



