Wan Kun created SPARK-47518:
-------------------------------

             Summary: Skip transfer the last spilled shuffle data
                 Key: SPARK-47518
                 URL: https://issues.apache.org/jira/browse/SPARK-47518
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 4.0.0
            Reporter: Wan Kun


If there is only one spill file, Spark transfers that spill file to the 
final data file:
{code:java}
@Override
  public void transferMapSpillFile(
      File mapSpillFile,
      long[] partitionLengths,
      long[] checksums) throws IOException {
    // The map spill file already has the proper format, and it contains all of the partition data.
    // So just transfer it directly to the destination without any merging.
    File outputFile = blockResolver.getDataFile(shuffleId, mapId);
    File tempFile = Utils.tempFileWith(outputFile);
    Files.move(mapSpillFile.toPath(), tempFile.toPath());
    blockResolver
      .writeMetadataFileAndCommit(shuffleId, mapId, partitionLengths, checksums, tempFile);
  }
{code}
But if the spill file and the final data file are on different disks, 
Files.move cannot simply rename the file and falls back to copying it, so there 
is still a heavy data transfer:
{code:java}
sun.nio.fs.UnixCopyFile.transfer(Native Method)
sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:251)
sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:471)
sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
java.nio.file.Files.move(Files.java:1395)
org.apache.spark.shuffle.sort.io.LocalDiskSingleSpillMapOutputWriter.transferMapSpillFile(LocalDiskSingleSpillMapOutputWriter.java:52)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.mergeSpills(UnsafeShuffleWriter.java:280)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:224)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:180)
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
org.apache.spark.scheduler.Task.run(Task.scala:131)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
org.apache.spark.executor.Executor$TaskRunner$$Lambda$453/980524593.apply(Unknown Source)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1465)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
{code}
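
The trace above shows Files.move ending up in UnixCopyFile.copyFile, which is 
the copy-then-delete path the JDK takes when a plain rename is not possible. 
As a rough illustration only, the sketch below checks whether two existing 
paths share a file store, using the standard java.nio.file.Files.getFileStore 
call. The class name SpillLocationCheck and the method onSameFileStore are 
hypothetical and are not part of Spark.

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not Spark code.
public class SpillLocationCheck {
  /**
   * Returns true when both paths resolve to the same file store.
   * Both paths must exist; for a file that has not been created yet,
   * pass its parent directory instead.
   */
  public static boolean onSameFileStore(Path a, Path b) throws IOException {
    FileStore storeA = Files.getFileStore(a);
    FileStore storeB = Files.getFileStore(b);
    return storeA.equals(storeB);
  }
}
```

If such a check returns false for the spill file and the directory of the 
final data file, a move between them should be expected to copy the data 
rather than rename it.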

We can optimize this step by writing the last spill file to the same disk that 
will hold the final data file.
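
One possible shape of this idea, as a minimal sketch and not the actual Spark 
change: create the spill file in the directory of the final data file, so that 
the later move can be a rename. The class ColocatedSpill and its methods are 
hypothetical names; only the java.nio.file calls are real JDK APIs. The sketch 
uses StandardCopyOption.ATOMIC_MOVE, which makes the move fail with an 
exception instead of silently copying if the two paths are on different file 
stores.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch, not Spark code.
public class ColocatedSpill {
  /** Creates an empty spill file in the same directory as the final data file. */
  public static Path createSpillNextTo(Path finalDataFile) throws IOException {
    Path dir = finalDataFile.toAbsolutePath().getParent();
    Files.createDirectories(dir);
    return Files.createTempFile(dir, "spill-", ".tmp");
  }

  /**
   * Moves the spill file into place. ATOMIC_MOVE asks for a rename and throws
   * rather than falling back to a byte-by-byte copy.
   */
  public static void commit(Path spill, Path finalDataFile) throws IOException {
    Files.move(spill, finalDataFile, StandardCopyOption.ATOMIC_MOVE);
  }
}
```

Because both files live in the same directory, the commit step does not need 
to transfer the spilled bytes a second time.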



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
