Repository: spark Updated Branches: refs/heads/master bf8ff833e -> c1937dd19
[SPARK-16862] Configurable buffer size in `UnsafeSorterSpillReader` ## What changes were proposed in this pull request? Jira: https://issues.apache.org/jira/browse/SPARK-16862 The `BufferedInputStream` used in `UnsafeSorterSpillReader` uses the default 8 KB buffer to read data off disk. This PR makes the buffer size configurable to improve disk read performance. I have set the default value to 1 MB, as I observed improved performance with that value. ## How was this patch tested? I am relying on the existing unit tests. ## Performance After deploying this change to prod and setting the config to 1 MB, there was a 12% reduction in CPU time and a 19.5% reduction in CPU reservation time. Author: Tejas Patil <[email protected]> Closes #14726 from tejasapatil/spill_buffer_2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1937dd1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1937dd1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1937dd1 Branch: refs/heads/master Commit: c1937dd19a23bd096a4707656c7ba19fb5c16966 Parents: bf8ff83 Author: Tejas Patil <[email protected]> Authored: Tue Aug 23 18:48:08 2016 -0700 Committer: Reynold Xin <[email protected]> Committed: Tue Aug 23 18:48:08 2016 -0700 ---------------------------------------------------------------------- .../unsafe/sort/UnsafeSorterSpillReader.java | 22 +++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c1937dd1/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java index 1d588c3..d048cf7 100644 --- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java @@ -22,15 +22,21 @@ import java.io.*; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; +import org.apache.spark.SparkEnv; import org.apache.spark.serializer.SerializerManager; import org.apache.spark.storage.BlockId; import org.apache.spark.unsafe.Platform; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Reads spill files written by {@link UnsafeSorterSpillWriter} (see that class for a description * of the file format). */ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable { + private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class); + private static final int DEFAULT_BUFFER_SIZE_BYTES = 1024 * 1024; // 1 MB + private static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 mb private InputStream in; private DataInputStream din; @@ -50,7 +56,21 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen File file, BlockId blockId) throws IOException { assert (file.length() > 0); - final BufferedInputStream bs = new BufferedInputStream(new FileInputStream(file)); + long bufferSizeBytes = + SparkEnv.get() == null ? + DEFAULT_BUFFER_SIZE_BYTES: + SparkEnv.get().conf().getSizeAsBytes("spark.unsafe.sorter.spill.reader.buffer.size", + DEFAULT_BUFFER_SIZE_BYTES); + if (bufferSizeBytes > MAX_BUFFER_SIZE_BYTES || bufferSizeBytes < DEFAULT_BUFFER_SIZE_BYTES) { + // fall back to a sane default value + logger.warn("Value of config \"spark.unsafe.sorter.spill.reader.buffer.size\" = {} not in " + + "allowed range [{}, {}). 
Falling back to default value : {} bytes", bufferSizeBytes, + DEFAULT_BUFFER_SIZE_BYTES, MAX_BUFFER_SIZE_BYTES, DEFAULT_BUFFER_SIZE_BYTES); + bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES; + } + + final BufferedInputStream bs = + new BufferedInputStream(new FileInputStream(file), (int) bufferSizeBytes); try { this.in = serializerManager.wrapForCompression(blockId, bs); this.din = new DataInputStream(this.in); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
