Josh Rosen created SPARK-17484:
----------------------------------

             Summary: Race condition when cancelling a job during a cache write can lead to block fetch failures
                 Key: SPARK-17484
                 URL: https://issues.apache.org/jira/browse/SPARK-17484
             Project: Spark
          Issue Type: Bug
          Components: Block Manager
    Affects Versions: 2.0.0
            Reporter: Josh Rosen
            Assignee: Josh Rosen


On a production cluster, I observed the following behavior: a block manager stored a block in memory, the put then failed because the task was killed / cancelled, and a subsequent task nevertheless attempted to read the cached block from the machine where the write failed, eventually leading to a complete job failure.

Here's the executor log snippet from the machine performing the failed cache:

{code}
16/09/06 16:10:31 INFO MemoryStore: Block rdd_25_1 stored as values in memory (estimated size 976.8 MB, free 9.8 GB)
16/09/06 16:10:31 WARN BlockManager: Putting block rdd_25_1 failed
16/09/06 16:10:31 INFO Executor: Executor killed task 0.0 in stage 3.0 (TID 127)
{code}

Here's the exception from the reader in the failed job:

{code}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 46.0 failed 4 times, most recent failure: Lost task 4.3 in stage 46.0 (TID 1484, 10.69.255.197): org.apache.spark.storage.BlockFetchException: Failed to fetch block after 1 fetch failures. Most recent failure cause:
        at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:565)
        at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522)
        at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661)
{code}

I believe that there's a race condition in how we handle cleanup after failed cache stores. Here's an excerpt from {{BlockManager.doPut()}}:

{code}
var blockWasSuccessfullyStored: Boolean = false
val result: Option[T] = try {
  val res = putBody(putBlockInfo)
  blockWasSuccessfullyStored = res.isEmpty
  res
} finally {
  if (blockWasSuccessfullyStored) {
    if (keepReadLock) {
      blockInfoManager.downgradeLock(blockId)
    } else {
      blockInfoManager.unlock(blockId)
    }
  } else {
    blockInfoManager.removeBlock(blockId)
    logWarning(s"Putting block $blockId failed")
  }
}
{code}

The only way that I think this "successfully stored followed by immediately failed" case could appear in our logs is if the local memory store write succeeds and then an exception (perhaps {{InterruptedException}}) causes us to enter the {{finally}} block's error-cleanup path. The problem is that the {{finally}} block only cleans up the block's metadata rather than performing the full cleanup path, which would also notify the master that the block is no longer available at this host.
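
To make the failure mode concrete, here's a minimal, self-contained sketch of the sequence I believe occurs. All names in it are illustrative stand-ins rather than the actual Spark APIs: the put body stores the block and announces it to the master, the task is killed before {{doPut()}} can observe success, and the error-cleanup path resets only local state:

{code}
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative stand-ins (not Spark classes) for the master's view of the
// block and the executor's local state.
object CacheWriteRaceSketch {
  val masterKnowsBlock = new AtomicBoolean(false)
  val localHasBlock = new AtomicBoolean(false)

  // Models the body of doPut(): the memory store write succeeds and the
  // master is notified, but the task is killed before the body returns.
  def putBody(): Option[Unit] = {
    localHasBlock.set(true)         // "Block rdd_25_1 stored as values in memory"
    masterKnowsBlock.set(true)      // master now routes readers to this host
    throw new InterruptedException  // task killed / cancelled mid-put
  }

  def doPut(): Unit = {
    var blockWasSuccessfullyStored = false
    try {
      val res = putBody()
      blockWasSuccessfullyStored = res.isEmpty
    } finally {
      if (!blockWasSuccessfullyStored) {
        // Error-cleanup path: only local metadata is removed ("Putting
        // block rdd_25_1 failed"); the master is never told.
        localHasBlock.set(false)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    try doPut() catch { case _: InterruptedException => () }
    println(s"master thinks block is here: ${masterKnowsBlock.get}") // true
    println(s"host actually has block:     ${localHasBlock.get}")    // false
  }
}
{code}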

The fact that the Spark task was not resilient in the face of remote block fetch failures is a separate issue which I'll report and fix separately. The scope of this JIRA, however, is the fact that Spark still attempted reads from a machine which was missing the block.

In order to fix this problem, I think that the {{finally}} block should perform 
more thorough cleanup and should send a "block removed" status update to the 
master following any error during the write. This is necessary because the body 
of {{doPut()}} may have already notified the master of block availability. In 
addition, we can extend the block serving code path to automatically update the 
master with "block deleted" statuses whenever the block manager receives 
invalid requests for blocks that it doesn't have.
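
Continuing the sketch above (again with illustrative names rather than the actual Spark API), the two proposed changes would look roughly like this:

{code}
// 1. Fuller cleanup in doPut()'s finally block: after any failed write,
//    also send a "block removed" status update, since the body may have
//    already announced the block's availability.
def cleanupAfterFailedPut(): Unit = {
  localHasBlock.set(false)
  masterKnowsBlock.set(false) // new: undo the earlier availability report
}

// 2. Defensive check in the block-serving path: if a fetch request arrives
//    for a block this host doesn't have, correct the master's stale view
//    before failing the request.
def serveBlock(blockId: String): Either[String, Array[Byte]] =
  if (localHasBlock.get) {
    Right(Array.emptyByteArray) // placeholder for the real block bytes
  } else {
    masterKnowsBlock.set(false) // new: report "block deleted" to the master
    Left(s"Block $blockId not found on this host")
  }
{code}

The first change prevents the stale master entry from arising in this race; the second makes the system self-correct if a stale entry appears for any other reason.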


