[ https://issues.apache.org/jira/browse/SPARK-23636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Deepak updated SPARK-23636:
---------------------------
Description:
While using the KafkaUtils.createRDD API, we receive the error listed below, especially when one executor with more than one core connects to a single Kafka topic-partition and fetches an Array[OffsetRange] containing several ranges for that partition.

h2. Error Faced
{noformat}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 17, host, executor 16): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1629)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1528)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1508)
	at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:59)
	at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.remove(CachedKafkaConsumer.scala:185)
	at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:204)
	at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:181)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
{noformat}

h2. Config Used to Simulate the Error
A session with:
* Executors: 1
* Cores: 2 or more
* Kafka topic: has only 1 partition
* While fetching: more than one OffsetRange for that partition in the Array, for example:
{noformat}
Array(OffsetRange("kafka_topic", 0, 608954201, 608954202),
      OffsetRange("kafka_topic", 0, 608954202, 608954203))
{noformat}

h2. Why are we fetching from Kafka as described above?
This lets every core available in the Spark executor use its own connection to fetch and process its own set of messages (offset ranges) from Kafka.
This worked in Spark 1.6.2. However, from Spark 2.1 onwards, the pattern throws the exception above.

h2. Sample Code
{code:scala}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.kafka010.{LocationStrategies, OffsetRange}
import org.apache.spark.streaming.kafka010.KafkaUtils.createRDD

// hiveContext and sqlContext are assumed to exist in the session (their setup is not shown in the report).
// This forces two connections to the same broker for the partition specified below.
val parallelizedRanges = Array(
  OffsetRange("kafka_topic", 0, 1, 2), // offset 1 only (untilOffset is exclusive)
  OffsetRange("kafka_topic", 0, 2, 3)  // offset 2 only; together the two ranges fetch 2 sample records
)
// The Kafka parameters (bootstrap.servers, deserializers, group.id, ...) are not shown in the report.
val kafkaParams1: java.util.Map[String, Object] = new java.util.HashMap()
val rDDConsumerRec: RDD[ConsumerRecord[String, String]] =
  createRDD[String, String](hiveContext.sparkContext, kafkaParams1, parallelizedRanges, LocationStrategies.PreferConsistent)
val data: RDD[Row] = rDDConsumerRec.map { x =>
  Row(x.topic().toString, x.partition().toString, x.offset().toString, x.timestamp().toString, x.value())
}
val df = sqlContext.createDataFrame(data, StructType(Seq(
  StructField("topic", StringType),
  StructField("partition", StringType),
  StructField("offset", StringType),
  StructField("timestamp", StringType),
  StructField("value", StringType) // value() is a String for ConsumerRecord[String, String]; the report declared BinaryType
)))
df.cache
df.registerTempTable("kafka_topic")
hiveContext.sql("""
  select *
  from kafka_topic
""").show
{code}

h2. Related Issue
A similar issue reported for the direct stream (DirectStream) is https://issues.apache.org/jira/browse/SPARK-19185

was:
While using the KafkaUtils.createRDD API - we receive below listed error, especially when a 1 executor connects to 1 kafka topic-partition, but with more than 1 core & fetches an Array(OffsetRanges)

h2. Error Faced
{noformat}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost task 5.3 in stage 1.0 (TID 17, host, executor 16): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1629)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1528)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1508)
	at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.close(CachedKafkaConsumer.scala:59)
	at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.remove(CachedKafkaConsumer.scala:185)
	at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:204)
	at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:181)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
{noformat}

h2. Config Used to simulate the error
A session with :
* Executors - 1
* Cores - 2 or More
* Kafka Topic - has only 1 partition
* While fetching - More than one Array of Offset Range , Example
{noformat}
Array(OffsetRange("kafka_topic",0,608954201,608954202),
OffsetRange("kafka_topic",0,608954202,608954203)
){noformat}

h2. Why are we fetching from kafka as mentioned above.
This gives us the capability to use a connection each for every core available in the spark executor - to fetch and process its own set of messages (offset ranges) from kafka.
This was working in spark 1.6.2
However, from spark 2.1 onwards - the pattern throws exception.

h2. Sample Code
{quote}scala
// This forces two connections to same broker for the partition specified below.
val parallelizedRanges = Array(OffsetRange("kafka_topic",0,1,2), // Fetching sample 2 records
OffsetRange("kafka_topic",0,2,3) // Fetching sample 2 records
);
val kafkaParams1: java.util.Map[String, Object] = new java.util.HashMap()
val rDDConsumerRec: RDD[ConsumerRecord[String, String]] =
createRDD[String, String](hiveContext.sparkContext
, kafkaParams1, parallelizedRanges, LocationStrategies.PreferConsistent);
val data: RDD[Row] = rDDConsumerRec.map { x => Row(x.topic().toString, x.partition().toString, x.offset().toString, x.timestamp().toString, x.value() ) };
val df = sqlContext.createDataFrame(data, StructType(
Seq(
StructField("topic", StringType),
StructField("partition", StringType),
StructField("offset", StringType),
StructField("timestamp", StringType),
StructField("value", BinaryType)
)));
df.cache;
df.registerTempTable("kafka_topic");
hiveContext.sql("""
select *
from kafka_topic
""").show
{quote}

h2. Related Issue
A similar issue reported for DirectStream is https://issues.apache.org/jira/browse/SPARK-19185

> [SPARK 2.2] | Kafka Consumer | KafkaUtils.createRDD throws Exception - java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-23636
>                 URL: https://issues.apache.org/jira/browse/SPARK-23636
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.1, 2.2.0
>            Reporter: Deepak
>            Priority: Major
>              Labels: performance
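The reproduction setup in the report (several OffsetRanges over one single-partition topic, one per executor core) can be sketched in plain Scala. This is a minimal illustration under stated assumptions, not Spark code: SimpleOffsetRange is a hypothetical stand-in for org.apache.spark.streaming.kafka010.OffsetRange (untilOffset exclusive), and splitForCores is a hypothetical helper that cuts one range into contiguous, non-overlapping slices, so the sketch runs without Spark on the classpath.

```scala
// Hypothetical stand-in for Spark's OffsetRange: offsets [fromOffset, untilOffset) of one topic-partition.
final case class SimpleOffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  require(fromOffset <= untilOffset, s"fromOffset $fromOffset is after untilOffset $untilOffset")
  def count: Long = untilOffset - fromOffset
}

object OffsetRangeSplitter {
  /** Hypothetical helper: cuts one range into at most `parts` contiguous, non-overlapping
    * slices whose sizes differ by at most one offset (one slice per executor core). */
  def splitForCores(range: SimpleOffsetRange, parts: Int): List[SimpleOffsetRange] = {
    require(parts > 0, "parts must be positive")
    val total = range.count
    if (total == 0L) List(range) // nothing to fetch; keep the empty range as-is
    else {
      val n = math.min(parts.toLong, total) // never create empty slices
      val base = total / n
      val extra = total % n                 // the first `extra` slices get one more offset
      (0L until n).toList.map { i =>
        val from = range.fromOffset + i * base + math.min(i, extra)
        val size = base + (if (i < extra) 1L else 0L)
        range.copy(fromOffset = from, untilOffset = from + size)
      }
    }
  }
}
```

With parts = 2, the range [608954201, 608954203) yields exactly the two ranges from the report's example. Every slice names topic kafka_topic, partition 0, which is the condition under which the exception was observed.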
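The stack trace goes through CachedKafkaConsumer, the executor-side consumer cache of the kafka010 integration. Assuming (from our reading of the 2.x sources, not from the report) that this cache is keyed by group id, topic and partition, every range in one createRDD call that names the same topic-partition would map to the same cached KafkaConsumer. A small check for that condition might look like the sketch below; SimpleOffsetRange and sharedTopicPartitions are hypothetical names used only for illustration.

```scala
// Hypothetical stand-in for Spark's OffsetRange.
final case class SimpleOffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object SharedConsumerCheck {
  /** Returns every (topic, partition) read by more than one range, mapped to those ranges.
    * Assumption: all ranges of one createRDD call share the same group id, so the
    * consumer-cache key effectively reduces to (topic, partition). */
  def sharedTopicPartitions(ranges: Seq[SimpleOffsetRange]): Map[(String, Int), Seq[SimpleOffsetRange]] =
    ranges.groupBy(r => (r.topic, r.partition)).filter { case (_, rs) => rs.size > 1 }
}
```

For the ranges in the report's sample code, both entries land on ("kafka_topic", 0), so under this assumption two concurrently running tasks on the single executor could be handed one and the same consumer.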
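The trace fails in KafkaConsumer.acquire, called from KafkaConsumer.close, which CachedKafkaConsumer.remove invoked from the KafkaRDDIterator constructor during attempt 5.3 (a retry). The sketch below is a simplified, hypothetical GuardedConsumer (not Kafka's class) that mirrors, as far as we understand the Kafka client, how that guard works: the first thread to enter becomes the owner, the owner may re-enter, and any other thread is rejected with ConcurrentModificationException until the owner releases. If two tasks share one cached consumer, one task's close() arriving while the other still holds the consumer would fail in this way.

```scala
import java.util.ConcurrentModificationException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

/** Simplified, hypothetical model of KafkaConsumer's single-thread guard (not the real class). */
final class GuardedConsumer {
  private val NoCurrentThread = -1L
  private val currentThread = new AtomicLong(NoCurrentThread)
  private val refCount = new AtomicInteger(0)

  def acquire(): Unit = {
    val threadId = Thread.currentThread().getId
    // Allowed if this thread already owns the guard, or if nobody owns it yet.
    if (threadId != currentThread.get() && !currentThread.compareAndSet(NoCurrentThread, threadId))
      throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access")
    refCount.incrementAndGet()
  }

  def release(): Unit =
    if (refCount.decrementAndGet() == 0) currentThread.set(NoCurrentThread)
}

object GuardDemo {
  /** Task A holds the consumer (think: inside poll()); the calling thread then tries to
    * enter (think: another task closing the same cached consumer). True if rejected. */
  def secondThreadRejected(): Boolean = {
    val consumer = new GuardedConsumer
    val holding = new CountDownLatch(1)
    val finished = new CountDownLatch(1)
    val taskA = new Thread(new Runnable {
      override def run(): Unit = {
        consumer.acquire()
        holding.countDown() // signal that task A now owns the consumer
        finished.await()    // keep owning it until the check below is done
        consumer.release()
      }
    })
    taskA.start()
    holding.await()
    val rejected =
      try { consumer.acquire(); consumer.release(); false }
      catch { case _: ConcurrentModificationException => true }
    finished.countDown()
    taskA.join()
    rejected
  }
}
```

The guard only detects concurrent use; it does not serialize it. KafkaConsumer's documentation leaves synchronizing multi-threaded access to the caller.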