[ 
https://issues.apache.org/jira/browse/SPARK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhaoshijie updated SPARK-22955:
-------------------------------
    Description: 
when I stop a spark-streaming application with parameter 
 spark.streaming.stopGracefullyOnShutdown, I get ERROR as follows:

{code:java}
2018-01-04 17:31:17,524 ERROR org.apache.spark.deploy.yarn.ApplicationMaster: 
RECEIVED SIGNAL TERM
2018-01-04 17:31:17,527 INFO org.apache.spark.streaming.StreamingContext: 
Invoking stop(stopGracefully=true) from shutdown hook
2018-01-04 17:31:17,530 INFO 
org.apache.spark.streaming.scheduler.ReceiverTracker: ReceiverTracker stopped
2018-01-04 17:31:17,531 INFO org.apache.spark.streaming.scheduler.JobGenerator: 
Stopping JobGenerator gracefully
2018-01-04 17:31:17,532 INFO org.apache.spark.streaming.scheduler.JobGenerator: 
Waiting for all received blocks to be consumed for job generation
2018-01-04 17:31:17,533 INFO org.apache.spark.streaming.scheduler.JobGenerator: 
Waited for all received blocks to be consumed for job generation
2018-01-04 17:31:17,747 INFO org.apache.spark.streaming.scheduler.JobScheduler: 
Added jobs for time 1515058267000 ms
2018-01-04 17:31:18,302 INFO org.apache.spark.streaming.scheduler.JobScheduler: 
Added jobs for time 1515058268000 ms
2018-01-04 17:31:18,785 INFO org.apache.spark.streaming.scheduler.JobScheduler: 
Added jobs for time 1515058269000 ms
2018-01-04 17:31:19,001 INFO org.apache.spark.streaming.util.RecurringTimer: 
Stopped timer for JobGenerator after time 1515058279000
2018-01-04 17:31:19,200 INFO org.apache.spark.streaming.scheduler.JobScheduler: 
Added jobs for time 1515058270000 ms
2018-01-04 17:31:19,207 INFO org.apache.spark.streaming.scheduler.JobGenerator: 
Stopped generation timer
2018-01-04 17:31:19,207 INFO org.apache.spark.streaming.scheduler.JobGenerator: 
Waiting for jobs to be processed and checkpoints to be written
2018-01-04 17:31:19,210 ERROR 
org.apache.spark.streaming.scheduler.JobScheduler: Error generating jobs for 
time 1515058271000 ms
java.lang.IllegalStateException: This consumer has already been closed.
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
        at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:161)
        at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:180)
        at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:208)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at 
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
        at 
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:285)
        at 
org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at 
org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at 
org.apache.spark.streaming.dstream.FlatMappedDStream.compute(FlatMappedDStream.scala:36)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at 
org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at 
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
        at scala.util.Try$.apply(Try.scala:192)
        at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
        at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

It looks like KafkaConsumer close before JobGenerator stop , then I view the 
source code, 
!https://raw.githubusercontent.com/smdfj/picture/master/spark/JobGenerator.png!



I find graph.stop() will stop Dstream,then close KafkaConsumer,but 
JobGenerator.eventLoop has not stopped,so error occured.

  was:
when I stop a spark-streaming application with parameter 
 spark.streaming.stopGracefullyOnShutdown, I get ERROR as follows:

{code:java}
2018-01-04 17:31:17,524 ERROR org.apache.spark.deploy.yarn.ApplicationMaster: 
RECEIVED SIGNAL TERM
2018-01-04 17:31:17,527 INFO org.apache.spark.streaming.StreamingContext: 
Invoking stop(stopGracefully=true) from shutdown hook
2018-01-04 17:31:17,530 INFO 
org.apache.spark.streaming.scheduler.ReceiverTracker: ReceiverTracker stopped
2018-01-04 17:31:17,531 INFO org.apache.spark.streaming.scheduler.JobGenerator: 
Stopping JobGenerator gracefully
2018-01-04 17:31:17,532 INFO org.apache.spark.streaming.scheduler.JobGenerator: 
Waiting for all received blocks to be consumed for job generation
2018-01-04 17:31:17,533 INFO org.apache.spark.streaming.scheduler.JobGenerator: 
Waited for all received blocks to be consumed for job generation
2018-01-04 17:31:17,747 INFO org.apache.spark.streaming.scheduler.JobScheduler: 
Added jobs for time 1515058267000 ms
2018-01-04 17:31:18,302 INFO org.apache.spark.streaming.scheduler.JobScheduler: 
Added jobs for time 1515058268000 ms
2018-01-04 17:31:18,785 INFO org.apache.spark.streaming.scheduler.JobScheduler: 
Added jobs for time 1515058269000 ms
2018-01-04 17:31:19,001 INFO org.apache.spark.streaming.util.RecurringTimer: 
Stopped timer for JobGenerator after time 1515058279000
2018-01-04 17:31:19,200 INFO org.apache.spark.streaming.scheduler.JobScheduler: 
Added jobs for time 1515058270000 ms
2018-01-04 17:31:19,207 INFO org.apache.spark.streaming.scheduler.JobGenerator: 
Stopped generation timer
2018-01-04 17:31:19,207 INFO org.apache.spark.streaming.scheduler.JobGenerator: 
Waiting for jobs to be processed and checkpoints to be written
2018-01-04 17:31:19,210 ERROR 
org.apache.spark.streaming.scheduler.JobScheduler: Error generating jobs for 
time 1515058271000 ms
java.lang.IllegalStateException: This consumer has already been closed.
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
        at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:161)
        at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:180)
        at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:208)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at 
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
        at 
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:285)
        at 
