Wan Kun created SPARK-47518:
-------------------------------

             Summary: Skip transfer the last spilled shuffle data
                 Key: SPARK-47518
                 URL: https://issues.apache.org/jira/browse/SPARK-47518
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 4.0.0
            Reporter: Wan Kun
If there is only one spill data file, Spark moves that spill file into place as the final data file.

{code:java}
  @Override
  public void transferMapSpillFile(
      File mapSpillFile,
      long[] partitionLengths,
      long[] checksums) throws IOException {
    // The map spill file already has the proper format, and it contains all of the partition data.
    // So just transfer it directly to the destination without any merging.
    File outputFile = blockResolver.getDataFile(shuffleId, mapId);
    File tempFile = Utils.tempFileWith(outputFile);
    Files.move(mapSpillFile.toPath(), tempFile.toPath());
    blockResolver
        .writeMetadataFileAndCommit(shuffleId, mapId, partitionLengths, checksums, tempFile);
  }
{code}

But if that spill file and the final data file are on different disks, {{Files.move}} cannot simply rename the file; it falls back to a full byte-by-byte copy across disks:

{code:java}
sun.nio.fs.UnixCopyFile.transfer(Native Method)
sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:251)
sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:471)
sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
java.nio.file.Files.move(Files.java:1395)
org.apache.spark.shuffle.sort.io.LocalDiskSingleSpillMapOutputWriter.transferMapSpillFile(LocalDiskSingleSpillMapOutputWriter.java:52)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.mergeSpills(UnsafeShuffleWriter.java:280)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:224)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:180)
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
org.apache.spark.scheduler.Task.run(Task.scala:131)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
org.apache.spark.executor.Executor$TaskRunner$$Lambda$453/980524593.apply(Unknown Source)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1465)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
{code}

We can optimize this step by writing the last spill file to the disk where the final data file will be written, so the move becomes a cheap same-disk rename.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
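The proposed optimization amounts to allocating the single-spill temp file on the same disk as the final data file, so that {{Files.move}} is a rename rather than a cross-device copy. A minimal standalone sketch of the idea, using a hypothetical helper {{createSpillFileNextTo}} (not a Spark API; directory and file names are illustrative only):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class SameDiskSpillExample {

    // Hypothetical helper: create the spill file in the same directory as the
    // final output file, so both are guaranteed to live on the same filesystem
    // and Files.move is a rename instead of a byte-by-byte copy.
    static Path createSpillFileNextTo(Path outputFile) throws IOException {
        Path dir = outputFile.toAbsolutePath().getParent();
        return Files.createTempFile(dir, "spill-", ".data");
    }

    public static void main(String[] args) throws IOException {
        // Illustrative output location; in Spark this would come from the
        // block resolver's getDataFile(shuffleId, mapId).
        Path outputDir = Files.createTempDirectory("shuffle-out");
        Path outputFile = outputDir.resolve("shuffle_0_0_0.data");

        Path spill = createSpillFileNextTo(outputFile);
        Files.write(spill, new byte[] {1, 2, 3});

        // Same directory, hence same filesystem: this move is an atomic
        // rename and never degrades to a copy.
        Files.move(spill, outputFile, StandardCopyOption.ATOMIC_MOVE);

        System.out.println(Files.size(outputFile)); // 3
        System.out.println(Files.exists(spill));    // false
    }
}
```

The key design point is that the disk choice happens when the spill file is created, not when it is moved; once the spill has landed on the wrong disk, the copy cost is unavoidable.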