[ 
https://issues.apache.org/jira/browse/SPARK-18406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15969384#comment-15969384
 ] 

Yongqin Xiao commented on SPARK-18406:
--------------------------------------

The same JIRA is found under 
https://issues-test.apache.org/jira/browse/SPARK-18406.
Same issue observed in spark 2.1.0 as well.

The issue is observed on some simple spark query that compiles into 3 stages. I 
have some custom RDD being used in this case, and it is registered for 
persistence. In the compute() method of the custom RDD, I spawn a new thread to 
compute the data in the background, and then immediately return an abstract 
iterator (wrapped under a InterruptibleIterator) that gets data from the 
background computation on demand. The assertion happens when iterator of the 
parent RDD reaches the end of the data. This issue doesn't always happen when 
the custom RDD is used in the query, regardless being used once or multiple 
times.

The issue is related to the new thread I created which accesses data from the 
input iterator of parent RDD. The new thread is missing the 
TSS(thread-specific-storage) for TaskContext. I see BlockInfoManager is using 
this TSS TaskContext as key to search the storage.

Here is log showing the task ID being unset in this thread: Line 1819: 
2017/04/13 15:01:01.674 [Thread-33]: TRACE storage.BlockInfoManager: Task -1024 
releasing lock for rdd_25_0

However, I have no way to set TSS for my thread now because the method is made 
protected as below:
object TaskContext {
 ...
private[this] val taskContext: ThreadLocal[TaskContext] = new 
ThreadLocal[TaskContext]
// Note: protected[spark] instead of private[spark] to prevent the following 
two from
 // showing up in JavaDoc.
 /** Set the thread local TaskContext. Internal to Spark. */
 protected[spark] def setTaskContext(tc: TaskContext): Unit = 
taskContext.set(tc)

Just to confirm my theory, I made the TaskContext.setTaskContext public, and 
called it in the beginning of my thread. The use cases that were failing 
consistently with assertion on lock-release now run successful in all scenarios 
I have tried, which include having different number of src/shuffle partitions, 
number of executors, async vs. sequential execution (for having multiple 
downstream custom RDDs pulling data from upstream RDD).

> Race between end-of-task and completion iterator read lock release
> ------------------------------------------------------------------
>
>                 Key: SPARK-18406
>                 URL: https://issues.apache.org/jira/browse/SPARK-18406
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Spark Core
>    Affects Versions: 2.0.0, 2.0.1
>            Reporter: Josh Rosen
>
> The following log comes from a production streaming job where executors 
> periodically die due to uncaught exceptions during block release:
> {code}
> 16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7921
> 16/11/07 17:11:06 INFO Executor: Running task 0.0 in stage 2390.0 (TID 7921)
> 16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7922
> 16/11/07 17:11:06 INFO Executor: Running task 1.0 in stage 2390.0 (TID 7922)
> 16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7923
> 16/11/07 17:11:06 INFO Executor: Running task 2.0 in stage 2390.0 (TID 7923)
> 16/11/07 17:11:06 INFO TorrentBroadcast: Started reading broadcast variable 
> 2721
> 16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7924
> 16/11/07 17:11:06 INFO Executor: Running task 3.0 in stage 2390.0 (TID 7924)
> 16/11/07 17:11:06 INFO MemoryStore: Block broadcast_2721_piece0 stored as 
> bytes in memory (estimated size 5.0 KB, free 4.9 GB)
> 16/11/07 17:11:06 INFO TorrentBroadcast: Reading broadcast variable 2721 took 
> 3 ms
> 16/11/07 17:11:06 INFO MemoryStore: Block broadcast_2721 stored as values in 
> memory (estimated size 9.4 KB, free 4.9 GB)
> 16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_1 locally
> 16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_3 locally
> 16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_2 locally
> 16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_4 locally
> 16/11/07 17:11:06 INFO PythonRunner: Times: total = 2, boot = -566, init = 
> 567, finish = 1
> 16/11/07 17:11:06 INFO PythonRunner: Times: total = 7, boot = -540, init = 
> 541, finish = 6
> 16/11/07 17:11:06 INFO Executor: Finished task 2.0 in stage 2390.0 (TID 
> 7923). 1429 bytes result sent to driver
> 16/11/07 17:11:06 INFO PythonRunner: Times: total = 8, boot = -532, init = 
> 533, finish = 7
> 16/11/07 17:11:06 INFO Executor: Finished task 3.0 in stage 2390.0 (TID 
> 7924). 1429 bytes result sent to driver
> 16/11/07 17:11:06 ERROR Executor: Exception in task 0.0 in stage 2390.0 (TID 
> 7921)
> java.lang.AssertionError: assertion failed
>       at scala.Predef$.assert(Predef.scala:165)
>       at 
> org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
>       at 
> org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
>       at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:362)
>       at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:361)
>       at scala.Option.foreach(Option.scala:236)
>       at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
>       at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:356)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at 
> org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:356)
>       at 
> org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:646)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:281)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7925
> 16/11/07 17:11:06 INFO Executor: Running task 0.1 in stage 2390.0 (TID 7925)
> 16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_1 locally
> 16/11/07 17:11:06 INFO PythonRunner: Times: total = 41, boot = -536, init = 
> 576, finish = 1
> 16/11/07 17:11:06 INFO Executor: Finished task 1.0 in stage 2390.0 (TID 
> 7922). 1429 bytes result sent to driver
> 16/11/07 17:11:06 ERROR Utils: Uncaught exception in thread stdout writer for 
> /databricks/python/bin/python
> java.lang.AssertionError: assertion failed: Block rdd_2741_1 is not locked 
> for reading
>       at scala.Predef$.assert(Predef.scala:179)
>       at 
> org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:294)
>       at 
> org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:630)
>       at 
> org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:434)
>       at 
> org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
>       at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>       at 
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
>       at 
> org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
>       at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1882)
>       at 
> org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
> 16/11/07 17:11:06 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[stdout writer for /databricks/python/bin/python,5,main]
> java.lang.AssertionError: assertion failed: Block rdd_2741_1 is not locked 
> for reading
>       at scala.Predef$.assert(Predef.scala:179)
>       at 
> org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:294)
>       at 
> org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:630)
>       at 
> org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:434)
>       at 
> org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
>       at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>       at 
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
>       at 
> org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
>       at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1882)
>       at 
> org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala
> {code}
> I think that there's some sort of internal race condition between a task 
> finishing (TID 7921) and automatically releasing locks and between some 
> "automatically release locks on hitting the end of an iterator" logic running 
> in a separate thread. The log above came from a production streaming job 
> where executors periodically died with this type of error.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to