Repository: spark
Updated Branches:
refs/heads/master 300807c6e -> 16612638f
[SPARK-21517][CORE] Avoid copying memory when transfer chunks remotely
## What changes were proposed in this pull request?
In our production cluster, an OOM occurs when NettyBlockRpcServer receives an
OpenBlocks message. The cause we observed is as follows:
when BlockManagerManagedBuffer calls ChunkedByteBuffer#toNetty, it uses
Unpooled.wrappedBuffer(ByteBuffer... buffers), which relies on the default
maxNumComponents=16 of the underlying CompositeByteBuf. When the number of
components exceeds 16, it executes consolidateIfNeeded
    int numComponents = this.components.size();
    if (numComponents > this.maxNumComponents) {
        int capacity =
            ((CompositeByteBuf.Component) this.components.get(numComponents - 1)).endOffset;
        ByteBuf consolidated = this.allocBuffer(capacity);
        for (int c = 0; c < numComponents; ++c) {
            CompositeByteBuf.Component c1 =
                (CompositeByteBuf.Component) this.components.get(c);
            ByteBuf b = c1.buf;
            consolidated.writeBytes(b);
            c1.freeIfNecessary();
        }
        CompositeByteBuf.Component var7 =
            new CompositeByteBuf.Component(consolidated);
        var7.endOffset = var7.length;
        this.components.clear();
        this.components.add(var7);
    }
in CompositeByteBuf, which consumes extra memory by copying every component
into a freshly allocated buffer. We can use the overload
Unpooled.wrappedBuffer(int maxNumComponents, ByteBuffer... buffers) instead
to avoid this cost.
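To make the cost concrete, here is a minimal toy model (not Netty itself) of the consolidation rule shown above: once the component count exceeds maxNumComponents, every component is copied into one consolidated buffer. The class name, the bytesCopied counter, and the chunk sizes are illustrative assumptions; the threshold logic mirrors the decompiled snippet.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Toy model of CompositeByteBuf's consolidation behavior (illustrative only).
// bytesCopied is instrumentation we add to show how much data consolidation moves.
public class CompositeModel {
    final int maxNumComponents;
    final List<ByteBuffer> components = new ArrayList<>();
    int bytesCopied = 0; // total bytes moved by consolidation

    CompositeModel(int maxNumComponents) {
        this.maxNumComponents = maxNumComponents;
    }

    void addComponent(ByteBuffer buf) {
        components.add(buf);
        consolidateIfNeeded();
    }

    // Mirrors the decompiled logic: when the component count exceeds the
    // limit, copy all components into one consolidated buffer.
    private void consolidateIfNeeded() {
        if (components.size() > maxNumComponents) {
            int capacity = 0;
            for (ByteBuffer b : components) capacity += b.remaining();
            ByteBuffer consolidated = ByteBuffer.allocate(capacity);
            for (ByteBuffer b : components) {
                bytesCopied += b.remaining();
                consolidated.put(b.duplicate());
            }
            consolidated.flip();
            components.clear();
            components.add(consolidated);
        }
    }

    public static void main(String[] args) {
        int numChunks = 32;   // more chunks than Netty's default limit of 16
        int chunkSize = 1024;

        CompositeModel defaults = new CompositeModel(16);        // default maxNumComponents
        CompositeModel sized = new CompositeModel(numChunks);    // the fix: size to chunk count
        for (int i = 0; i < numChunks; i++) {
            defaults.addComponent(ByteBuffer.allocate(chunkSize));
            sized.addComponent(ByteBuffer.allocate(chunkSize));
        }
        System.out.println("default limit copied:  " + defaults.bytesCopied + " bytes");
        System.out.println("sized limit copied:    " + sized.bytesCopied + " bytes");
    }
}
```

With the default limit, adding the 17th chunk triggers a consolidation copy, while sizing maxNumComponents to the chunk count (what the patch does by passing chunks.length) never triggers one.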
## How was this patch tested?
Tested in our production cluster.
Author: zhoukang <[email protected]>
Closes #18723 from caneGuy/zhoukang/fix-chunkbuffer.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16612638
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16612638
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16612638
Branch: refs/heads/master
Commit: 16612638f0539f197eb7deb1be2ec53fed60d707
Parents: 300807c
Author: zhoukang <[email protected]>
Authored: Tue Jul 25 17:59:21 2017 -0700
Committer: Shixiong Zhu <[email protected]>
Committed: Tue Jul 25 17:59:21 2017 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/16612638/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 2f905c8..f48bfd5 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -66,7 +66,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
* Wrap this buffer to view it as a Netty ByteBuf.
*/
def toNetty: ByteBuf = {
- Unpooled.wrappedBuffer(getChunks(): _*)
+ Unpooled.wrappedBuffer(chunks.length, getChunks(): _*)
}
/**