[
https://issues.apache.org/jira/browse/SPARK-13693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15258856#comment-15258856
]
Josh Rosen commented on SPARK-13693:
------------------------------------
Yep, there's definitely a race condition here:
{code}
16/04/26 11:14:04.451 block-manager-slave-async-thread-pool-0 INFO BlockManager: Removing RDD 89
16/04/26 11:14:04.451 JobGenerator INFO JobGenerator: Checkpointing graph for time 7000 ms
16/04/26 11:14:04.451 JobGenerator INFO DStreamGraph: Updating checkpoint data for time 7000 ms
16/04/26 11:14:04.451 JobGenerator INFO DStreamGraph: Updated checkpoint data for time 7000 ms
16/04/26 11:14:04.452 JobGenerator INFO CheckpointWriter: Submitted checkpoint of time 7000 ms writer queue
16/04/26 11:14:04.452 pool-1787-thread-1 INFO CheckpointWriter: Saving checkpoint for time 7000 ms to file 'file:/home/jenkins/workspace/spark-master-test-sbt-hadoop-2.6/streaming/checkpoint/spark-dcb9046e-7dc0-440b-a406-f02883f3ae3b/checkpoint-7000'
16/04/26 11:14:14.452 pool-1-thread-1-ScalaTest-running-MapWithStateSuite INFO CheckpointWriter: CheckpointWriter executor terminated ? false, waited for 10000 ms.
16/04/26 11:14:14.452 pool-1-thread-1-ScalaTest-running-MapWithStateSuite INFO JobGenerator: Stopped JobGenerator
16/04/26 11:14:14.452 pool-1-thread-1-ScalaTest-running-MapWithStateSuite INFO JobScheduler: Stopped JobScheduler
16/04/26 11:14:14.453 pool-1-thread-1-ScalaTest-running-MapWithStateSuite INFO StreamingContext: StreamingContext stopped successfully
16/04/26 11:14:14.453 pool-1-thread-1-ScalaTest-running-MapWithStateSuite INFO MapWithStateSuite:
===== FINISHED o.a.s.streaming.MapWithStateSuite: 'mapWithState - basic operations with advanced API' =====
16/04/26 11:14:14.781 pool-1787-thread-1 WARN CheckpointWriter: Error in attempt 1 of writing checkpoint to file:/home/jenkins/workspace/spark-master-test-sbt-hadoop-2.6/streaming/checkpoint/spark-dcb9046e-7dc0-440b-a406-f02883f3ae3b/checkpoint-7000
java.io.IOException: java.lang.InterruptedException
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:541)
        at org.apache.hadoop.util.Shell.run(Shell.java:455)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
        at org.apache.hadoop.util.Shell.execCommand(Shell.java:808)
        at org.apache.hadoop.util.Shell.execCommand(Shell.java:791)
        at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:656)
        at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:490)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:462)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:428)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:775)
        at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:224)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
16/04/26 11:14:14.781 pool-1787-thread-1 WARN CheckpointWriter: Could not write checkpoint for time 7000 ms to file 'file:/home/jenkins/workspace/spark-master-test-sbt-hadoop-2.6/streaming/checkpoint/spark-dcb9046e-7dc0-440b-a406-f02883f3ae3b/checkpoint-7000'
16/04/26 11:14:14.786 dispatcher-event-loop-7 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
{code}
It looks like the CheckpointWriter doesn't finish writing before we return from
the shutdown. There is code that waits for the writer executor to terminate,
but the wait is capped at 10 seconds, which evidently isn't enough here. From
CheckpointWriter:
{code}
def stop(): Unit = synchronized {
  if (stopped) return

  executor.shutdown()
  val startTime = System.currentTimeMillis()
  val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
  if (!terminated) {
    executor.shutdownNow()
  }
  val endTime = System.currentTimeMillis()
  logInfo("CheckpointWriter executor terminated ? " + terminated +
    ", waited for " + (endTime - startTime) + " ms.")
  stopped = true
}
{code}
I wonder if the shutdown process causes the CheckpointWriter thread to become
blocked or deadlocked in a way that prevents it from finishing in time. This is
a unit test with tiny data, so there's no way the checkpoint should take 10
seconds to write.
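The {{java.lang.InterruptedException}} in the log is consistent with the shape of {{stop()}}: once {{awaitTermination}} times out, {{shutdownNow()}} interrupts the still-running write, and Hadoop's {{Shell}} surfaces the interrupt as an {{IOException}}. A minimal JVM sketch of that interaction (the class name, latches, and the scaled-down 1-second wait are illustrative, not Spark code):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CheckpointStopRace {
    // Returns {terminatedInTime, writeWasInterrupted}.
    public static boolean[] runDemo() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        AtomicBoolean writeInterrupted = new AtomicBoolean(false);
        CountDownLatch writeStarted = new CountDownLatch(1);
        CountDownLatch writeDone = new CountDownLatch(1);

        // Stand-in for CheckpointWriteHandler: a "write" that outlives
        // the bounded wait in stop().
        executor.submit(() -> {
            writeStarted.countDown();
            try {
                Thread.sleep(60_000); // simulated slow checkpoint write
            } catch (InterruptedException e) {
                // This is what Hadoop's Shell wraps as
                // java.io.IOException: java.lang.InterruptedException.
                writeInterrupted.set(true);
            } finally {
                writeDone.countDown();
            }
        });

        writeStarted.await();
        executor.shutdown();
        // Bounded wait, like the 10-second awaitTermination in stop().
        boolean terminated = executor.awaitTermination(1, TimeUnit.SECONDS);
        if (!terminated) {
            executor.shutdownNow(); // interrupts the in-flight write
        }
        writeDone.await();
        return new boolean[] { terminated, writeInterrupted.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = runDemo();
        System.out.println("terminated in time? " + r[0]
            + ", write interrupted? " + r[1]);
    }
}
```

If the test's cleanup then deletes the checkpoint directory while the interrupted writer is still tearing down, the "Failed to delete" error below falls out naturally.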
> Flaky test: o.a.s.streaming.MapWithStateSuite
> ---------------------------------------------
>
> Key: SPARK-13693
> URL: https://issues.apache.org/jira/browse/SPARK-13693
> Project: Spark
> Issue Type: Test
> Components: Tests
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Priority: Minor
> Fix For: 2.0.0
>
>
> Fixed the following flaky test:
> https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/256/testReport/junit/org.apache.spark.streaming/MapWithStateSuite/_It_is_not_a_test_/
> {code}
> sbt.ForkMain$ForkError: java.io.IOException: Failed to delete: /home/jenkins/workspace/spark-master-test-sbt-hadoop-2.7/streaming/checkpoint/spark-e97794a8-b940-4b21-8685-bf1221f9444d
>         at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:934)
>         at org.apache.spark.streaming.MapWithStateSuite$$anonfun$2.apply$mcV$sp(MapWithStateSuite.scala:47)
>         at org.apache.spark.streaming.MapWithStateSuite$$anonfun$2.apply(MapWithStateSuite.scala:45)
>         at org.apache.spark.streaming.MapWithStateSuite$$anonfun$2.apply(MapWithStateSuite.scala:45)
> {code}