This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5db58f92538 [SPARK-44993][CORE] Add `ShuffleChecksumUtils.compareChecksums` by reusing `ShuffleChecksumTestHelp.compareChecksums`
5db58f92538 is described below
commit 5db58f92538d2cf2fee90a5ca08c07c4e2242aad
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Mon Aug 28 17:29:33 2023 -0700
[SPARK-44993][CORE] Add `ShuffleChecksumUtils.compareChecksums` by reusing `ShuffleChecksumTestHelp.compareChecksums`
### What changes were proposed in this pull request?
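This PR aims to add `ShuffleChecksumUtils.compareChecksums` to the main code by
moving the logic of the existing test helper
`ShuffleChecksumTestHelper.compareChecksums` into it. The new method returns
`false` on a checksum mismatch instead of asserting, and the test helper now
delegates to it.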
### Why are the changes needed?
This verification logic is very useful, but it was only available in the test
code. Moving it into the main code lets the `core` module take advantage of it.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs with the existing tests, because this is a refactoring.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #42707 from dongjoon-hyun/SPARK-44993.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/shuffle/ShuffleChecksumUtils.scala} | 13 +++---
.../spark/shuffle/ShuffleChecksumTestHelper.scala | 49 ++--------------------
2 files changed, 8 insertions(+), 54 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala
similarity index 87%
copy from core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala
copy to core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala
index 3db2f77fe15..75b0efcf5cd 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala
@@ -23,21 +23,17 @@ import java.util.zip.CheckedInputStream
import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper
import org.apache.spark.network.util.LimitedInputStream
-trait ShuffleChecksumTestHelper {
+object ShuffleChecksumUtils {
/**
- * Ensure that the checksum values are consistent between write and read
side.
+ * Ensure that the checksum values are consistent with index file and data
file.
*/
def compareChecksums(
numPartition: Int,
algorithm: String,
checksum: File,
data: File,
- index: File): Unit = {
- assert(checksum.exists(), "Checksum file doesn't exist")
- assert(data.exists(), "Data file doesn't exist")
- assert(index.exists(), "Index file doesn't exist")
-
+ index: File): Boolean = {
var checksumIn: DataInputStream = null
val expectChecksums = Array.ofDim[Long](numPartition)
try {
@@ -66,7 +62,7 @@ trait ShuffleChecksumTestHelper {
checkedIn.read(bytes, 0, limit)
prevOffset = curOffset
// checksum must be consistent at both write and read sides
- assert(checkedIn.getChecksum.getValue == expectChecksums(i))
+ if (checkedIn.getChecksum.getValue != expectChecksums(i)) return false
}
} finally {
if (dataIn != null) {
@@ -79,5 +75,6 @@ trait ShuffleChecksumTestHelper {
checkedIn.close()
}
}
+ true
}
}
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala
index 3db2f77fe15..8be103b7be8 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala
@@ -17,11 +17,7 @@
package org.apache.spark.shuffle
-import java.io.{DataInputStream, File, FileInputStream}
-import java.util.zip.CheckedInputStream
-
-import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper
-import org.apache.spark.network.util.LimitedInputStream
+import java.io.File
trait ShuffleChecksumTestHelper {
@@ -38,46 +34,7 @@ trait ShuffleChecksumTestHelper {
assert(data.exists(), "Data file doesn't exist")
assert(index.exists(), "Index file doesn't exist")
- var checksumIn: DataInputStream = null
- val expectChecksums = Array.ofDim[Long](numPartition)
- try {
- checksumIn = new DataInputStream(new FileInputStream(checksum))
- (0 until numPartition).foreach(i => expectChecksums(i) =
checksumIn.readLong())
- } finally {
- if (checksumIn != null) {
- checksumIn.close()
- }
- }
-
- var dataIn: FileInputStream = null
- var indexIn: DataInputStream = null
- var checkedIn: CheckedInputStream = null
- try {
- dataIn = new FileInputStream(data)
- indexIn = new DataInputStream(new FileInputStream(index))
- var prevOffset = indexIn.readLong
- (0 until numPartition).foreach { i =>
- val curOffset = indexIn.readLong
- val limit = (curOffset - prevOffset).toInt
- val bytes = new Array[Byte](limit)
- val checksumCal =
ShuffleChecksumHelper.getChecksumByAlgorithm(algorithm)
- checkedIn = new CheckedInputStream(
- new LimitedInputStream(dataIn, curOffset - prevOffset), checksumCal)
- checkedIn.read(bytes, 0, limit)
- prevOffset = curOffset
- // checksum must be consistent at both write and read sides
- assert(checkedIn.getChecksum.getValue == expectChecksums(i))
- }
- } finally {
- if (dataIn != null) {
- dataIn.close()
- }
- if (indexIn != null) {
- indexIn.close()
- }
- if (checkedIn != null) {
- checkedIn.close()
- }
- }
+ assert(ShuffleChecksumUtils.compareChecksums(numPartition, algorithm,
checksum, data, index),
+ "checksum must be consistent at both write and read sides")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]