
Kalvin Chau commented on SPARK-19185:

[~c...@koeninger.org]  [~uncleGen]

Thanks, I haven't had the opportunity to test either of those options yet. 
* Unless persisting somehow prevents two task threads within the executor from 
reading the same partition, I don't see how it would prevent the underlying 
issue. That said, I'm not 100% sure how persisting affects task scheduling.
* I'll keep this in mind as well; it seems straightforward.

Our current workaround is to run only one task thread per executor, which is a 
performance hit, especially for some of our higher-velocity streams. To 
compensate, we've increased the number of partitions on the topic and raised 
the number of executors to match.
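For reference, the one-task-thread-per-executor workaround comes down to driver configuration along these lines. This is a sketch assuming Mesos coarse-grained mode (the only mode where spark.mesos.extra.cores applies). The app name is a placeholder, and the values mirror the only combination in the original report that produced no exceptions.

```scala
import org.apache.spark.SparkConf

// One task slot per executor: the only combination in the report without
// ConcurrentModificationExceptions. Mesos coarse-grained mode is assumed.
val conf = new SparkConf()
  .setAppName("kafka-window-stream")   // placeholder name
  .set("spark.executor.cores", "1")    // one core, so one task at a time
  .set("spark.mesos.extra.cores", "0") // advertise no extra cores to the driver
```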

I think making the consumer cache configurable is a good idea. Can't we 
technically already do that by setting the cache size to 0 (using 
"spark.sql.kafkaConsumerCache.capacity")? Or will that fail? I haven't had a 
chance to test that either.
A pool of consumers with N group ids per topicPartition (N being the number of 
task threads within an executor) also seems like a good idea.
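A minimal sketch of the pool idea, in plain Scala so it runs without Kafka. FakeConsumer is a hypothetical stand-in for KafkaConsumer, and ConsumerPool, borrow and release are illustrative names, not Spark APIs. Each (topic, partition) key gets up to maxPerKey consumers, each with its own group id suffix, and a consumer is lent to at most one thread until it is released.

```scala
import scala.collection.mutable

// Hypothetical stand-in for KafkaConsumer; a real pool would build
// KafkaConsumer instances from the executor-side kafkaParams instead.
final case class FakeConsumer(groupId: String, topic: String, partition: Int)

// Hypothetical pool: up to maxPerKey consumers per (topic, partition), each with
// its own group id, and each lent to at most one task thread at a time.
final class ConsumerPool(baseGroupId: String, maxPerKey: Int) {
  private val idle = mutable.Map.empty[(String, Int), mutable.Queue[FakeConsumer]]
  private val created = mutable.Map.empty[(String, Int), Int]

  def borrow(topic: String, partition: Int): FakeConsumer = synchronized {
    val key = (topic, partition)
    val free = idle.getOrElseUpdate(key, mutable.Queue.empty[FakeConsumer])
    if (free.nonEmpty) free.dequeue()
    else {
      val n = created.getOrElse(key, 0)
      // With maxPerKey equal to the executor's task slots, this should not trigger.
      require(n < maxPerKey, s"all $maxPerKey consumers for $key are in use")
      created(key) = n + 1
      FakeConsumer(s"$baseGroupId-$n", topic, partition)
    }
  }

  // Hands a consumer back so the next task reading this partition can reuse it.
  def release(consumer: FakeConsumer): Unit = synchronized {
    idle.getOrElseUpdate((consumer.topic, consumer.partition),
      mutable.Queue.empty[FakeConsumer]).enqueue(consumer)
  }
}
```

With maxPerKey set to the executor's task-slot count, two tasks reading different offset ranges of test-topic partition 2 would each borrow a different consumer instead of sharing one.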

If there's a solid direction going forward, I wouldn't mind tackling this issue 
to contribute back!

> ConcurrentModificationExceptions with CachedKafkaConsumers when Windowing
> -------------------------------------------------------------------------
>                 Key: SPARK-19185
>                 URL: https://issues.apache.org/jira/browse/SPARK-19185
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.2
>         Environment: Spark 2.0.2
> Spark Streaming Kafka 010
> Mesos 0.28.0 - client mode
> spark.executor.cores 1
> spark.mesos.extra.cores 1
>            Reporter: Kalvin Chau
>              Labels: streaming, windowing
> We've been running into ConcurrentModificationExceptions ("KafkaConsumer is 
> not safe for multi-threaded access") with the CachedKafkaConsumer. I've been 
> debugging this issue, and after looking through some of the Spark source code, 
> I think this is a bug.
> Our setup is:
> Spark 2.0.2, running on Mesos 0.28.0-2 in client mode, using 
> Spark-Streaming-Kafka-010
> spark.executor.cores 1
> spark.mesos.extra.cores 1
> Batch interval: 10s, window interval: 180s, and slide interval: 30s
> We see the exception when two task worker threads in one executor are 
> assigned the same topic+partition but different offset ranges.
> Both get the same CachedKafkaConsumer. Whichever task thread goes first seeks 
> and polls for its records; meanwhile the second thread tries to seek to its 
> own offset and fails because it cannot acquire the consumer's lock.
> Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
> Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z
> Time1 E0 Task0 - Seeks and starts to poll
> Time1 E0 Task1 - Attempts to seek, but fails
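To make the failure mode concrete, here is a simplified model of the single-thread guard that KafkaConsumer checks on entry to calls such as seek and poll (the acquire frame in the stack trace below). It is a sketch under assumptions, not Kafka's code: GuardedConsumer, withGuard and Race are hypothetical names, and the model is non-reentrant for brevity. Latches force the interleaving from the timeline: Task0 is inside its poll when Task1 tries to seek on the same consumer.

```scala
import java.util.ConcurrentModificationException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicLong

// Hypothetical model of KafkaConsumer's light-weight ownership check.
final class GuardedConsumer {
  private val NoThread = -1L
  private val owner = new AtomicLong(NoThread)

  // Non-reentrant for brevity: any second entry fails while the owner is set.
  private def acquire(): Unit = {
    val me = Thread.currentThread().getId
    if (!owner.compareAndSet(NoThread, me))
      throw new ConcurrentModificationException(
        "KafkaConsumer is not safe for multi-threaded access")
  }

  private def release(): Unit = owner.set(NoThread)

  // Runs one "seek" or "poll" while holding the guard.
  def withGuard[T](body: => T): T = {
    acquire()
    try body finally release()
  }
}

object Race {
  // Returns true if Task1's seek fails while Task0 is still polling.
  def secondTaskFails(): Boolean = {
    val consumer = new GuardedConsumer
    val task0Polling = new CountDownLatch(1) // Task0 now holds the consumer
    val task1Tried = new CountDownLatch(1)   // Task1 has made its attempt
    val task0 = new Thread(() => consumer.withGuard {
      task0Polling.countDown()
      task1Tried.await() // keep "polling" until Task1 has tried to seek
    })
    task0.start()
    task0Polling.await()
    val failed =
      try { consumer.withGuard(()); false } // Task1's "seek"
      catch { case _: ConcurrentModificationException => true }
      finally task1Tried.countDown()
    task0.join()
    failed
  }
}
```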
> Here are some relevant logs:
> {code}
> 17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394204414 -> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394238058 -> 4394257712
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394204414
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer: Initial fetch for spark-executor-consumer test-topic 2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Seeking to test-topic-2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting block rdd_199_2 failed due to an exception
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block rdd_199_2 could not be removed as it was not found on disk or in memory
> 17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception in task 49.0 in stage 45.0 (TID 3201)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>       at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>       at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>       at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
>       at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>       at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
>       at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
>       at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
>       at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
>       at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
>       at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
>       at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
>       at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>       at org.apache.spark.scheduler.Task.run(Task.scala:86)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Polled [test-topic-2]  8237
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204415 requested 4394204415
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204416 requested 4394204416
> ... 
> {code}
> It looks like when WindowedDStream makes the getOrCompute call, it computes 
> all the sets of offsets it needs and farms the work out in parallel, so each 
> available task worker gets one of the offset ranges that need to be read.
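A rough worked example of why the same topic-partition shows up in several concurrent tasks: with a 10 s batch interval and a 180 s window, each window unions 180 / 10 = 18 batch RDDs, so every Kafka partition contributes 18 union partitions with adjacent offset ranges (like 4394204414 -> 4394238058 and 4394238058 -> 4394257712 in the logs above). The sketch assumes one offset range per Kafka partition per batch and a fixed, hypothetical number of records per batch; Slice is a stand-in for kafka010's OffsetRange, and WindowUnion is an illustrative helper.

```scala
// Hypothetical stand-in for kafka010's OffsetRange: one batch's slice of a partition.
final case class Slice(topic: String, partition: Int, from: Long, until: Long)

object WindowUnion {
  // Number of batch RDDs a window unions together.
  def batchesPerWindow(windowSec: Int, batchSec: Int): Int = {
    require(windowSec % batchSec == 0, "window must be a multiple of the batch interval")
    windowSec / batchSec
  }

  // One Slice per batch for a single Kafka partition, with consecutive offsets.
  def windowSlices(topic: String, partition: Int, start: Long,
                   perBatch: Long, batches: Int): Seq[Slice] =
    (0 until batches).map { i =>
      val from = start + i * perBatch
      Slice(topic, partition, from, from + perBatch)
    }

  // Union partitions per Kafka topic-partition: each one is a separate task
  // that may run concurrently and look up the same cached consumer.
  def sharingSameConsumer(slices: Seq[Slice]): Map[(String, Int), Int] =
    slices.groupBy(s => (s.topic, s.partition)).map { case (key, group) => key -> group.size }
}
```

With the 2 s batch interval used in the reproduction, batchesPerWindow(180, 2) is 90, so each Kafka partition would contribute 90 union partitions per window; that may be why the shorter interval raised the exception rate.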
> After realizing what was going on, I tested four configurations:
> * spark.executor.cores 1 and spark.mesos.extra.cores 0
> ** No Exceptions
> * spark.executor.cores 1 and spark.mesos.extra.cores 1
> ** ConcurrentModificationException
> * spark.executor.cores 2 and spark.mesos.extra.cores 0
> ** ConcurrentModificationException
> * spark.executor.cores 2 and spark.mesos.extra.cores 1
> ** ConcurrentModificationException
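The four results line up with the number of concurrent task slots per executor. In Mesos coarse-grained mode, spark.mesos.extra.cores makes an executor advertise extra cores (without allocating them) so that the driver sends it more tasks. The sketch below assumes slots = (executor cores + extra cores) / spark.task.cpus, with spark.task.cpus at its default of 1; TaskSlots is a hypothetical helper, not a Spark API.

```scala
object TaskSlots {
  // Concurrent tasks the driver may send one executor (assumed formula):
  // extra cores are advertised, not allocated, but still count as slots.
  def slots(executorCores: Int, extraCores: Int, taskCpus: Int = 1): Int =
    (executorCores + extraCores) / taskCpus

  // Two or more slots means two KafkaRDD partitions for the same
  // topic-partition can run at once and contend for one cached consumer.
  def canContend(executorCores: Int, extraCores: Int): Boolean =
    slots(executorCores, extraCores) >= 2
}
```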
> Minimal code I was able to reproduce this with (the streaming batch interval 
> was set to 2 seconds, which increased the rate of exceptions I saw):
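> {code}
> import io.confluent.kafka.serializers.KafkaAvroDeserializer
> import org.apache.spark.streaming.Seconds
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>
> // brokers, groupId, schemaRegistryUrl, offset, kafkaTopic (a collection of
> // topic names), ssc and logger are defined elsewhere in the application.
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> brokers,
>   "key.deserializer" -> classOf[KafkaAvroDeserializer],
>   "value.deserializer" -> classOf[KafkaAvroDeserializer],
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "group.id" -> groupId,
>   "schema.registry.url" -> schemaRegistryUrl,
>   "auto.offset.reset" -> offset
> )
> // Direct stream: each batch is a KafkaRDD with one partition per Kafka
> // topic-partition, read on the executors through CachedKafkaConsumer.
> val inputStream = KafkaUtils.createDirectStream[Object, Object](
>   ssc,
>   PreferConsistent,
>   Subscribe[Object, Object](kafkaTopic, kafkaParams)
> )
> // 180 s window sliding every 30 s: each window unions many batch RDDs.
> val windowStream = inputStream.map(_.toString).window(Seconds(180), Seconds(30))
> windowStream.foreachRDD { rdd =>
>   val filtered = rdd.filter(_.contains("idb"))
>   // Runs on the executors and logs every matching message.
>   filtered.foreach(message => logger.info(message))
> }
> {code}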

This message was sent by Atlassian JIRA
