Hi:
I am working with Spark (2.2.1) and Kafka (0.10) on AWS EMR, and for the last 
few days, after running the application for 30-60 minutes, I get the exception 
from the Kafka consumer included below.

The structured streaming application processes 1 minute worth of data from a 
Kafka topic. I've tried increasing request.timeout.ms from the 40000 ms 
default to 45000 ms and receive.buffer.bytes to 1 MB, but I still get the same 
exception.
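
For context, this is roughly how the consumer settings are passed to the Kafka 
source in my job (a rough sketch in spark-shell style; the brokers, topic, and 
checkpoint path below are placeholders, not my exact values). Kafka consumer 
properties go through to the source with the "kafka." prefix:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("kafka-structured-streaming")
  .getOrCreate()

// Kafka consumer properties are passed to the source with the "kafka." prefix.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder broker list
  .option("subscribe", "my-topic")                     // placeholder topic
  .option("kafka.request.timeout.ms", "45000")         // raised from the 40000 ms default
  .option("kafka.receive.buffer.bytes", "1048576")     // 1 MB
  .load()

val query = df.selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints")    // placeholder path
  .trigger(Trigger.ProcessingTime("1 minute"))         // 1 minute micro-batches
  .start()

query.awaitTermination()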
Is there any Spark/Kafka configuration that can save the offset and retry it 
next time, rather than throwing an exception and killing the application?
I've tried googling but have not found a substantial solution or 
recommendation. If anyone has any suggestions, or a different version that 
helps, please let me know.
Thanks
Here is the exception stack trace.

java.util.concurrent.TimeoutException: Cannot fetch record for offset <offset#> in 120000 milliseconds
	at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:219)
	at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
	at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
	at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
	at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
	at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
	at ...