Repository: spark
Updated Branches:
  refs/heads/branch-2.4 9c0c6d4d5 -> 1001d2314
[SPARK-25704][CORE] Allocate a bit less than Int.MaxValue

JVMs don't let you allocate arrays of length exactly Int.MaxValue, so leave
a little extra room. This is necessary when reading blocks >2GB off the
network (for remote reads or for cache replication).

Unit tests via Jenkins; also ran a test with blocks over 2GB on a cluster.

Closes #22705 from squito/SPARK-25704.

Authored-by: Imran Rashid <iras...@cloudera.com>
Signed-off-by: Imran Rashid <iras...@cloudera.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1001d231
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1001d231
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1001d231

Branch: refs/heads/branch-2.4
Commit: 1001d2314275c902da519725da266a23b537e33a
Parents: 9c0c6d4
Author: Imran Rashid <iras...@cloudera.com>
Authored: Fri Oct 19 12:52:41 2018 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Fri Oct 19 12:54:08 2018 -0500

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala  |  6 ++----
 .../apache/spark/util/io/ChunkedByteBuffer.scala | 16 +++++++++-------
 2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/1001d231/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0fe82ac..c01a453 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -133,8 +133,6 @@ private[spark] class BlockManager(
 
   private[spark] val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
 
-  private val chunkSize =
-    conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
   private val remoteReadNioBufferConversion =
     conf.getBoolean("spark.network.remoteReadNioBufferConversion", false)
 
@@ -451,7 +449,7 @@ private[spark] class BlockManager(
             new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)
 
           case None =>
-            ChunkedByteBuffer.fromFile(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
+            ChunkedByteBuffer.fromFile(tmpFile)
         }
       putBytes(blockId, buffer, level)(classTag)
       tmpFile.delete()
@@ -797,7 +795,7 @@ private[spark] class BlockManager(
       if (remoteReadNioBufferConversion) {
         return Some(new ChunkedByteBuffer(data.nioByteBuffer()))
       } else {
-        return Some(ChunkedByteBuffer.fromManagedBuffer(data, chunkSize))
+        return Some(ChunkedByteBuffer.fromManagedBuffer(data))
       }
     }
     logDebug(s"The value of block $blockId is null")
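----------------------------------------------------------------------
A note for context on the limit the commit message describes: most JVM
implementations reserve a few header words per array object, so an
allocation of length exactly Int.MaxValue typically fails with "Requested
array size exceeds VM limit". The standalone Scala sketch below is not part
of the patch; MaxArraySketch and chunkSizeFor are names invented here, and
the constant mirrors what ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
provides. It shows how a safe per-chunk size can be derived from a block
length that may exceed 2GB:

object MaxArraySketch {
  // Mirrors org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH:
  // leave a few words of headroom for the array object header instead of
  // asking for exactly Int.MaxValue elements.
  val MaxRoundedArrayLength: Int = Int.MaxValue - 15

  // Cap a (possibly > 2GB) block length at what one Array[Byte] can hold.
  def chunkSizeFor(blockLength: Long): Int =
    math.min(MaxRoundedArrayLength.toLong, blockLength).toInt

  def main(args: Array[String]): Unit = {
    println(chunkSizeFor(5L << 30)) // 5GB block -> 2147483632 (capped)
    println(chunkSizeFor(1024L))    // small block -> 1024 (unchanged)
  }
}

Blocks at or below the cap are unaffected; only reads of blocks near or
above 2GB change behavior.
----------------------------------------------------------------------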
http://git-wip-us.apache.org/repos/asf/spark/blob/1001d231/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 4aa8d45..9547cb4 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.util.{ByteArrayWritableChannel, LimitedInputStream}
 import org.apache.spark.storage.StorageUtils
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
 
 /**
@@ -169,24 +170,25 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
 }
 
-object ChunkedByteBuffer {
+private[spark] object ChunkedByteBuffer {
+
+  // TODO eliminate this method if we switch BlockManager to getting InputStreams
 
-  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = {
+  def fromManagedBuffer(data: ManagedBuffer): ChunkedByteBuffer = {
     data match {
       case f: FileSegmentManagedBuffer =>
-        fromFile(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+        fromFile(f.getFile, f.getOffset, f.getLength)
       case other =>
         new ChunkedByteBuffer(other.nioByteBuffer())
     }
   }
 
-  def fromFile(file: File, maxChunkSize: Int): ChunkedByteBuffer = {
-    fromFile(file, maxChunkSize, 0, file.length())
+  def fromFile(file: File): ChunkedByteBuffer = {
+    fromFile(file, 0, file.length())
   }
 
   private def fromFile(
       file: File,
-      maxChunkSize: Int,
       offset: Long,
       length: Long): ChunkedByteBuffer = {
     // We do *not* memory map the file, because we may end up putting this into the memory store,
@@ -195,7 +197,7 @@ object ChunkedByteBuffer {
     val is = new FileInputStream(file)
     ByteStreams.skipFully(is, offset)
     val in = new LimitedInputStream(is, length)
-    val chunkSize = math.min(maxChunkSize, length).toInt
+    val chunkSize = math.min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, length).toInt
     val out = new ChunkedByteBufferOutputStream(chunkSize, ByteBuffer.allocate _)
     Utils.tryWithSafeFinally {
       IOUtils.copy(in, out)
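----------------------------------------------------------------------
As an illustration of the fromFile path above, the following self-contained
sketch applies the same chunking idea. This is hypothetical code:
ChunkedFileReader, readChunks, and MaxChunk are names invented for this
example, and the actual implementation streams through
ChunkedByteBufferOutputStream as shown in the diff. It reads a file into
ByteBuffers no larger than the capped chunk size, so a file over 2GB yields
several chunks rather than one oversized array:

import java.io.{File, FileInputStream}
import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer

object ChunkedFileReader {
  // Stay a little below Int.MaxValue, as the commit does via
  // ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH.
  val MaxChunk: Int = Int.MaxValue - 15

  def readChunks(file: File, maxChunk: Int = MaxChunk): Array[ByteBuffer] = {
    val in = new FileInputStream(file)
    try {
      var remaining = file.length()
      val chunks = ArrayBuffer.empty[ByteBuffer]
      while (remaining > 0) {
        // Each chunk is capped so its backing Array[Byte] is allocatable.
        val size = math.min(maxChunk.toLong, remaining).toInt
        val buf = new Array[Byte](size)
        var read = 0
        while (read < size) {
          val n = in.read(buf, read, size - read)
          require(n >= 0, s"unexpected end of file in ${file.getName}")
          read += n
        }
        chunks += ByteBuffer.wrap(buf)
        remaining -= size
      }
      chunks.toArray
    } finally {
      in.close()
    }
  }
}

Because every chunk stays below the JVM's array-length ceiling, arbitrarily
large files can be read; callers iterate over the chunks instead of assuming
one contiguous buffer.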