This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new d1cd110 [SPARK-37695][CORE][SHUFFLE] Skip diagnosis of merged blocks
from push-based shuffle
d1cd110 is described below
commit d1cd110c20817eb1ccd716e099be5712df1f670c
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Dec 23 12:00:45 2021 +0800
[SPARK-37695][CORE][SHUFFLE] Skip diagnosis of merged blocks from
push-based shuffle
### What changes were proposed in this pull request?
Skip corruption diagnosis of merged blocks from push-based shuffle.
### Why are the changes needed?
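Because SPARK-36284 (shuffle checksum support for push-based shuffle) has not
been addressed yet, corruption diagnosis cannot handle merged shuffle chunks.
Skip the diagnosis for them to avoid the assertion failure shown below.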
```
21/12/19 18:46:37 WARN TaskSetManager: Lost task 166.0 in stage 1921.0 (TID
138855) (beta-spark5 executor 218): java.lang.AssertionError: assertion failed:
Expected ShuffleBlockId, but got shuffleChunk_464_0_5645_0
at scala.Predef$.assert(Predef.scala:223)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1043)
at
org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1308)
at scala.Option.map(Option.scala:230)
at
org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1307)
at
org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1293)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
at
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.<init>(UnsafeRowSerializer.scala:120)
at
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
at
org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.sort_addToSorter_0$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.smj_findNextJoinRows_0$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:778)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
### Does this PR introduce _any_ user-facing change?
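Yes. When a merged shuffle chunk from push-based shuffle is corrupted, a
warning is logged and diagnosis is skipped instead of failing with an
AssertionError.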
### How was this patch tested?
Ran the 1 TB TPC-DS benchmark manually.
Closes #34961 from pan3793/SPARK-37695.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: yi.wu <[email protected]>
(cherry picked from commit 57ca75f3f0b2695434e47464b2201210edd58fde)
Signed-off-by: yi.wu <[email protected]>
---
.../storage/ShuffleBlockFetcherIterator.scala | 69 ++++++++++++----------
1 file changed, 39 insertions(+), 30 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 3eb8acd..1f96579 100644
---
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -1035,40 +1035,49 @@ final class ShuffleBlockFetcherIterator(
address: BlockManagerId,
blockId: BlockId): String = {
logInfo("Start corruption diagnosis.")
- val startTimeNs = System.nanoTime()
- assert(blockId.isInstanceOf[ShuffleBlockId], s"Expected ShuffleBlockId,
but got $blockId")
- val shuffleBlock = blockId.asInstanceOf[ShuffleBlockId]
- val buffer = new
Array[Byte](ShuffleChecksumHelper.CHECKSUM_CALCULATION_BUFFER)
- // consume the remaining data to calculate the checksum
- var cause: Cause = null
- try {
- while (checkedIn.read(buffer) != -1) {}
- val checksum = checkedIn.getChecksum.getValue
- cause = shuffleClient.diagnoseCorruption(address.host, address.port,
address.executorId,
- shuffleBlock.shuffleId, shuffleBlock.mapId, shuffleBlock.reduceId,
checksum,
- checksumAlgorithm)
- } catch {
- case e: Exception =>
- logWarning("Unable to diagnose the corruption cause of the corrupted
block", e)
- cause = Cause.UNKNOWN_ISSUE
- }
- val duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startTimeNs)
- val diagnosisResponse = cause match {
- case Cause.UNSUPPORTED_CHECKSUM_ALGORITHM =>
- s"Block $blockId is corrupted but corruption diagnosis failed due to "
+
- s"unsupported checksum algorithm: $checksumAlgorithm"
+ blockId match {
+ case shuffleBlock: ShuffleBlockId =>
+ val startTimeNs = System.nanoTime()
+ val buffer = new
Array[Byte](ShuffleChecksumHelper.CHECKSUM_CALCULATION_BUFFER)
+ // consume the remaining data to calculate the checksum
+ var cause: Cause = null
+ try {
+ while (checkedIn.read(buffer) != -1) {}
+ val checksum = checkedIn.getChecksum.getValue
+ cause = shuffleClient.diagnoseCorruption(address.host, address.port,
address.executorId,
+ shuffleBlock.shuffleId, shuffleBlock.mapId, shuffleBlock.reduceId,
checksum,
+ checksumAlgorithm)
+ } catch {
+ case e: Exception =>
+ logWarning("Unable to diagnose the corruption cause of the
corrupted block", e)
+ cause = Cause.UNKNOWN_ISSUE
+ }
+ val duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startTimeNs)
+ val diagnosisResponse = cause match {
+ case Cause.UNSUPPORTED_CHECKSUM_ALGORITHM =>
+ s"Block $blockId is corrupted but corruption diagnosis failed due
to " +
+ s"unsupported checksum algorithm: $checksumAlgorithm"
- case Cause.CHECKSUM_VERIFY_PASS =>
- s"Block $blockId is corrupted but checksum verification passed"
+ case Cause.CHECKSUM_VERIFY_PASS =>
+ s"Block $blockId is corrupted but checksum verification passed"
- case Cause.UNKNOWN_ISSUE =>
- s"Block $blockId is corrupted but the cause is unknown"
+ case Cause.UNKNOWN_ISSUE =>
+ s"Block $blockId is corrupted but the cause is unknown"
- case otherCause =>
- s"Block $blockId is corrupted due to $otherCause"
+ case otherCause =>
+ s"Block $blockId is corrupted due to $otherCause"
+ }
+ logInfo(s"Finished corruption diagnosis in $duration ms.
$diagnosisResponse")
+ diagnosisResponse
+ case shuffleBlockChunk: ShuffleBlockChunkId =>
+ // TODO SPARK-36284 Add shuffle checksum support for push-based shuffle
+ val diagnosisResponse = s"BlockChunk $shuffleBlockChunk is corrupted
but corruption " +
+ s"diagnosis is skipped due to lack of shuffle checksum support for
push-based shuffle."
+ logWarning(diagnosisResponse)
+ diagnosisResponse
+ case unexpected: BlockId =>
+ throw new IllegalArgumentException(s"Unexpected type of BlockId,
$unexpected")
}
- logInfo(s"Finished corruption diagnosis in $duration ms.
$diagnosisResponse")
- diagnosisResponse
}
def toCompletionIterator: Iterator[(BlockId, InputStream)] = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]