I've filed an issue here: https://issues.apache.org/jira/browse/SPARK-19185. Let me know if I missed anything!
--Kalvin

On Wed, Jan 11, 2017 at 5:43 PM Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
> Thanks for reporting this. I finally understood the root cause. Could you
> file a JIRA on https://issues.apache.org/jira/browse/SPARK please?
>
> On Wed, Jan 11, 2017 at 5:20 PM, Kalvin Chau <kalvinnc...@gmail.com> wrote:
>
> Here is the minimal code example where I was able to replicate the issue.
> The batch interval is set to 2 to get the exceptions to happen more often.
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> brokers,
>   "key.deserializer" -> classOf[KafkaAvroDeserializer],
>   "value.deserializer" -> classOf[KafkaAvroDeserializer],
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "group.id" -> groupId,
>   "schema.registry.url" -> schemaRegistryUrl,
>   "auto.offset.reset" -> offset
> )
>
> val inputStream = KafkaUtils.createDirectStream[Object, Object](
>   ssc,
>   PreferConsistent,
>   Subscribe[Object, Object](kafkaTopic, kafkaParams)
> )
>
> // 180-second window, sliding every 30 seconds
> val windowStream = inputStream.map(_.toString).window(Seconds(180), Seconds(30))
>
> windowStream.foreachRDD { rdd =>
>   val filtered = rdd.filter(_.contains("idb"))
>
>   filtered.foreach { message =>
>     var i = 0
>     if (i == 0) {
>       logger.info(message)
>       i = i + 1
>     }
>   }
> }
>
> On Wed, Jan 11, 2017 at 4:04 PM Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>
> Could you post your code, please?
>
> On Wed, Jan 11, 2017 at 3:53 PM, Kalvin Chau <kalvinnc...@gmail.com> wrote:
>
> "spark.speculation" is not set, so it would be whatever the default is.
>
> On Wed, Jan 11, 2017 at 3:43 PM Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>
> Or do you have "spark.speculation" enabled? If not, Spark Streaming should not
> launch two tasks using the same TopicPartition.
>
> On Wed, Jan 11, 2017 at 3:33 PM, Kalvin Chau <kalvinnc...@gmail.com> wrote:
>
> I have not modified that configuration setting, and that requirement doesn't
> seem to be documented anywhere.
>
> Does the Kafka 0.10 connector require the number of cores on an executor to
> be set to 1? I didn't see that documented anywhere either.
>
> On Wed, Jan 11, 2017 at 3:27 PM Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>
> Did you change "spark.streaming.concurrentJobs" to more than 1? The Kafka 0.10
> connector requires that it be 1.
>
> On Wed, Jan 11, 2017 at 3:25 PM, Kalvin Chau <kalvinnc...@gmail.com> wrote:
>
> I'm not reusing any InputDStreams, actually; this is one InputDStream that
> has a window applied to it.
> When Spark creates and assigns tasks to read from the topic, one executor
> gets assigned two tasks to read from the same TopicPartition. Both tasks use
> the same CachedKafkaConsumer to read from that TopicPartition, causing the
> ConcurrentModificationException in one of the worker threads.
>
> On Wed, Jan 11, 2017 at 2:53 PM Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>
> I think you may be reusing the Kafka DStream (the DStream returned by
> createDirectStream). If you need to read from the same Kafka source, you
> need to create another DStream.
>
> On Wed, Jan 11, 2017 at 2:38 PM, Kalvin Chau <kalvinnc...@gmail.com> wrote:
>
> Hi,
>
> We've been running into ConcurrentModificationExceptions ("KafkaConsumer is
> not safe for multi-threaded access") with the CachedKafkaConsumer. I've been
> working through debugging this issue, and after looking through some of the
> Spark source code I think this is a bug.
> Our setup is:
> Spark 2.0.2, running on Mesos 0.28.0-2 in client mode, using
> spark-streaming-kafka-0-10
> spark.executor.cores 1
> spark.mesos.extra.cores 1
>
> Batch interval: 10s, window interval: 180s, slide interval: 30s
>
> We would see the exception when, in one executor, two task worker threads
> were assigned the same Topic+Partition but a different set of offsets.
>
> They would both get the same CachedKafkaConsumer, and whichever task thread
> went first would seek and poll for all of its records, while at the same time
> the second thread would try to seek to its offset but fail because it was
> unable to acquire the lock.
>
> Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
> Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z
>
> Time1 E0 Task0 - Seeks and starts to poll
> Time1 E0 Task1 - Attempts to seek, but fails
>
> Here are some relevant logs:
>
> 17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394204414 -> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394238058 -> 4394257712
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394204414
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer: Initial fetch for spark-executor-consumer test-topic 2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Seeking to test-topic-2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting block rdd_199_2 failed due to an exception
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block rdd_199_2 could not be removed as it was not found on disk or in memory
> 17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception in task 49.0 in stage 45.0 (TID 3201)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
>   at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
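
The exception in the trace comes from KafkaConsumer's own single-threaded-access check, and the same failure can be shown outside Spark. Below is a minimal standalone sketch, not from the thread itself: the broker address localhost:9092, the topic name "abc", the group id, and the offsets are placeholder assumptions. It shares one KafkaConsumer between two threads the way two tasks end up sharing one CachedKafkaConsumer; when the seek/poll calls overlap, KafkaConsumer's acquire() check throws java.util.ConcurrentModificationException, so whether it fires on a given run depends on timing.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object SharedConsumerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")   // assumed broker
    props.put("group.id", "spark-executor-consumer")   // placeholder group id
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("enable.auto.commit", "false")

    // One consumer instance, shared the way a CachedKafkaConsumer is shared
    // by two tasks that were handed the same TopicPartition.
    val consumer = new KafkaConsumer[String, String](props)
    val tp = new TopicPartition("abc", 0)               // assumed topic
    consumer.assign(Collections.singletonList(tp))

    // "Task0": seeks and polls, taking single-threaded ownership of the consumer.
    val task0 = new Thread(new Runnable {
      override def run(): Unit = {
        consumer.seek(tp, 0L)
        consumer.poll(1000)
      }
    })

    // "Task1": seeks to a different offset on the same consumer. If this call
    // overlaps with task0's, KafkaConsumer's acquire() check throws
    // java.util.ConcurrentModificationException, as in the stack trace above.
    val task1 = new Thread(new Runnable {
      override def run(): Unit = {
        consumer.seek(tp, 100L)
        consumer.poll(1000)
      }
    })

    task0.start(); task1.start()
    task0.join(); task1.join()
    consumer.close()
  }
}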
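
For reference, the configuration touched on in the thread, collected in one place. The two executor settings are from the setup described above; spark.speculation and spark.streaming.concurrentJobs were not set in this job, and the values shown for them are assumed to be the stock Spark 2.0.x defaults.

# Reporter's setup (Spark 2.0.2 on Mesos 0.28.0-2, client mode)
spark.executor.cores 1
spark.mesos.extra.cores 1

# Not set in the job above; shown here at their usual Spark defaults
spark.speculation false
spark.streaming.concurrentJobs 1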