I've filed an issue here https://issues.apache.org/jira/browse/SPARK-19185,
let me know if I missed anything!

--Kalvin

On Wed, Jan 11, 2017 at 5:43 PM Shixiong(Ryan) Zhu <shixi...@databricks.com>
wrote:

> Thanks for reporting this. I've finally understood the root cause. Could you
> file a JIRA at https://issues.apache.org/jira/browse/SPARK, please?
>
> On Wed, Jan 11, 2017 at 5:20 PM, Kalvin Chau <kalvinnc...@gmail.com>
> wrote:
>
> Here is a minimal code example with which I was able to replicate the issue.
> The batch interval is set to 2 to make the exceptions happen more often.
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> brokers,
>   "key.deserializer" -> classOf[KafkaAvroDeserializer],
>   "value.deserializer" -> classOf[KafkaAvroDeserializer],
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "group.id" -> groupId,
>   "schema.registry.url" -> schemaRegistryUrl,
>   "auto.offset.reset" -> offset
> )
>
> val inputStream = KafkaUtils.createDirectStream[Object, Object](
>   ssc,
>   PreferConsistent,
>   Subscribe[Object, Object]
>     (kafkaTopic, kafkaParams)
> )
>
> val windowStream = inputStream.map(_.toString).window(Seconds(180), Seconds(30))
>
> windowStream.foreachRDD{
>   rdd => {
>     val filtered = rdd.filter(_.contains("idb"))
>
>     filtered.foreach(
>       message => {
>         var i = 0
>         if (i == 0) {
>           logger.info(message)
>           i = i + 1
>         }
>       }
>     )
>   }
> }
>
>
> On Wed, Jan 11, 2017 at 4:04 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
> Could you post your code, please?
>
> On Wed, Jan 11, 2017 at 3:53 PM, Kalvin Chau <kalvinnc...@gmail.com>
> wrote:
>
> "spark.speculation" is not set, so it would be whatever the default is.
>
>
> On Wed, Jan 11, 2017 at 3:43 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
> Or did you enable "spark.speculation"? If not, Spark Streaming should not
> launch two tasks using the same TopicPartition.
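>
> For reference, a quick sketch of ruling that out explicitly (the property
> already defaults to false, so this only makes the assumption visible on the
> conf you build the StreamingContext from):
>
> import org.apache.spark.SparkConf
>
> // Sketch only: keep speculative execution off so a TopicPartition is never
> // handed to two concurrently running attempts of the same task.
> val conf = new SparkConf().set("spark.speculation", "false")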
>
> On Wed, Jan 11, 2017 at 3:33 PM, Kalvin Chau <kalvinnc...@gmail.com>
> wrote:
>
> I have not modified that configuration setting, and it doesn't seem to be
> documented anywhere.
>
> Does the Kafka 0.10 connector require the number of cores on an executor to
> be set to 1? I didn't see that documented anywhere either.
>
> On Wed, Jan 11, 2017 at 3:27 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
> Did you change "spark.streaming.concurrentJobs" to more than 1? The Kafka
> 0.10 connector requires it to be 1.
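>
> A sketch of pinning it explicitly (1 is already the default, so this just
> documents the assumption on whatever SparkConf the StreamingContext is
> built from):
>
> import org.apache.spark.SparkConf
>
> // Sketch only: the Kafka 0.10 connector's cached consumers are not safe for
> // multi-threaded access, so keep Spark Streaming to a single concurrent job.
> val conf = new SparkConf().set("spark.streaming.concurrentJobs", "1")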
>
> On Wed, Jan 11, 2017 at 3:25 PM, Kalvin Chau <kalvinnc...@gmail.com>
> wrote:
>
> I'm not actually re-using any InputDStreams; this is one InputDStream that
> has a window applied to it.
> When Spark creates and assigns tasks to read from the topic, one executor
> gets assigned two tasks that read from the same TopicPartition, and both use
> the same CachedKafkaConsumer to read from it, causing the
> ConcurrentModificationException in one of the worker threads.
>
> On Wed, Jan 11, 2017 at 2:53 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
> I think you may be reusing the Kafka DStream (the DStream returned by
> createDirectStream). If you need to read from the same Kafka source twice,
> you need to create another DStream.
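>
> Roughly like this sketch (kafkaTopic, kafkaParams and the second group.id
> are placeholders; giving the second stream its own group.id just keeps the
> two consumers independent):
>
> // Sketch only: two independent direct streams over the same topic, instead
> // of wiring one InputDStream into two places.
> val streamA = KafkaUtils.createDirectStream[Object, Object](
>   ssc, PreferConsistent, Subscribe[Object, Object](kafkaTopic, kafkaParams))
>
> val streamB = KafkaUtils.createDirectStream[Object, Object](
>   ssc, PreferConsistent,
>   Subscribe[Object, Object](kafkaTopic, kafkaParams + ("group.id" -> "my-group-copy")))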
>
> On Wed, Jan 11, 2017 at 2:38 PM, Kalvin Chau <kalvinnc...@gmail.com>
> wrote:
>
> Hi,
>
> We've been running into ConcurrentModificationExceptions ("KafkaConsumer is
> not safe for multi-threaded access") with the CachedKafkaConsumer. I've been
> working through debugging this issue, and after looking through some of the
> Spark source code I think this is a bug.
>
> Our setup is:
> Spark 2.0.2, running in Mesos 0.28.0-2 in client mode, using
> Spark-Streaming-Kafka-010
> spark.executor.cores 1
> spark.mesos.extra.cores 1
>
> Batch interval: 10s, window interval: 180s, and slide interval: 30s
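>
> In code, the intervals above look roughly like this sketch (conf is our
> SparkConf, and inputStream is the kafka-0-10 direct stream created from ssc
> as usual):
>
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> // Sketch only: 10s batches, with a 180s window sliding every 30s.
> val ssc = new StreamingContext(conf, Seconds(10))
> val windowStream = inputStream.map(_.toString).window(Seconds(180), Seconds(30))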
>
> We would see the exception when, on one executor, two task worker threads
> were assigned the same TopicPartition but different offset ranges.
>
> Both threads would get the same CachedKafkaConsumer, and whichever task
> thread went first would seek and poll for all of its records; at the same
> time the second thread would try to seek to its own offset, but fail because
> it was unable to acquire the consumer's lock.
>
> Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
> Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z
>
> Time1 E0 Task0 - Seeks and starts to poll
> Time1 E0 Task1 - Attempts to seek, but fails
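>
> For anyone hitting this, a possible stopgap is disabling the consumer cache
> so that each task builds its own KafkaConsumer. This is only a sketch, and it
> assumes this Spark version honors the
> spark.streaming.kafka.consumer.cache.enabled flag:
>
> import org.apache.spark.SparkConf
>
> // Sketch only: trade the cached consumer for a fresh KafkaConsumer per task,
> // so two tasks on one executor never share (and fight over) one consumer.
> val conf = new SparkConf()
>   .set("spark.streaming.kafka.consumer.cache.enabled", "false")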
>
> Here are some relevant logs:
> 17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing
> topic test-topic, partition 2 offsets 4394204414 -> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing
> topic test-topic, partition 2 offsets 4394238058 -> 4394257712
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer:
> Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested
> 4394204414
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer:
> Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested
> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer:
> Initial fetch for spark-executor-consumer test-topic 2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer:
> Seeking to test-topic-2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting
> block rdd_199_2 failed due to an exception
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block
> rdd_199_2 could not be removed as it was not found on disk or in memory
> 17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception
> in task 49.0 in stage 45.0 (TID 3201)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
> at
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
>
>
