[ https://issues.apache.org/jira/browse/SPARK-25775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ablimit A. Keskin updated SPARK-25775:
--------------------------------------
    Affects Version/s: 2.2.2 (was: 2.2.0)

> Race between end-of-task and completion iterator read lock release
> ------------------------------------------------------------------
>
>                 Key: SPARK-25775
>                 URL: https://issues.apache.org/jira/browse/SPARK-25775
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Spark Core
>    Affects Versions: 2.2.2
>            Reporter: Ablimit A. Keskin
>            Priority: Major
>
> The following issue comes from a production Spark job in which executors die
> because of uncaught exceptions during block release. When the task runs with
> certain settings for _executor-cores_ and _total-executor-cores_
> (e.g. 4 & 8 or 1 & 8), it consistently fails at the same code segment.
> The logs from our run follow:
>
> {code:java}
> 18/10/18 23:06:18 INFO DAGScheduler: Submitting 1 missing tasks from > ResultStage 27 (PythonRDD[94] at RDD at PythonRDD.scala:49) (first 15 tasks > are for partitions Vector(0)) > 18/10/18 23:06:18 INFO TaskSchedulerImpl: Adding task set 27.0 with 1 tasks > 18/10/18 23:06:18 INFO TaskSetManager: Starting task 0.0 in stage 27.0 (TID > 112, 10.248.7.2, executor 0, partition 0, PROCESS_LOCAL, 5585 bytes) > 18/10/18 23:06:18 INFO BlockManagerInfo: Added broadcast_37_piece0 in memory > on 10.248.7.2:44871 (size: 9.1 KB, free: 13.0 GB) > 18/10/18 23:06:24 WARN TaskSetManager: Lost task 0.0 in stage 27.0 (TID 112, > 10.248.7.2, executor 0): java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:156) > at > org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84) > at > org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367) > at >
org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at > org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361) > at > org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > 18/10/18 23:06:24 INFO TaskSetManager: Starting task 0.1 in stage 27.0 (TID > 113, 10.248.7.2, executor 0, partition 0, PROCESS_LOCAL, 5585 bytes) > 18/10/18 23:06:31 INFO TaskSetManager: Lost task 0.1 in stage 27.0 (TID 113) > on 10.248.7.2, executor 0: java.lang.AssertionError (assertion failed) > [duplicate 1] > 18/10/18 23:06:31 INFO TaskSetManager: Starting task 0.2 in stage 27.0 (TID > 114, 10.248.7.2, executor 0, partition 0, PROCESS_LOCAL, 5585 bytes) > 18/10/18 23:06:31 ERROR TaskSchedulerImpl: Lost executor 0 on 10.248.7.2: > Remote RPC client disassociated. Likely due to containers exceeding > thresholds, or network issues. Check driver logs for WARN messages. 
> 18/10/18 23:06:31 INFO StandaloneAppClient$ClientEndpoint: Executor updated: > app-20181018230546-0040/0 is now EXITED (Command exited with code 50) > 18/10/18 23:06:31 INFO StandaloneSchedulerBackend: Executor > app-20181018230546-0040/0 removed: Command exited with code 50 > 18/10/18 23:06:31 INFO StandaloneAppClient$ClientEndpoint: Executor added: > app-20181018230546-0040/2 on worker-20181018173742-10.248.110.2-40787 > (10.248.110.2:40787) with 4 cores > 18/10/18 23:06:31 INFO StandaloneSchedulerBackend: Granted executor ID > app-20181018230546-0040/2 on hostPort 10.248.110.2:40787 with 4 cores, 25.0 > GB RAM > 18/10/18 23:06:31 WARN TaskSetManager: Lost task 0.2 in stage 27.0 (TID 114, > 10.248.7.2, executor 0): ExecutorLostFailure (executor 0 exited caused by one > of the running tasks) Reason: Remote RPC client disassociated. Likely due to > containers exceeding thresholds, or network issues. Check driver logs for > WARN messages. > 18/10/18 23:06:31 INFO StandaloneAppClient$ClientEndpoint: Executor updated: > app-20181018230546-0040/2 is now RUNNING > 18/10/18 23:06:31 INFO DAGScheduler: Executor lost: 0 (epoch 11) > 18/10/18 23:06:31 INFO BlockManagerMaster: Removal of executor 0 requested > 18/10/18 23:06:31 INFO BlockManagerMasterEndpoint: Trying to remove executor > 0 from BlockManagerMaster. > 18/10/18 23:06:31 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to > remove non-existent executor 0 > 18/10/18 23:06:31 INFO TaskSetManager: Starting task 0.3 in stage 27.0 (TID > 115, 10.248.21.2, executor 1, partition 0, ANY, 5585 bytes) > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_27_2 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_44_7 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_27_4 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_44_3 ! 
> 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_27_0 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_44_5 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_78_0 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_10_2 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_10_0 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_10_4 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_78_6 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_78_4 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_78_2 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_61_4 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_61_6 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_44_1 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_61_0 ! > 18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available > for rdd_61_2 ! > 18/10/18 23:06:31 INFO BlockManagerMasterEndpoint: Removing block manager > BlockManagerId(0, 10.248.7.2, 44871, None) > 18/10/18 23:06:31 INFO BlockManagerMasterEndpoint: Trying to remove executor > 0 from BlockManagerMaster. 
> 18/10/18 23:06:31 INFO BlockManagerMaster: Removed 0 successfully in > removeExecutor > 18/10/18 23:06:31 INFO DAGScheduler: Shuffle files lost for executor: 0 > (epoch 11) > 18/10/18 23:06:31 INFO BlockManagerInfo: Added broadcast_37_piece0 in memory > on 10.248.21.2:43127 (size: 9.1 KB, free: 13.0 GB) > 18/10/18 23:06:32 INFO BlockManagerInfo: Added rdd_10_0 in memory on > 10.248.21.2:43127 (size: 2.2 MB, free: 13.0 GB) > 18/10/18 23:06:33 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: > Registered executor NettyRpcEndpointRef(spark-client://Executor) > (10.248.110.2:40562) with ID 2 > 18/10/18 23:06:33 INFO BlockManagerMasterEndpoint: Registering block manager > 10.248.110.2:32835 with 13.2 GB RAM, BlockManagerId(2, 10.248.110.2, 32835, > None) > 18/10/18 23:06:38 WARN TaskSetManager: Lost task 0.3 in stage 27.0 (TID 115, > 10.248.21.2, executor 1): java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:156) > at > org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84) > at > org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at > org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361) > at > org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736) 
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > 18/10/18 23:06:38 ERROR TaskSetManager: Task 0 in stage 27.0 failed 4 times; > aborting job > 18/10/18 23:06:38 INFO TaskSchedulerImpl: Removed TaskSet 27.0, whose tasks > have all completed, from pool > 18/10/18 23:06:38 INFO TaskSchedulerImpl: Cancelling stage 27 > 18/10/18 23:06:38 INFO DAGScheduler: ResultStage 27 (runJob at > PythonRDD.scala:463) failed in 20.437 s due to Job aborted due to stage > failure: Task 0 in stage 27.0 failed 4 times, most recent failure: Lost task > 0.3 in stage 27.0 (TID 115, 10.248.21.2, executor 1): > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:156) > at > org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84) > at > org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at > org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361) > at > org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736) > at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Driver stacktrace: > 18/10/18 23:06:38 INFO DAGScheduler: Job 16 failed: runJob at > PythonRDD.scala:463, took 20.450274 s > Traceback (most recent call last): > File > "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/session.py", > line 582, in createDataFrame > File > "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/session.py", > line 380, in _createFromRDD > File > "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/session.py", > line 351, in _inferSchema > File "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", > line 1368, in first > File "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", > line 1350, in take > File > "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/context.py", > line 992, in runJob > File > "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", > line 1257, in __call__ > File > "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", > line 63, in deco > File > "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", > line 328, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.runJob. 
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 27.0 failed 4 times, most recent failure: Lost task 0.3 in stage > 27.0 (TID 115, 10.248.21.2, executor 1): java.lang.AssertionError: assertion > failed > at scala.Predef$.assert(Predef.scala:156) > at > org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84) > at > org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at > org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361) > at > org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1533) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1521) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1520) > at > 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1520) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1748) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1703) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1692) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069) > at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:463) > at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at py4j.Gateway.invoke(Gateway.java:282) > at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at 
py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:156) > at > org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84) > at > org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366) > at > org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at > org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361) > at > org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 
1 more > 18/10/18 23:06:38 INFO SparkContext: Invoking stop() from shutdown hook > 18/10/18 23:06:38 INFO AbstractConnector: Stopped > Spark@47d1385c{HTTP/1.1,[http/1.1]}{0.0.0.0:4040} > 18/10/18 23:06:38 INFO SparkUI: Stopped Spark web UI at > http://10.248.67.5:4040 > 18/10/18 23:06:38 INFO StandaloneSchedulerBackend: Shutting down all executors > 18/10/18 23:06:38 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking > each executor to shut down > 18/10/18 23:06:38 INFO MapOutputTrackerMasterEndpoint: > MapOutputTrackerMasterEndpoint stopped! > 18/10/18 23:06:38 INFO MemoryStore: MemoryStore cleared > 18/10/18 23:06:38 INFO BlockManager: BlockManager stopped > 18/10/18 23:06:38 INFO BlockManagerMaster: BlockManagerMaster stopped > 18/10/18 23:06:38 INFO > OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: > OutputCommitCoordinator stopped! > 18/10/18 23:06:38 INFO SparkContext: Successfully stopped SparkContext > 18/10/18 23:06:38 INFO ShutdownHookManager: Shutdown hook called > 18/10/18 23:06:38 INFO ShutdownHookManager: Deleting directory > /tmp/spark-f22d0534-184d-4602-bec0-825e9c15ed2d/pyspark-be23722b-1f19-4608-b47a-31579b8218e3 > 18/10/18 23:06:38 INFO ShutdownHookManager: Deleting directory > /tmp/spark-f22d0534-184d-4602-bec0-825e9c15ed2d
> {code}
>
> However, if we add a redundant
> {code:java}
> rdd.count() or rdd.cache(){code}
> statement immediately before the failing createDataFrame() call, the failure
> no longer occurs.
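
For readers trying to picture the failure, here is a minimal sketch of the kind of double release that the issue title and the stack trace suggest (the assertion fires in BlockInfo.checkInvariants while releaseAllLocksForTask updates readerCount). It is a toy model written for illustration, not Spark's code: the class, the function simulate, the flag release_after_snapshot, and the use of the block id rdd_10_0 are all assumptions, and the interleaving is replayed deterministically in one thread rather than with real concurrency.

```python
class BlockInfo:
    """Toy per-block lock state (illustrative only, not Spark's class)."""

    def __init__(self):
        self._reader_count = 0

    @property
    def reader_count(self):
        return self._reader_count

    @reader_count.setter
    def reader_count(self, value):
        self._reader_count = value
        self._check_invariants()  # every write re-validates the state

    def _check_invariants(self):
        if self._reader_count < 0:
            raise AssertionError("reader count went negative")


def simulate(release_after_snapshot):
    """Replay one interleaving of iterator completion and end-of-task cleanup."""
    info = BlockInfo()
    held = {"rdd_10_0": 1}   # hypothetical task bookkeeping: block id -> read locks
    info.reader_count += 1   # the task acquires one read lock

    if not release_after_snapshot:
        # Benign order: the iterator finishes first and updates the bookkeeping.
        info.reader_count -= 1
        held.pop("rdd_10_0")

    # End-of-task cleanup takes the task's lock bookkeeping in one step.
    snapshot = dict(held)
    held.clear()

    if release_after_snapshot:
        # Racy order: the iterator releases its lock after the snapshot was
        # taken, so the snapshot still counts a lock that is already gone.
        info.reader_count -= 1

    try:
        for count in snapshot.values():
            info.reader_count -= count  # second release of the same lock
    except AssertionError as exc:
        return "AssertionError: " + str(exc)
    return "ok"


if __name__ == "__main__":
    print(simulate(release_after_snapshot=False))
    print(simulate(release_after_snapshot=True))
```

In this model only the racy ordering drives the count below zero. Whether the real executor follows exactly this interleaving is not established by the logs above.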