org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at 
org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at 
org.apache.spark.streaming.dstream.FlatMappedDStream.compute(FlatMappedDStream.scala:36)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at 
org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
        at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at 
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
        at scala.util.Try$.apply(Try.scala:192)
        at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
        at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
        at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

It looks like KafkaConsumer close before JobGenerator stop , then I view the 
source code, 
!https://raw.githubusercontent.com/smdfj/picture/master/spark/JobGenerator.png!

I find graph.stop() will stop Dstream,then close KafkaConsumer,but 
JobGenerator.eventLoop has not stopped,so error occured.


> Error generating jobs when Stopping JobGenerator gracefully
> -----------------------------------------------------------
>
>                 Key: SPARK-22955
>                 URL: https://issues.apache.org/jira/browse/SPARK-22955
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.2.0
>            Reporter: zhaoshijie
>
> when I stop a spark-streaming application with parameter 
>  spark.streaming.stopGracefullyOnShutdown, I get ERROR as follows:
> {code:java}
> 2018-01-04 17:31:17,524 ERROR org.apache.spark.deploy.yarn.ApplicationMaster: 
> RECEIVED SIGNAL TERM
> 2018-01-04 17:31:17,527 INFO org.apache.spark.streaming.StreamingContext: 
> Invoking stop(stopGracefully=true) from shutdown hook
> 2018-01-04 17:31:17,530 INFO 
> org.apache.spark.streaming.scheduler.ReceiverTracker: ReceiverTracker stopped
> 2018-01-04 17:31:17,531 INFO 
> org.apache.spark.streaming.scheduler.JobGenerator: Stopping JobGenerator 
> gracefully
> 2018-01-04 17:31:17,532 INFO 
> org.apache.spark.streaming.scheduler.JobGenerator: Waiting for all received 
> blocks to be consumed for job generation
> 2018-01-04 17:31:17,533 INFO 
> org.apache.spark.streaming.scheduler.JobGenerator: Waited for all received 
> blocks to be consumed for job generation
> 2018-01-04 17:31:17,747 INFO 
> org.apache.spark.streaming.scheduler.JobScheduler: Added jobs for time 
> 1515058267000 ms
> 2018-01-04 17:31:18,302 INFO 
> org.apache.spark.streaming.scheduler.JobScheduler: Added jobs for time 
> 1515058268000 ms
> 2018-01-04 17:31:18,785 INFO 
> org.apache.spark.streaming.scheduler.JobScheduler: Added jobs for time 
> 1515058269000 ms
> 2018-01-04 17:31:19,001 INFO org.apache.spark.streaming.util.RecurringTimer: 
> Stopped timer for JobGenerator after time 1515058279000
> 2018-01-04 17:31:19,200 INFO 
> org.apache.spark.streaming.scheduler.JobScheduler: Added jobs for time 
> 1515058270000 ms
> 2018-01-04 17:31:19,207 INFO 
> org.apache.spark.streaming.scheduler.JobGenerator: Stopped generation timer
> 2018-01-04 17:31:19,207 INFO 
> org.apache.spark.streaming.scheduler.JobGenerator: Waiting for jobs to be 
> processed and checkpoints to be written
> 2018-01-04 17:31:19,210 ERROR 
> org.apache.spark.streaming.scheduler.JobScheduler: Error generating jobs for 
> time 1515058271000 ms
> java.lang.IllegalStateException: This consumer has already been closed.
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
>       at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:161)
>       at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:180)
>       at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:208)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>       at scala.Option.orElse(Option.scala:289)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.immutable.List.map(List.scala:285)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>       at 
> org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>       at scala.Option.orElse(Option.scala:289)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>       at 
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>       at scala.Option.orElse(Option.scala:289)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>       at 
> org.apache.spark.streaming.dstream.FlatMappedDStream.compute(FlatMappedDStream.scala:36)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>       at scala.Option.orElse(Option.scala:289)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>       at 
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>       at scala.Option.orElse(Option.scala:289)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>       at 
> org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>       at scala.Option.orElse(Option.scala:289)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>       at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
>       at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>       at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>       at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>       at scala.util.Try$.apply(Try.scala:192)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> It looks like KafkaConsumer close before JobGenerator stop , then I view the 
> source code, 
> !https://raw.githubusercontent.com/smdfj/picture/master/spark/JobGenerator.png!
> I find graph.stop() will stop Dstream,then close KafkaConsumer,but 
> JobGenerator.eventLoop has not stopped,so error occured.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to