This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b87ec5d  [SPARK-32658][CORE] Fix `PartitionWriterStream` partition length overflow
b87ec5d is described below

commit b87ec5dcecaf4ad0397832ecedc249183671d367
Author: Xingbo Jiang <[email protected]>
AuthorDate: Thu Aug 20 07:08:30 2020 +0000

    [SPARK-32658][CORE] Fix `PartitionWriterStream` partition length overflow
    
    ### What changes were proposed in this pull request?
    
    The `count` in `PartitionWriterStream` should be a long value instead of an
    int. The issue was introduced by apache/spark@abef84a. When the overflow
    happens, the shuffle index file records a wrong index for a reduceId, which
    leads to a `FetchFailedException: Stream is corrupted` error.
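
    For illustration, a minimal sketch (assumed demo code, not part of the
    patch) of how an `int` byte counter wraps around once a single partition
    grows past `Integer.MAX_VALUE` bytes, which is why the recorded partition
    length ends up wrong:

    ```java
    // Hypothetical demo of the overflow; this is not the PartitionWriterStream class itself.
    public class IntCounterOverflowDemo {
      public static void main(String[] args) {
        int count = Integer.MAX_VALUE;       // counter already at 2^31 - 1 bytes
        count += 1;                          // one more byte written
        System.out.println(count);           // prints -2147483648: the recorded length is corrupted

        long fixedCount = Integer.MAX_VALUE; // with a long counter there is no wrap-around
        fixedCount += 1;
        System.out.println(fixedCount);      // prints 2147483648
      }
    }
    ```

    With the `long` counter in the patch below, `getCount()` can report
    partition lengths above 2 GiB without wrapping.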
    
    Besides the fix, I also added some debug logs so that similar issues will be
    easier to debug in the future.
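
    The new per-spill logs guard the `Arrays.toString` conversion behind
    `logger.isDebugEnabled()`, so the partition-length array is only turned
    into a string when debug logging is actually on. A minimal sketch of that
    pattern (illustrative class and method names, not the patched code):

    ```java
    import java.util.Arrays;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class SpillLoggingSketch {
      private static final Logger logger = LoggerFactory.getLogger(SpillLoggingSketch.class);

      // Log the per-spill partition lengths only when debug logging is enabled,
      // avoiding the cost of stringifying a potentially large array otherwise.
      static void logPartitionLengths(long mapId, int spillIndex, long[] partitionLengths) {
        if (logger.isDebugEnabled()) {
          logger.debug("Partition lengths for mapId {} in Spill {}: {}", mapId, spillIndex,
              Arrays.toString(partitionLengths));
        }
      }
    }
    ```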
    
    ### Why are the changes needed?
    
    This is a regression and bug fix.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    A Spark user reported this issue when migrating their workload to 3.0. One
    of their jobs fails deterministically on Spark 3.0 without the patch, and
    the job succeeds after applying the fix.
    
    Closes #29474 from jiangxb1987/fixPartitionWriteStream.
    
    Authored-by: Xingbo Jiang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit f793977e9ac2ef597fca4a95356affbfbf864f88)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../apache/spark/shuffle/sort/UnsafeShuffleWriter.java    | 15 +++++++++++++++
 .../shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java  |  6 ++++--
 .../apache/spark/shuffle/IndexShuffleBlockResolver.scala  |  1 +
 3 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index d09282e..3eee1e4 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -18,6 +18,7 @@
 package org.apache.spark.shuffle.sort;
 
 import java.nio.channels.Channels;
+import java.util.Arrays;
 import java.util.Optional;
 import javax.annotation.Nullable;
 import java.io.*;
@@ -274,6 +275,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
         // Here, we don't need to perform any metrics updates because the bytes written to this
         // output file would have already been counted as shuffle bytes written.
         partitionLengths = spills[0].partitionLengths;
+        logger.debug("Merge shuffle spills for mapId {} with length {}", mapId,
+            partitionLengths.length);
         maybeSingleFileWriter.get().transferMapSpillFile(spills[0].file, partitionLengths);
       } else {
         partitionLengths = mergeSpillsUsingStandardWriter(spills);
@@ -360,6 +363,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       SpillInfo[] spills,
       ShuffleMapOutputWriter mapWriter,
       @Nullable CompressionCodec compressionCodec) throws IOException {
+    logger.debug("Merge shuffle spills with FileStream for mapId {}", mapId);
     final int numPartitions = partitioner.numPartitions();
     final InputStream[] spillInputStreams = new InputStream[spills.length];
 
@@ -369,6 +373,11 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
         spillInputStreams[i] = new NioBufferedFileInputStream(
           spills[i].file,
           inputBufferSizeInBytes);
+        // Only convert the partitionLengths when debug level is enabled.
+        if (logger.isDebugEnabled()) {
+          logger.debug("Partition lengths for mapId {} in Spill {}: {}", mapId, i,
+              Arrays.toString(spills[i].partitionLengths));
+        }
       }
       for (int partition = 0; partition < numPartitions; partition++) {
         boolean copyThrewException = true;
@@ -431,6 +440,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private void mergeSpillsWithTransferTo(
       SpillInfo[] spills,
       ShuffleMapOutputWriter mapWriter) throws IOException {
+    logger.debug("Merge shuffle spills with TransferTo for mapId {}", mapId);
     final int numPartitions = partitioner.numPartitions();
     final FileChannel[] spillInputChannels = new FileChannel[spills.length];
     final long[] spillInputChannelPositions = new long[spills.length];
@@ -439,6 +449,11 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     try {
       for (int i = 0; i < spills.length; i++) {
         spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
+        // Only convert the partitionLengths when debug level is enabled.
+        if (logger.isDebugEnabled()) {
+          logger.debug("Partition lengths for mapId {} in Spill {}: {}", mapId, i,
+              Arrays.toString(spills[i].partitionLengths));
+        }
       }
       for (int partition = 0; partition < numPartitions; partition++) {
         boolean copyThrewException = true;
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
index a6529fd..1c3eb34 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
@@ -112,6 +112,8 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter {
     }
     cleanUp();
     File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;
+    log.debug("Writing shuffle index file for mapId {} with length {}", mapId,
+        partitionLengths.length);
     blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
     return partitionLengths;
   }
@@ -210,14 +212,14 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter {
 
   private class PartitionWriterStream extends OutputStream {
     private final int partitionId;
-    private int count = 0;
+    private long count = 0;
     private boolean isClosed = false;
 
     PartitionWriterStream(int partitionId) {
       this.partitionId = partitionId;
     }
 
-    public int getCount() {
+    public long getCount() {
       return count;
     }
 
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index af2c82e..b7caf4f 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -209,6 +209,7 @@ private[spark] class IndexShuffleBlockResolver(
         }
       }
     } finally {
+      logDebug(s"Shuffle index for mapId $mapId: ${lengths.mkString("[", ",", "]")}")
       if (indexTmp.exists() && !indexTmp.delete()) {
         logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
