This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 431e90de327 [SPARK-39200][CORE] Make Fallback Storage readFully on
content
431e90de327 is described below
commit 431e90de3271a3e6f4a3bf8746556de9784efea5
Author: Frank Yin <[email protected]>
AuthorDate: Fri Sep 23 04:23:26 2022 -0700
[SPARK-39200][CORE] Make Fallback Storage readFully on content
### What changes were proposed in this pull request?
Looks like from the bug description, fallback storage doesn't readFully and
then causes `org.apache.spark.shuffle.FetchFailedException: Decompression error:
Corrupted block detected`. This is an attempt to fix this by reading the
underlying stream fully.
### Why are the changes needed?
Fix a bug documented in SPARK-39200
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Wrote a unit test
Closes #37960 from ukby1234/SPARK-39200.
Authored-by: Frank Yin <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 07061f1a07a96f59ae42c9df6110eb784d2f3dab)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/storage/FallbackStorage.scala | 2 +-
.../spark/storage/FallbackStorageSuite.scala | 90 +++++++++++++++++++++-
2 files changed, 90 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index e644ffe87e3..5aa5c6eff7b 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -193,7 +193,7 @@ private[spark] object FallbackStorage extends Logging {
val array = new Array[Byte](size.toInt)
val startTimeNs = System.nanoTime()
f.seek(offset)
- f.read(array)
+ f.readFully(array)
logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 *
1000)}ms")
f.close()
new NioManagedBuffer(ByteBuffer.wrap(array))
diff --git
a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
index 3828e9d8297..83c9707bfc2 100644
--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -16,12 +16,14 @@
*/
package org.apache.spark.storage
-import java.io.{DataOutputStream, File, FileOutputStream, IOException}
+import java.io.{DataOutputStream, File, FileOutputStream, InputStream,
IOException}
import java.nio.file.Files
import scala.concurrent.duration._
+import scala.util.Random
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path,
PositionedReadable, Seekable}
import org.mockito.{ArgumentMatchers => mc}
import org.mockito.Mockito.{mock, never, verify, when}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
@@ -107,6 +109,49 @@ class FallbackStorageSuite extends SparkFunSuite with
LocalSparkContext {
FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
}
+ test("SPARK-39200: fallback storage APIs - readFully") {
+ val conf = new SparkConf(false)
+ .set("spark.app.id", "testId")
+ .set("spark.hadoop.fs.file.impl", classOf[ReadPartialFileSystem].getName)
+ .set(SHUFFLE_COMPRESS, false)
+ .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+ .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
+ "file://" + Files.createTempDirectory("tmp").toFile.getAbsolutePath +
"/")
+ val fallbackStorage = new FallbackStorage(conf)
+ val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf,
false)
+
+ val bm = mock(classOf[BlockManager])
+ val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver =
false)
+ when(bm.diskBlockManager).thenReturn(dbm)
+ when(bm.master).thenReturn(bmm)
+ val resolver = new IndexShuffleBlockResolver(conf, bm)
+ when(bm.migratableResolver).thenReturn(resolver)
+
+ val length = 100000
+ val content = new Array[Byte](length)
+ Random.nextBytes(content)
+
+ val indexFile = resolver.getIndexFile(1, 2L)
+ tryWithResource(new FileOutputStream(indexFile)) { fos =>
+ val dos = new DataOutputStream(fos)
+ dos.writeLong(0)
+ dos.writeLong(length)
+ }
+
+ val dataFile = resolver.getDataFile(1, 2L)
+ tryWithResource(new FileOutputStream(dataFile)) { fos =>
+ fos.write(content)
+ }
+
+ fallbackStorage.copy(ShuffleBlockInfo(1, 2L), bm)
+
+ assert(fallbackStorage.exists(1, ShuffleIndexBlockId(1, 2L,
NOOP_REDUCE_ID).name))
+ assert(fallbackStorage.exists(1, ShuffleDataBlockId(1, 2L,
NOOP_REDUCE_ID).name))
+
+ val readResult = FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
+ assert(readResult.nioByteBuffer().array().sameElements(content))
+ }
+
test("SPARK-34142: fallback storage API - cleanUp") {
withTempDir { dir =>
Seq(true, false).foreach { cleanUp =>
@@ -289,3 +334,46 @@ class FallbackStorageSuite extends SparkFunSuite with
LocalSparkContext {
}
}
}
+class ReadPartialInputStream(val in: FSDataInputStream) extends InputStream
+ with Seekable with PositionedReadable {
+ override def read: Int = in.read
+
+ override def read(b: Array[Byte], off: Int, len: Int): Int = {
+ if (len > 1) {
+ in.read(b, off, len - 1)
+ } else {
+ in.read(b, off, len)
+ }
+ }
+
+ override def seek(pos: Long): Unit = {
+ in.seek(pos)
+ }
+
+ override def getPos: Long = in.getPos
+
+ override def seekToNewSource(targetPos: Long): Boolean =
in.seekToNewSource(targetPos)
+
+ override def read(position: Long, buffer: Array[Byte], offset: Int, length:
Int): Int = {
+ if (length > 1) {
+ in.read(position, buffer, offset, length - 1)
+ } else {
+ in.read(position, buffer, offset, length)
+ }
+ }
+
+ override def readFully(position: Long, buffer: Array[Byte], offset: Int,
length: Int): Unit = {
+ in.readFully(position, buffer, offset, length)
+ }
+
+ override def readFully(position: Long, buffer: Array[Byte]): Unit = {
+ in.readFully(position, buffer)
+ }
+}
+
+class ReadPartialFileSystem extends LocalFileSystem {
+ override def open(f: Path): FSDataInputStream = {
+ val stream = super.open(f)
+ new FSDataInputStream(new ReadPartialInputStream(stream))
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]