[ https://issues.apache.org/jira/browse/SPARK-27567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dmitry Goldenberg resolved SPARK-27567.
---------------------------------------
    Resolution: Not A Bug

In our case, the issue was caused by someone changing the IP address of the box, which left the consumers unable to resolve the partition leaders. Ideally, Kafka or Spark would surface a clearer, more intuitive error message for this type of failure. A hedged diagnostic sketch for checking partition leaders directly is appended after the quoted stack traces below.

> Spark Streaming consumers (from Kafka) intermittently die with 'SparkException: Couldn't find leaders for Set'
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27567
>                 URL: https://issues.apache.org/jira/browse/SPARK-27567
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 1.5.0
>         Environment: GCP / 170~14.04.1-Ubuntu
>            Reporter: Dmitry Goldenberg
>            Priority: Major
>
> Some of our consumers intermittently die with the stack traces I'm including. Once restarted, they run for a while and then die again.
> I can't find any cohesive documentation on what this error means or how to troubleshoot it. Any help would be appreciated.
> *Kafka version* is 0.8.2.1 (2.10-0.8.2.1).
> Some of the errors seen look like this:
> {noformat}
> ERROR org.apache.spark.scheduler.TaskSchedulerImpl: Lost executor 2 on 10.150.0.54: remote Rpc client disassociated
> {noformat}
> Main error stack trace:
> {noformat}
> 2019-04-23 20:36:54,323 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error generating jobs for time 1556066214000 ms
> org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([hdfs.hbase.acme.attachments,49], [hdfs.hbase.acme.attachments,63], [hdfs.hbase.acme.attachments,31], [hdfs.hbase.acme.attachments,9], [hdfs.hbase.acme.attachments,25], [hdfs.hbase.acme.attachments,55], [hdfs.hbase.acme.attachments,5], [hdfs.hbase.acme.attachments,37], [hdfs.hbase.acme.attachments,7], [hdfs.hbase.acme.attachments,47], [hdfs.hbase.acme.attachments,13], [hdfs.hbase.acme.attachments,43], [hdfs.hbase.acme.attachments,19], [hdfs.hbase.acme.attachments,15], [hdfs.hbase.acme.attachments,23], [hdfs.hbase.acme.attachments,53], [hdfs.hbase.acme.attachments,1], [hdfs.hbase.acme.attachments,27], [hdfs.hbase.acme.attachments,57], [hdfs.hbase.acme.attachments,39], [hdfs.hbase.acme.attachments,11], [hdfs.hbase.acme.attachments,29], [hdfs.hbase.acme.attachments,33], [hdfs.hbase.acme.attachments,35], [hdfs.hbase.acme.attachments,51], [hdfs.hbase.acme.attachments,45], [hdfs.hbase.acme.attachments,21], [hdfs.hbase.acme.attachments,3], [hdfs.hbase.acme.attachments,59], [hdfs.hbase.acme.attachments,41], [hdfs.hbase.acme.attachments,17], [hdfs.hbase.acme.attachments,61]))
> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.Option.orElse(Option.scala:257) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.Option.orElse(Option.scala:257) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.util.Try$.apply(Try.scala:161) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> Exception in thread "main" org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([hdfs.hbase.acme.attachments,49], [hdfs.hbase.acme.attachments,63], [hdfs.hbase.acme.attachments,31], [hdfs.hbase.acme.attachments,9], [hdfs.hbase.acme.attachments,25], [hdfs.hbase.acme.attachments,55], [hdfs.hbase.acme.attachments,5], [hdfs.hbase.acme.attachments,37], [hdfs.hbase.acme.attachments,7], [hdfs.hbase.acme.attachments,47], [hdfs.hbase.acme.attachments,13], [hdfs.hbase.acme.attachments,43], [hdfs.hbase.acme.attachments,19], [hdfs.hbase.acme.attachments,15], [hdfs.hbase.acme.attachments,23], [hdfs.hbase.acme.attachments,53], [hdfs.hbase.acme.attachments,1], [hdfs.hbase.acme.attachments,27], [hdfs.hbase.acme.attachments,57], [hdfs.hbase.acme.attachments,39], [hdfs.hbase.acme.attachments,11], [hdfs.hbase.acme.attachments,29], [hdfs.hbase.acme.attachments,33], [hdfs.hbase.acme.attachments,35], [hdfs.hbase.acme.attachments,51], [hdfs.hbase.acme.attachments,45], [hdfs.hbase.acme.attachments,21], [hdfs.hbase.acme.attachments,3], [hdfs.hbase.acme.attachments,59], [hdfs.hbase.acme.attachments,41], [hdfs.hbase.acme.attachments,17], [hdfs.hbase.acme.attachments,61]))
> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
> at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 2019-04-23 20:36:55,265 FATAL Unable to register shutdown hook because JVM is shutting down.
> [Stage 15597:=================================> (41 + 6) / 64]Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.InterruptedException
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:559)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896)
> at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:222)
> at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:47)
> at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:218)
> at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:207)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ... 2 more
> {noformat}
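For anyone hitting this: the exception above is thrown when DirectKafkaInputDStream asks the brokers for partition leader metadata, so the quickest check is to issue the same metadata request yourself. Below is a minimal, hedged sketch against the Kafka 0.8 SimpleConsumer API (the 0.8.2.1 client in use here). The broker host, port, and client id are placeholders to substitute from your own metadata.broker.list; the topic name is taken from the stack trace.

{code:java}
import java.util.Collections;

import kafka.cluster.Broker;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LeaderCheck {
    public static void main(String[] args) {
        // Placeholder broker address -- use a host:port from your metadata.broker.list.
        // If the box's IP changed, this host may no longer reach a live broker,
        // which is exactly what surfaces as "Couldn't find leaders for Set(...)".
        SimpleConsumer consumer =
            new SimpleConsumer("kafka-broker-1", 9092, 100000, 64 * 1024, "leader-check");
        try {
            // Ask the broker for metadata on the topic named in the stack trace.
            TopicMetadataRequest request = new TopicMetadataRequest(
                Collections.singletonList("hdfs.hbase.acme.attachments"));
            TopicMetadataResponse response = consumer.send(request);
            for (TopicMetadata topic : response.topicsMetadata()) {
                for (PartitionMetadata partition : topic.partitionsMetadata()) {
                    Broker leader = partition.leader(); // null when no leader is known
                    System.out.printf("partition %d -> leader %s%n",
                        partition.partitionId(),
                        leader == null ? "NONE" : leader.host() + ":" + leader.port());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
{code}

Any partition that prints NONE, or a leader whose host no longer resolves to the broker's current IP, reproduces the condition latestLeaderOffsets fails on and points at cluster metadata (DNS, advertised.host.name, a changed IP) rather than a Spark bug.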