Repository: spark
Updated Branches:
  refs/heads/branch-2.0 72e1f83d7 -> 79fbfbbc7


[SPARK-18406][CORE][BACKPORT-2.0] Race between end-of-task and completion iterator read lock release

This is a backport of PR #18076 to branches 2.0 and 2.1.

## What changes were proposed in this pull request?

When a TaskContext is not propagated properly to all of a task's child threads, 
as in the cases reported in this issue, we fail to get the TID from the 
TaskContext, which leaves us unable to release the lock and triggers assertion 
failures. To resolve this, we explicitly pass the TID value to the `unlock` 
method.
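
The root cause is that `TaskContext` is kept in a thread-local, so a thread spawned inside a task does not see the parent task's context. Below is a minimal sketch of the pattern the patch relies on; it is a hypothetical standalone snippet meant to run inside a task body (e.g. within `mapPartitions`), not code from this patch:

```scala
import org.apache.spark.TaskContext

// On the task's main thread the context is set, so we can capture the TID.
val taskAttemptId: Option[Long] = Option(TaskContext.get()).map(_.taskAttemptId())

val child = new Thread(new Runnable {
  override def run(): Unit = {
    // The thread-local is not inherited: TaskContext.get() returns null here
    // unless the context is propagated explicitly.
    assert(TaskContext.get() == null)
    // Passing the captured taskAttemptId down (as the patch does for unlock)
    // lets the lock be released for the correct task from any thread.
  }
})
child.start()
child.join()
```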

## How was this patch tested?

Added a new regression test case in `RDDSuite` that fails without this patch.
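
For reference, assuming a standard Spark sbt build, the new test can be run on its own with an invocation along these lines (the exact command may differ by environment):

```
build/sbt "core/testOnly org.apache.spark.rdd.RDDSuite -- -z SPARK-18406"
```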

Author: Xingbo Jiang <[email protected]>

Closes #18096 from jiangxb1987/completion-iterator-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79fbfbbc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79fbfbbc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79fbfbbc

Branch: refs/heads/branch-2.0
Commit: 79fbfbbc7ad7fea1ca4124981201e947db67745d
Parents: 72e1f83
Author: Xingbo Jiang <[email protected]>
Authored: Wed May 24 14:34:17 2017 -0700
Committer: Xiao Li <[email protected]>
Committed: Wed May 24 14:34:17 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/network/BlockDataManager.scala |  2 +-
 .../apache/spark/storage/BlockInfoManager.scala | 15 +++++++++-----
 .../org/apache/spark/storage/BlockManager.scala | 21 +++++++++++++++-----
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 18 ++++++++++++++++-
 4 files changed, 44 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/79fbfbbc/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
index 8f83668..b3f8bfe 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -46,5 +46,5 @@ trait BlockDataManager {
   /**
    * Release locks acquired by [[putBlockData()]] and [[getBlockData()]].
    */
-  def releaseLock(blockId: BlockId): Unit
+  def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79fbfbbc/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index dd8f5ba..c0e18e5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -281,22 +281,27 @@ private[storage] class BlockInfoManager extends Logging {
 
   /**
    * Release a lock on the given block.
+   * In case a TaskContext is not propagated properly to all child threads for the task, we fail to
+   * get the TID from TaskContext, so we have to explicitly pass the TID value to release the lock.
+   *
+   * See SPARK-18406 for more discussion of this issue.
    */
-  def unlock(blockId: BlockId): Unit = synchronized {
-    logTrace(s"Task $currentTaskAttemptId releasing lock for $blockId")
+  def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized {
+    val taskId = taskAttemptId.getOrElse(currentTaskAttemptId)
+    logTrace(s"Task $taskId releasing lock for $blockId")
     val info = get(blockId).getOrElse {
       throw new IllegalStateException(s"Block $blockId not found")
     }
     if (info.writerTask != BlockInfo.NO_WRITER) {
       info.writerTask = BlockInfo.NO_WRITER
-      writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
+      writeLocksByTask.removeBinding(taskId, blockId)
     } else {
       assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
       info.readerCount -= 1
-      val countsForTask = readLocksByTask(currentTaskAttemptId)
+      val countsForTask = readLocksByTask(taskId)
       val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
       assert(newPinCountForTask >= 0,
-        s"Task $currentTaskAttemptId release lock on block $blockId more times 
than it acquired it")
+        s"Task $taskId release lock on block $blockId more times than it 
acquired it")
     }
     notifyAll()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/79fbfbbc/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f9e48b2..6ee4c05 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -427,6 +427,7 @@ private[spark] class BlockManager(
       case Some(info) =>
         val level = info.level
         logDebug(s"Level for block $blockId is $level")
+        val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId())
         if (level.useMemory && memoryStore.contains(blockId)) {
           val iter: Iterator[Any] = if (level.deserialized) {
             memoryStore.getValues(blockId).get
@@ -434,7 +435,12 @@ private[spark] class BlockManager(
             serializerManager.dataDeserializeStream(
              blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
+          // We need to capture the current taskId in case the iterator completion is triggered
+          // from a different thread which does not have TaskContext set; see SPARK-18406 for
+          // discussion.
+          val ci = CompletionIterator[Any, Iterator[Any]](iter, {
+            releaseLock(blockId, taskAttemptId)
+          })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
           val iterToReturn: Iterator[Any] = {
@@ -451,7 +457,9 @@ private[spark] class BlockManager(
              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
             }
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
+          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+            releaseLock(blockId, taskAttemptId)
+          })
           Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
@@ -627,10 +635,13 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Release a lock on the given block.
+   * Release a lock on the given block with explicit TID.
+   * The param `taskAttemptId` should be passed in case we can't get the correct TID from
+   * TaskContext, for example, the input iterator of a cached RDD iterates to the end in a child
+   * thread.
    */
-  def releaseLock(blockId: BlockId): Unit = {
-    blockInfoManager.unlock(blockId)
+  def releaseLock(blockId: BlockId, taskAttemptId: Option[Long] = None): Unit = {
+    blockInfoManager.unlock(blockId, taskAttemptId)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/79fbfbbc/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index ad56715..8d06f54 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
 import org.apache.spark._
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDDSuiteUtils._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 class RDDSuite extends SparkFunSuite with SharedSparkContext {
   var tempDir: File = _
@@ -1082,6 +1082,22 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(totalPartitionCount == 10)
   }
 
+  test("SPARK-18406: race between end-of-task and completion iterator read 
lock release") {
+    val rdd = sc.parallelize(1 to 1000, 10)
+    rdd.cache()
+
+    rdd.mapPartitions { iter =>
+      ThreadUtils.runInNewThread("TestThread") {
+        // Iterate to the end of the input iterator, to cause the CompletionIterator completion to
+        // fire outside of the task's main thread.
+        while (iter.hasNext) {
+          iter.next()
+        }
+        iter
+      }
+    }.collect()
+  }
+
   // NOTE
  // Below tests calling sc.stop() have to be the last tests in this suite. If there are tests
  // running after them and if they access sc those tests will fail as sc is already closed, because

