Repository: spark
Updated Branches:
  refs/heads/branch-1.6 a3aa22a59 -> ab006523b


[SPARK-13566][CORE] Avoid deadlock between BlockManager and Executor Thread

Temporary patch for branch-1.6: avoid a deadlock between the BlockManager and 
an executor thread by registering each removal in a pendingToRemove map (a 
non-blocking per-block guard) before taking the BlockInfo lock.

Author: cenyuhai <cenyu...@didichuxing.com>

Closes #11546 from cenyuhai/SPARK-13566.
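
The deadlock arises from two code paths taking the same pair of locks in
opposite order: the eviction path locks the MemoryManager and then a block's
BlockInfo, while the removal path locks the BlockInfo and then needs the
MemoryManager. A minimal Scala sketch of that inversion (the names below are
illustrative stand-ins, not the actual Spark classes):

  object LockOrderSketch {
    private val memoryManagerLock = new Object // stands in for the MemoryManager
    private val blockInfoLock = new Object     // stands in for a block's BlockInfo

    // Eviction path (dropFromMemory): MemoryManager first, then BlockInfo.
    def evict(): Unit = memoryManagerLock.synchronized {
      blockInfoLock.synchronized {
        // spill the block to disk and update bookkeeping
      }
    }

    // Removal path (removeBlock): BlockInfo first, then MemoryManager.
    def remove(): Unit = blockInfoLock.synchronized {
      memoryManagerLock.synchronized {
        // free the block's memory
      }
    }
  }

If evict() and remove() run on two threads at the same time, each thread can
end up holding one lock while waiting forever for the other.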


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab006523
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab006523
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab006523

Branch: refs/heads/branch-1.6
Commit: ab006523b840b1d2dbf3f5ff0a238558e7665a1e
Parents: a3aa22a
Author: cenyuhai <cenyu...@didichuxing.com>
Authored: Fri May 6 13:50:49 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Fri May 6 13:50:49 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    |  12 ++
 .../org/apache/spark/storage/BlockManager.scala | 192 ++++++++++++-------
 .../spark/storage/BlockManagerSuite.scala       |  38 ++++
 3 files changed, 170 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
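
The patch guards every removal path with pendingToRemove.putIfAbsent(...),
which acts as a non-blocking per-block try-lock taken before entering
info.synchronized: a thread that loses the race backs off instead of blocking
while it holds other locks. A sketch of the pattern, assuming simplified
types (String keys and a made-up tryRemove helper; the real patch inlines the
check and keys by BlockId):

  import java.util.concurrent.ConcurrentHashMap

  object PendingRemoveGuard {
    private val pendingToRemove = new ConcurrentHashMap[String, java.lang.Long]()

    def tryRemove(blockId: String, taskAttemptId: Long)(body: => Unit): Boolean = {
      // putIfAbsent returns null only when no other thread holds the entry,
      // i.e. we won the try-lock for this block without blocking.
      if (pendingToRemove.putIfAbsent(blockId, taskAttemptId) == null) {
        try {
          body // safe: we are the only thread removing this block
          true
        } finally {
          pendingToRemove.remove(blockId) // always release the guard
        }
      } else {
        false // another thread is already removing this block; back off
      }
    }
  }

The guard is always released in a finally block; as an extra safety net, the
Executor now calls releaseAllLocksForTask(taskId) at the end of every task so
an entry leaked by a failed task cannot pin a block forever.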


http://git-wip-us.apache.org/repos/asf/spark/blob/ab006523/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index ab5bde5..b248e12 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -218,6 +218,7 @@ private[spark] class Executor(
           threwException = false
           res
         } finally {
+          val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
           val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
           if (freedMemory > 0) {
             val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
@@ -227,6 +228,17 @@ private[spark] class Executor(
               logError(errMsg)
             }
           }
+
+          if (releasedLocks.nonEmpty) {
+            val errMsg =
+              s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
+              releasedLocks.mkString("[", ", ", "]")
+            if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) {
+              throw new SparkException(errMsg)
+            } else {
+              logError(errMsg)
+            }
+          }
         }
         val taskFinish = System.currentTimeMillis()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ab006523/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 538272d..288f756 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,12 +19,14 @@ package org.apache.spark.storage
 
 import java.io._
 import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.duration._
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.util.Random
 import scala.util.control.NonFatal
+import scala.collection.JavaConverters._
 
 import sun.nio.ch.DirectBuffer
 
@@ -65,7 +67,7 @@ private[spark] class BlockManager(
     val master: BlockManagerMaster,
     defaultSerializer: Serializer,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    val memoryManager: MemoryManager,
     mapOutputTracker: MapOutputTracker,
     shuffleManager: ShuffleManager,
     blockTransferService: BlockTransferService,
@@ -164,6 +166,11 @@ private[spark] class BlockManager(
    * loaded yet. */
   private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)
 
+  // Blocks that are currently being removed by another thread
+  val pendingToRemove = new ConcurrentHashMap[BlockId, Long]()
+
+  private val NON_TASK_WRITER = -1024L
+
   /**
   * Initializes the BlockManager with the given appId. This is not performed in the constructor as
   * the appId may not be known at BlockManager instantiation time (in particular for the driver,
@@ -1025,54 +1032,58 @@ private[spark] class BlockManager(
     val info = blockInfo.get(blockId).orNull
 
     // If the block has not already been dropped
-    if (info != null) {
-      info.synchronized {
-        // required ? As of now, this will be invoked only for blocks which are ready
-        // But in case this changes in future, adding for consistency sake.
-        if (!info.waitForReady()) {
-          // If we get here, the block write failed.
-          logWarning(s"Block $blockId was marked as failure. Nothing to drop")
-          return None
-        } else if (blockInfo.get(blockId).isEmpty) {
-          logWarning(s"Block $blockId was already dropped.")
-          return None
-        }
-        var blockIsUpdated = false
-        val level = info.level
+    if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) {
+      try {
+        info.synchronized {
+          // required ? As of now, this will be invoked only for blocks which are ready
+          // But in case this changes in future, adding for consistency sake.
+          if (!info.waitForReady()) {
+            // If we get here, the block write failed.
+            logWarning(s"Block $blockId was marked as failure. Nothing to drop")
+            return None
+          } else if (blockInfo.get(blockId).isEmpty) {
+            logWarning(s"Block $blockId was already dropped.")
+            return None
+          }
+          var blockIsUpdated = false
+          val level = info.level
 
-        // Drop to disk, if storage level requires
-        if (level.useDisk && !diskStore.contains(blockId)) {
-          logInfo(s"Writing block $blockId to disk")
-          data() match {
-            case Left(elements) =>
-              diskStore.putArray(blockId, elements, level, returnValues = false)
-            case Right(bytes) =>
-              diskStore.putBytes(blockId, bytes, level)
+          // Drop to disk, if storage level requires
+          if (level.useDisk && !diskStore.contains(blockId)) {
+            logInfo(s"Writing block $blockId to disk")
+            data() match {
+              case Left(elements) =>
+                diskStore.putArray(blockId, elements, level, returnValues = false)
+              case Right(bytes) =>
+                diskStore.putBytes(blockId, bytes, level)
+            }
+            blockIsUpdated = true
           }
-          blockIsUpdated = true
-        }
 
-        // Actually drop from memory store
-        val droppedMemorySize =
-          if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
-        val blockIsRemoved = memoryStore.remove(blockId)
-        if (blockIsRemoved) {
-          blockIsUpdated = true
-        } else {
-          logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
-        }
+          // Actually drop from memory store
+          val droppedMemorySize =
+            if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
+          val blockIsRemoved = memoryStore.remove(blockId)
+          if (blockIsRemoved) {
+            blockIsUpdated = true
+          } else {
+            logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
+          }
 
-        val status = getCurrentBlockStatus(blockId, info)
-        if (info.tellMaster) {
-          reportBlockStatus(blockId, info, status, droppedMemorySize)
-        }
-        if (!level.useDisk) {
-          // The block is completely gone from this node; forget it so we can put() it again later.
-          blockInfo.remove(blockId)
-        }
-        if (blockIsUpdated) {
-          return Some(status)
+          val status = getCurrentBlockStatus(blockId, info)
+          if (info.tellMaster) {
+            reportBlockStatus(blockId, info, status, droppedMemorySize)
+          }
+          if (!level.useDisk) {
+            // The block is completely gone from this node; forget it so we can put() it again later.
+            blockInfo.remove(blockId)
+          }
+          if (blockIsUpdated) {
+            return Some(status)
+          }
         }
+      } finally {
+        pendingToRemove.remove(blockId)
       }
     }
     None
@@ -1108,27 +1119,32 @@ private[spark] class BlockManager(
   def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
     logDebug(s"Removing block $blockId")
     val info = blockInfo.get(blockId).orNull
-    if (info != null) {
-      info.synchronized {
-        // Removals are idempotent in disk store and memory store. At worst, we get a warning.
-        val removedFromMemory = memoryStore.remove(blockId)
-        val removedFromDisk = diskStore.remove(blockId)
-        val removedFromExternalBlockStore =
-          if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
-        if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
-          logWarning(s"Block $blockId could not be removed as it was not found in either " +
-            "the disk, memory, or external block store")
-        }
-        blockInfo.remove(blockId)
-        val status = getCurrentBlockStatus(blockId, info)
-        if (tellMaster && info.tellMaster) {
-          reportBlockStatus(blockId, info, status)
-        }
-        Option(TaskContext.get()).foreach { tc =>
-          val metrics = tc.taskMetrics()
-          val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
-          metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status)))
+    if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) {
+      try {
+        info.synchronized {
+          val level = info.level
+          // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+          val removedFromMemory = if (level.useMemory) memoryStore.remove(blockId) else false
+          val removedFromDisk = if (level.useDisk) diskStore.remove(blockId) else false
+          val removedFromExternalBlockStore =
+            if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
+          if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
+            logWarning(s"Block $blockId could not be removed as it was not found in either " +
+              "the disk, memory, or external block store")
+          }
+          blockInfo.remove(blockId)
+          val status = getCurrentBlockStatus(blockId, info)
+          if (tellMaster && info.tellMaster) {
+            reportBlockStatus(blockId, info, status)
+          }
+          Option(TaskContext.get()).foreach { tc =>
+            val metrics = tc.taskMetrics()
+            val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+            metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status)))
+          }
         }
+      } finally {
+        pendingToRemove.remove(blockId)
       }
     } else {
       // The block has already been removed; do nothing.
@@ -1151,14 +1167,19 @@ private[spark] class BlockManager(
     while (iterator.hasNext) {
       val entry = iterator.next()
       val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
-      if (time < cleanupTime && shouldDrop(id)) {
-        info.synchronized {
-          val level = info.level
-          if (level.useMemory) { memoryStore.remove(id) }
-          if (level.useDisk) { diskStore.remove(id) }
-          if (level.useOffHeap) { externalBlockStore.remove(id) }
-          iterator.remove()
-          logInfo(s"Dropped block $id")
+      if (time < cleanupTime && shouldDrop(id) &&
+        pendingToRemove.putIfAbsent(id, currentTaskAttemptId) == 0L) {
+        try {
+          info.synchronized {
+            val level = info.level
+            if (level.useMemory) { memoryStore.remove(id) }
+            if (level.useDisk) { diskStore.remove(id) }
+            if (level.useOffHeap) { externalBlockStore.remove(id) }
+            iterator.remove()
+            logInfo(s"Dropped block $id")
+          }
+        } finally {
+          pendingToRemove.remove(id)
         }
         val status = getCurrentBlockStatus(id, info)
         reportBlockStatus(id, info, status)
@@ -1166,6 +1187,32 @@ private[spark] class BlockManager(
     }
   }
 
+  private def currentTaskAttemptId: Long = {
+    Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(NON_TASK_WRITER)
+  }
+
+  def getBlockInfo(blockId: BlockId): BlockInfo = {
+    blockInfo.get(blockId).orNull
+  }
+
+  /**
+   * Release all locks held by the given task, clearing that task's pin bookkeeping
+   * structures and updating the global pin counts. This method should be called at the
+   * end of a task (either by a task completion handler or in `TaskRunner.run()`).
+   *
+   * @return the ids of blocks whose pins were released
+   */
+  def releaseAllLocksForTask(taskAttemptId: Long): ArrayBuffer[BlockId] = {
+    var selectLocks = ArrayBuffer[BlockId]()
+    pendingToRemove.entrySet().asScala.foreach { entry =>
+      if (entry.getValue == taskAttemptId) {
+        pendingToRemove.remove(entry.getKey)
+        selectLocks += entry.getKey
+      }
+    }
+    selectLocks
+  }
+
   private def shouldCompress(blockId: BlockId): Boolean = {
     blockId match {
       case _: ShuffleBlockId => compressShuffle
@@ -1239,6 +1286,7 @@ private[spark] class BlockManager(
     rpcEnv.stop(slaveEndpoint)
     blockInfo.clear()
     memoryStore.clear()
+    pendingToRemove.clear()
     diskStore.clear()
     if (externalBlockStoreInitialized) {
       externalBlockStore.clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/ab006523/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index c00591f..4e66714 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.nio.{ByteBuffer, MappedByteBuffer}
 import java.util.Arrays
+import java.util.concurrent.CountDownLatch
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -424,6 +425,43 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     }
   }
 
+  test("deadlock between dropFromMemory and removeBlock") {
+    store = makeBlockManager(2000)
+    val a1 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    val lock1 = new CountDownLatch(1)
+    val lock2 = new CountDownLatch(1)
+
+    val t2 = new Thread {
+      override def run() = {
+        val info = store.getBlockInfo("a1")
+        info.synchronized {
+          store.pendingToRemove.put("a1", 1L)
+          lock1.countDown()
+          lock2.await()
+          store.pendingToRemove.remove("a1")
+        }
+      }
+    }
+
+    val t1 = new Thread {
+      override def run() = {
+        store.memoryManager.synchronized {
+          t2.start()
+          lock1.await()
+          val status = store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
+          assert(status == None, "this thread cannot get block a1")
+          lock2.countDown()
+        }
+      }
+    }
+
+    t1.start()
+    t1.join()
+    t2.join()
+    store.removeBlock("a1", tellMaster = false)
+  }
+
   test("correct BlockResult returned from get() calls") {
     store = makeBlockManager(12000)
     val list1 = List(new Array[Byte](2000), new Array[Byte](2000))

