This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 282c7ae7b5a [SPARK-39152][CORE] Deregistering disk persisted local blocks in case of IO related errors

282c7ae7b5a is described below

commit 282c7ae7b5adbd88466681bc986a7d914080f08a
Author: attilapiros <piros.attila.zs...@gmail.com>
AuthorDate: Tue Jun 21 12:06:56 2022 +0800

[SPARK-39152][CORE] Deregistering disk persisted local blocks in case of IO related errors

### What changes were proposed in this pull request?

Deregistering disk persisted local blocks from the block manager in case of IO related errors.

### Why are the changes needed?

In case of a disk corruption, a disk persisted block will lead to job failure, because the block registration always points to the same file. So even when the task is rescheduled on a different executor, the job will fail.

Example. First failure (the block is locally available):

```
22/04/25 07:15:28 ERROR executor.Executor: Exception in task 17024.0 in stage 12.0 (TID 51853)
java.io.StreamCorruptedException: invalid stream header: 00000000
  at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:943)
  at java.io.ObjectInputStream.<init>(ObjectInputStream.java:401)
  at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
  at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
  at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
  at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
  at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:617)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:897)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
```

Then the task might be rescheduled on a different executor, but as the block is still registered to the first block manager, the error will be the same:

```
java.io.StreamCorruptedException: invalid stream header: 00000000
  at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:943)
  at java.io.ObjectInputStream.<init>(ObjectInputStream.java:401)
  at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
  at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
  at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
  at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
  at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:698)
  at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:696)
  at scala.Option.map(Option.scala:146)
  at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:696)
  at org.apache.spark.storage.BlockManager.get(BlockManager.scala:831)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:886)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
```

My idea is to deregister the block when the IO error occurs and let a subsequent task recompute it. This PR targets only local blocks. In a follow-up PR, `getRemoteValues` can be extended with the block removal as well.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

1) An existing unit test was extended.
2) Manually.

#### Manual testing

Start Spark:

```
$ ./bin/spark-shell --master "local-cluster[3,1,1200]" --conf spark.serializer=org.apache.spark.serializer.JavaSerializer
```

Create a persisted RDD (here via a DF):

```
scala> val df = sc.parallelize(1 to 20, 4).toDF
...
scala> df.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
...
scala> df.show()
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
|   11|
|   12|
|   13|
|   14|
|   15|
|   16|
|   17|
|   18|
|   19|
|   20|
+-----+
```

Now as the blocks are persisted, let's corrupt one of the files. For this we have to find the directory where the blocks are stored:

```
$ grep "DiskBlockManager: Created local directory" work/app-20220511112820-0000/*/stdout
work/app-20220511112820-0000/0/stdout:22/05/11 11:28:21 INFO DiskBlockManager: Created local directory at /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-0f4c0d32-8f12-447f-add3-5cfbd4a7c777/blockmgr-dde20b67-a824-4d92-9023-8fa902588a26
work/app-20220511112820-0000/1/stdout:22/05/11 11:28:21 INFO DiskBlockManager: Created local directory at /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-05de3de7-60ca-4954-8baa-965da3c35ce5/blockmgr-71c559a6-f0e8-42a1-bf53-3bddb4a69618
work/app-20220511112820-0000/2/stdout:22/05/11 11:28:21 INFO DiskBlockManager: Created local directory at /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-bf0ca6de-17c6-437f-82e9-33afb01dce7d/blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c
```

Let's write something into one of the RDD files:

```
vim /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-bf0ca6de-17c6-437f-82e9-33afb01dce7d/blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c/19/rdd_4_1
```

Use the DF/RDD one more time:

```
scala> df.show()
22/05/11 11:30:41 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 7) (192.168.1.65 executor 2): java.io.StreamCorruptedException: invalid stream header: 41ACED00
  at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:938)
  at java.io.ObjectInputStream.<init>(ObjectInputStream.java:396)
  at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:66)
  at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:66)
  at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:137)
  at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:212)
  at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:967)
  at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1277)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1344)
  ...
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
|   11|
|   12|
|   13|
|   14|
|   15|
|   16|
|   17|
|   18|
|   19|
|   20|
+-----+
```

Check the logs:

```
$ cat work/app-20220511112820-0000/2/stdout
...
22/05/11 11:30:41 INFO CoarseGrainedExecutorBackend: Got assigned task 7
22/05/11 11:30:41 INFO Executor: Running task 0.0 in stage 3.0 (TID 7)
...
22/05/11 11:30:41 INFO BlockManager: invalid stream header: 41ACED00. BlockManagerId(2, 192.168.1.65, 63368, None) - blockId: rdd_4_1 retryCount: 0 - blockDiskPath: /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-bf0ca6de-17c6-437f-82e9-33afb01dce7d/blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c/19/rdd_4_1
22/05/11 11:30:41 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 7)
java.io.StreamCorruptedException: invalid stream header: 41ACED00
  at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:938) ~[?:1.8.0_322]
  at java.io.ObjectInputStream.<init>(ObjectInputStream.java:396) ~[?:1.8.0_322]
  at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:66) ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
  ...
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_322]
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_322]
  at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_322]
22/05/11 11:30:41 INFO CoarseGrainedExecutorBackend: Got assigned task 8
22/05/11 11:30:41 INFO Executor: Running task 0.1 in stage 3.0 (TID 8)
22/05/11 11:30:41 INFO Executor: Finished task 0.1 in stage 3.0 (TID 8). 1623 bytes result sent to driver
```

Closes #36512 from attilapiros/handleCorruptedCachedRdd.
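The core of the change is easy to exercise in isolation. Below is a minimal, self-contained sketch of the `isIORelatedException` predicate this patch adds to `BlockManager`; the only assumption is that `com.esotericsoftware.kryo.KryoException` is replaced by a local stand-in class so the snippet runs without Kryo on the classpath:

```scala
import java.io.IOException

// Stand-in for com.esotericsoftware.kryo.KryoException, so this sketch
// compiles without the Kryo dependency on the classpath.
class KryoException(cause: Throwable) extends RuntimeException(cause)

// An error counts as "IO related" when it is an IOException itself, or a
// KryoException whose cause is an IOException (Kryo wraps the underlying
// stream error). This mirrors the predicate added to BlockManager.
def isIORelatedException(t: Throwable): Boolean =
  t.isInstanceOf[IOException] ||
    (t.isInstanceOf[KryoException] && t.getCause.isInstanceOf[IOException])
```

With this predicate, a `java.io.StreamCorruptedException` from a corrupt block file qualifies (it is a subclass of `IOException`), so the read path logs the block details and calls `removeBlock`, while non-IO failures still release the read lock but leave the block registered.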
Authored-by: attilapiros <piros.attila.zs...@gmail.com>
Signed-off-by: yi.wu <yi...@databricks.com>
---
 .../org/apache/spark/storage/BlockManager.scala    | 64 +++++++++++++---------
 .../apache/spark/storage/BlockManagerSuite.scala   |  2 +
 2 files changed, 40 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 619b5e1edf7..f4adbc7ccb3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -345,9 +345,7 @@ private[spark] class BlockManager(
         }
       } catch {
         case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
-          // We need to have detailed log message to catch environmental problems easily.
-          // Further details: https://issues.apache.org/jira/browse/SPARK-37710
-          processKryoException(ex, blockId)
+          logInfo(extendMessageWithBlockDetails(ex.getMessage, blockId))
           throw ex
       } finally {
         IOUtils.closeQuietly(inputStream)
@@ -905,6 +903,10 @@ private[spark] class BlockManager(
     throw SparkCoreErrors.readLockedBlockNotFoundError(blockId)
   }
 
+  private def isIORelatedException(t: Throwable): Boolean =
+    t.isInstanceOf[IOException] ||
+      (t.isInstanceOf[KryoException] && t.getCause.isInstanceOf[IOException])
+
   /**
    * Get block from local block manager as an iterator of Java objects.
    */
@@ -933,31 +935,37 @@ private[spark] class BlockManager(
         })
         Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
       } else if (level.useDisk && diskStore.contains(blockId)) {
+        var diskData: BlockData = null
         try {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
-            }
+          diskData = diskStore.getBytes(blockId)
+          val iterToReturn = if (level.deserialized) {
+            val diskValues = serializerManager.dataDeserializeStream(
+              blockId,
+              diskData.toInputStream())(info.classTag)
+            maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+          } else {
+            val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+              .map { _.toInputStream(dispose = false) }
+              .getOrElse { diskData.toInputStream() }
+            serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
           }
           val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
             releaseLockAndDispose(blockId, diskData, taskContext)
           })
           Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } catch {
-          case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
-            // We need to have detailed log message to catch environmental problems easily.
-            // Further details: https://issues.apache.org/jira/browse/SPARK-37710
-            processKryoException(ex, blockId)
-            throw ex
+          case t: Throwable =>
+            if (diskData != null) {
+              diskData.dispose()
+              diskData = null
+            }
+            releaseLock(blockId, taskContext)
+            if (isIORelatedException(t)) {
+              logInfo(extendMessageWithBlockDetails(t.getMessage, blockId))
+              // Remove the block so that its unavailability is reported to the driver
+              removeBlock(blockId)
+            }
+            throw t
         }
       } else {
         handleLocalReadFailure(blockId)
@@ -965,14 +973,18 @@ private[spark] class BlockManager(
     }
   }
 
-  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
-    var message =
-      "%s. %s - blockId: %s".format(ex.getMessage, blockManagerId.toString, blockId)
+  /**
+   * We need to have detailed log message to catch environmental problems easily.
+   * Further details: https://issues.apache.org/jira/browse/SPARK-37710
+   */
+  private def extendMessageWithBlockDetails(msg: String, blockId: BlockId): String = {
+    val message: String = "%s. %s - blockId: %s".format(msg, blockManagerId.toString, blockId)
     val file = diskBlockManager.getFile(blockId)
     if (file.exists()) {
-      message = "%s - blockDiskPath: %s".format(message, file.getAbsolutePath)
+      "%s - blockDiskPath: %s".format(message, file.getAbsolutePath)
+    } else {
+      message
     }
-    logInfo(message)
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 45e05b2cc2d..545bbee6671 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -2149,6 +2149,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       () => List(new Array[User](1)).iterator)
     }
     assert(kryoException.getMessage === "java.io.IOException: Input/output error")
+    assertUpdateBlockInfoReportedForRemovingBlock(store, "my-block-id",
+      removedFromMemory = false, removedFromDisk = true)
   }
 
   test("check KryoException when saving blocks into memory and 'Input/output error' is occurred") {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org