[
https://issues.apache.org/jira/browse/SPARK-33121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dmitry Tverdokhleb updated SPARK-33121:
---------------------------------------
Description:
Hi. I am trying to migrate from Spark 2.4.5 to 3.1.1, and there is a problem with
graceful shutdown.
The config parameter "spark.streaming.stopGracefullyOnShutdown" is set to "true".
Here is the code:
{code:java}
inputStream.foreachRDD { rdd =>
  rdd.foreachPartition { _ =>  // foreachPartition takes Iterator[T] => Unit
    Thread.sleep(5000)
  }
}
{code}
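For reference, a minimal sketch of the surrounding setup (not the actual application: the app name, master, and batch interval are placeholders), assuming the standard SparkConf / JavaStreamingContext API:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class GracefulShutdownApp {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("streaming-agg-tds-data")   // placeholder name
                .setMaster("local[2]")                  // placeholder master
                .set("spark.streaming.stopGracefullyOnShutdown", "true");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        // ... build the DStream pipeline (foreachRDD / foreachPartition) here ...
        ssc.start();
        ssc.awaitTermination(); // SIGTERM triggers the graceful-shutdown hook
    }
}
```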
I send a SIGTERM signal to stop the Spark Streaming job, and after the sleep an
exception arises:
{noformat}
streaming-agg-tds-data_1 | java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@7ca7f0b8 rejected from java.util.concurrent.ThreadPoolExecutor@2474219c[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
streaming-agg-tds-data_1 |     at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
streaming-agg-tds-data_1 |     at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
streaming-agg-tds-data_1 |     at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
streaming-agg-tds-data_1 |     at org.apache.spark.executor.Executor.launchTask(Executor.scala:270)
streaming-agg-tds-data_1 |     at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
streaming-agg-tds-data_1 |     at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
streaming-agg-tds-data_1 |     at scala.collection.Iterator.foreach(Iterator.scala:941)
streaming-agg-tds-data_1 |     at scala.collection.Iterator.foreach$(Iterator.scala:941)
streaming-agg-tds-data_1 |     at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
streaming-agg-tds-data_1 |     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
streaming-agg-tds-data_1 |     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
streaming-agg-tds-data_1 |     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
streaming-agg-tds-data_1 |     at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
streaming-agg-tds-data_1 |     at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:68)
streaming-agg-tds-data_1 |     at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
streaming-agg-tds-data_1 |     at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
streaming-agg-tds-data_1 |     at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
streaming-agg-tds-data_1 |     at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
streaming-agg-tds-data_1 |     at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
streaming-agg-tds-data_1 |     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
streaming-agg-tds-data_1 |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
streaming-agg-tds-data_1 |     at java.lang.Thread.run(Thread.java:748)
streaming-agg-tds-data_1 | 2021-04-22 13:33:41 WARN  JobGenerator - Timed out while stopping the job generator (timeout = 10000)
streaming-agg-tds-data_1 | 2021-04-22 13:33:41 INFO  JobGenerator - Waited for jobs to be processed and checkpoints to be written
streaming-agg-tds-data_1 | 2021-04-22 13:33:41 INFO  JobGenerator - Stopped JobGenerator{noformat}
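The "[Terminated, pool size = 0, ...]" state in the trace explains the rejection: once a ThreadPoolExecutor has been shut down and drained, its default AbortPolicy throws RejectedExecutionException for any newly submitted task. A self-contained illustration of that mechanism (plain JDK, no Spark involved):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class RejectionDemo {
    // Submit a task after shutdown; the default AbortPolicy rejects it.
    static boolean submitAfterShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.shutdown(); // pool drains and moves toward the Terminated state
        try {
            pool.execute(() -> { }); // new work is no longer accepted
            return false;
        } catch (RejectedExecutionException e) {
            return true; // same exception class as in the trace above
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAfterShutdown()); // prints "true"
    }
}
```

This matches the reported behavior: the scheduler tries to launch a task (Executor.launchTask) on a pool that the shutdown path has already terminated.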
After this exception and "JobGenerator - Stopped JobGenerator", streaming
freezes and is eventually killed by the timeout (config parameter
"hadoop.service.shutdown.timeout").
There is no such problem with graceful shutdown in Spark 2.4.5.
was:
Hi. I am trying to migrate from Spark 2.4.5 to 3.0.1, and there is a problem with
graceful shutdown.
The config parameter "spark.streaming.stopGracefullyOnShutdown" is set to "true".
Here is the code:
{code:java}
inputStream.foreachRDD { rdd =>
  rdd.foreachPartition { _ =>
    Thread.sleep(5000)
  }
}
{code}
I send a SIGTERM signal to stop the Spark Streaming job, but an exception arises:
{noformat}
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at org.apache.spark.executor.Executor.launchTask(Executor.scala:230)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:68)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748){noformat}
A piece of logs:
{noformat}
...
Calling rdd.mapPartitions
...
Sending SIGTERM signal
...
2020-10-12 14:12:22 INFO MyProject - Shutdown hook called
2020-10-12 14:12:22 DEBUG JobScheduler - Stopping JobScheduler
2020-10-12 14:12:22 INFO ReceiverTracker - ReceiverTracker stopped
2020-10-12 14:12:22 INFO JobGenerator - Stopping JobGenerator gracefully
2020-10-12 14:12:22 INFO JobGenerator - Waiting for all received blocks to be consumed for job generation
2020-10-12 14:12:22 INFO JobGenerator - Waited for all received blocks to be consumed for job generation
...
Calling rdd.filter
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]{noformat}
After this exception, streaming freezes and is eventually killed by the timeout
(config parameter "hadoop.service.shutdown.timeout").
Note that this exception arises only for RDD operations (map, filter, etc.);
the business logic itself runs without errors.
There is no such problem with graceful shutdown in Spark 2.4.5.
> Spark does not shutdown gracefully
> ----------------------------------
>
> Key: SPARK-33121
> URL: https://issues.apache.org/jira/browse/SPARK-33121
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 3.0.1
> Reporter: Dmitry Tverdokhleb
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]