[
https://issues.apache.org/jira/browse/SPARK-19185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Apache Spark reassigned SPARK-19185:
------------------------------------
Assignee: Apache Spark
> ConcurrentModificationExceptions with CachedKafkaConsumers when Windowing
> -------------------------------------------------------------------------
>
> Key: SPARK-19185
> URL: https://issues.apache.org/jira/browse/SPARK-19185
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.0.2
> Environment: Spark 2.0.2
> Spark Streaming Kafka 010
> Mesos 0.28.0 - client mode
> spark.executor.cores 1
> spark.mesos.extra.cores 1
> Reporter: Kalvin Chau
> Assignee: Apache Spark
> Priority: Major
> Labels: streaming, windowing
>
> We've been running into ConcurrentModificationExceptions ("KafkaConsumer is
> not safe for multi-threaded access") with the CachedKafkaConsumer. I've been
> working through debugging this issue, and after looking through some of the
> Spark source code I think this is a bug.
> Our setup is:
> Spark 2.0.2, running on Mesos 0.28.0-2 in client mode, using
> Spark-Streaming-Kafka-010
> spark.executor.cores 1
> spark.mesos.extra.cores 1
> Batch interval: 10s, window interval: 180s, and slide interval: 30s
> We would see the exception when, within one executor, two task worker
> threads are assigned the same TopicPartition but different offset ranges.
> They both get the same CachedKafkaConsumer, and whichever task thread goes
> first seeks and polls for all of its records, while at the same time the
> second thread tries to seek to its own offset and fails because it is unable
> to acquire the lock.
> Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
> Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z
> Time1 E0 Task0 - Seeks and starts to poll
> Time1 E0 Task1 - Attempts to seek, but fails
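> To illustrate the failure mode outside of Spark, here is a minimal sketch
> (hypothetical topic, offsets, broker and group id; this is not the actual
> CachedKafkaConsumer code) of two threads sharing one KafkaConsumer the way
> the two task workers end up doing:
> {code}
> import java.util.{Collections, Properties}
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import org.apache.kafka.common.TopicPartition
>
> // One consumer instance shared by two threads, mirroring two task workers
> // that are handed the same cached consumer for TopicPartition("abc", 0).
> val props = new Properties()
> props.put("bootstrap.servers", "localhost:9092") // assumed broker
> props.put("group.id", "repro-group")             // hypothetical group id
> props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> val shared = new KafkaConsumer[Array[Byte], Array[Byte]](props)
> val tp = new TopicPartition("abc", 0)
> shared.assign(Collections.singletonList(tp))
>
> def task(startOffset: Long): Runnable = new Runnable {
>   override def run(): Unit = {
>     shared.seek(tp, startOffset) // whichever thread enters second can hit
>     shared.poll(1000L)           // ConcurrentModificationException from acquire()
>   }
> }
>
> // Task0 reads X to Y, Task1 reads Y to Z, concurrently.
> val t0 = new Thread(task(0L))
> val t1 = new Thread(task(100L))
> t0.start(); t1.start()
> t0.join(); t1.join()
> {code}
> KafkaConsumer.acquire() checks the calling thread rather than blocking, so
> the second thread fails immediately with "KafkaConsumer is not safe for
> multi-threaded access" instead of waiting for the first one to finish.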
> Here are some relevant logs:
> {code}
> 17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing
> topic test-topic, partition 2 offsets 4394204414 -> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing
> topic test-topic, partition 2 offsets 4394238058 -> 4394257712
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer:
> Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested
> 4394204414
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer:
> Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested
> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer:
> Initial fetch for spark-executor-consumer test-topic 2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer:
> Seeking to test-topic-2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting
> block rdd_199_2 failed due to an exception
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block
> rdd_199_2 could not be removed as it was not found on disk or in memory
> 17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception in
> task 49.0 in stage 45.0 (TID 3201)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
> at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer:
> Polled [test-topic-2] 8237
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer:
> Get spark-executor-consumer test-topic 2 nextOffset 4394204415 requested
> 4394204415
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer:
> Get spark-executor-consumer test-topic 2 nextOffset 4394204416 requested
> 4394204416
> ...
> {code}
> It looks like when WindowedDStream does the getOrCompute call, it computes
> all of the sets of offsets it needs and tries to farm the work out in
> parallel, so each available task worker thread gets one of the offset ranges
> that needs to be read.
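> As a rough sketch of that arithmetic (values taken from the intervals above,
> not from any Spark API), a single window RDD unions many batch-sized offset
> ranges for the same TopicPartition, and with more than one task slot per
> executor two of those ranges can run concurrently against the same cached
> consumer:
> {code}
> // Window/batch arithmetic for this job (assumed values from this report).
> val batchIntervalSec = 10
> val windowLengthSec  = 180
> val offsetRangesPerTopicPartition = windowLengthSec / batchIntervalSec // 18
> // With spark.executor.cores + spark.mesos.extra.cores > 1, two of these 18
> // ranges can be scheduled on the same executor at the same time, and both
> // resolve to the same CachedKafkaConsumer.
> {code}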
> After realizing what was going on, I tested four combinations (only the
> first avoided the exception; a minimal config sketch follows the list):
> * spark.executor.cores 1 and spark.mesos.extra.cores 0
> ** No Exceptions
> * spark.executor.cores 1 and spark.mesos.extra.cores 1
> ** ConcurrentModificationException
> * spark.executor.cores 2 and spark.mesos.extra.cores 0
> ** ConcurrentModificationException
> * spark.executor.cores 2 and spark.mesos.extra.cores 1
> ** ConcurrentModificationException
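> For reference, the only combination above that did not raise the exception
> can be sketched as follows (the keys are exactly the ones tested; setting
> them on SparkConf is just one way to apply them):
> {code}
> import org.apache.spark.SparkConf
>
> // A single task slot per executor means two tasks can never hold the same
> // cached consumer at the same time, so the exception does not occur.
> val conf = new SparkConf()
>   .set("spark.executor.cores", "1")
>   .set("spark.mesos.extra.cores", "0")
> {code}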
> Minimal set of code I was able to reproduce this with (the streaming batch
> interval was set to 2 seconds, which increased the rate at which I saw the
> exceptions):
> {code}
> // brokers, groupId, schemaRegistryUrl, offset, kafkaTopic, ssc and logger
> // are defined elsewhere in the job; KafkaAvroDeserializer is Confluent's
> // schema-registry deserializer.
> import io.confluent.kafka.serializers.KafkaAvroDeserializer
> import org.apache.spark.streaming.Seconds
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> brokers,
>   "key.deserializer" -> classOf[KafkaAvroDeserializer],
>   "value.deserializer" -> classOf[KafkaAvroDeserializer],
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "group.id" -> groupId,
>   "schema.registry.url" -> schemaRegistryUrl,
>   "auto.offset.reset" -> offset
> )
>
> val inputStream = KafkaUtils.createDirectStream[Object, Object](
>   ssc,
>   PreferConsistent,
>   Subscribe[Object, Object](kafkaTopic, kafkaParams)
> )
>
> // 180s window sliding every 30s over the short batch interval.
> val windowStream = inputStream.map(_.toString).window(Seconds(180), Seconds(30))
>
> windowStream.foreachRDD { rdd =>
>   val filtered = rdd.filter(_.contains("idb"))
>   filtered.foreach { message =>
>     var i = 0
>     if (i == 0) {
>       logger.info(message)
>       i = i + 1
>     }
>   }
> }
> {code}