Repository: spark
Updated Branches:
  refs/heads/master fa09d9192 -> 094aa5971

[SPARK-24801][CORE] Avoid memory waste by empty byte[] arrays in SaslEncryption$EncryptedMessage

## What changes were proposed in this pull request?

Initialize SaslEncryption$EncryptedMessage.byteChannel lazily, so that empty, not-yet-used instances of ByteArrayWritableChannel referenced by this field don't use up memory.

I analyzed a heap dump from a YARN NodeManager where this code is used and found over 40,000 of the above objects in memory, each with a big empty byte[] array. They accumulate because Netty queues up a large number of messages in memory before transferTo() is called. There is a small number of Netty ChannelOutboundBuffer objects; collectively, via linked lists starting from their flushedEntry data fields, they end up referencing over 40K ChannelOutboundBuffer$Entry objects, which ultimately reference EncryptedMessage objects.

## How was this patch tested?

Ran all the tests locally.

Author: Misha Dmitriev <mi...@cloudera.com>

Closes #21811 from countmdm/misha/spark-24801.
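
The lazy-initialization pattern described above can be sketched in isolation as follows. This is a minimal illustration under stated assumptions, not the Spark code: FixedBuffer, LazyMessage, and LazyInitDemo are hypothetical names, FixedBuffer only stands in for ByteArrayWritableChannel, and the 64 KiB block size is an arbitrary value chosen for the demo.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a fixed-capacity buffer such as ByteArrayWritableChannel.
final class FixedBuffer {
  // Counts backing arrays created so far; exists only for this demonstration.
  static int allocations = 0;

  private final byte[] data;
  private int offset;

  FixedBuffer(int capacity) {
    this.data = new byte[capacity]; // the allocation that lazy initialization defers
    allocations++;
  }

  void reset() {
    offset = 0;
  }

  // Copies as many bytes as fit into the remaining capacity; returns the count copied.
  int write(ByteBuffer src) {
    int toCopy = Math.min(src.remaining(), data.length - offset);
    src.get(data, offset, toCopy);
    offset += toCopy;
    return toCopy;
  }
}

// Hypothetical message wrapper that allocates its buffer on first use.
final class LazyMessage {
  private final ByteBuffer payload;
  private final int maxBlockSize;
  private FixedBuffer buffer; // non-final: stays null until nextChunk() first runs

  LazyMessage(byte[] payload, int maxBlockSize) {
    this.payload = ByteBuffer.wrap(payload);
    this.maxBlockSize = maxBlockSize; // remember the size, allocate nothing yet
  }

  // Buffers the next chunk of the payload; returns the number of bytes buffered.
  int nextChunk() {
    if (buffer == null) {
      buffer = new FixedBuffer(maxBlockSize);
    }
    buffer.reset();
    return buffer.write(payload);
  }
}

class LazyInitDemo {
  public static void main(String[] args) {
    // Queue many messages without transferring any of them.
    List<LazyMessage> queue = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      queue.add(new LazyMessage(new byte[100], 64 * 1024));
    }
    System.out.println("buffers after queueing: " + FixedBuffer.allocations);
    queue.get(0).nextChunk();
    System.out.println("buffers after first chunk: " + FixedBuffer.allocations);
  }
}
```

In this sketch, queued messages hold only a reference and an int until their first chunk is requested, so the cost of the backing array is paid per message in flight rather than per message queued. The null check is not synchronized; the sketch assumes a single thread calls nextChunk() on a given message.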

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/094aa597
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/094aa597
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/094aa597

Branch: refs/heads/master
Commit: 094aa597155dfcbf41a2490c9e462415e3824901
Parents: fa09d91
Author: Misha Dmitriev <mi...@cloudera.com>
Authored: Thu Jul 26 22:15:12 2018 -0500
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Thu Jul 26 22:15:12 2018 -0500

----------------------------------------------------------------------
 .../org/apache/spark/network/sasl/SaslEncryption.java | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/094aa597/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java b/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java
index 3ac9081..d3b2a33 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java
@@ -135,13 +135,14 @@ class SaslEncryption {
     private final boolean isByteBuf;
     private final ByteBuf buf;
     private final FileRegion region;
+    private final int maxOutboundBlockSize;
 
     /**
      * A channel used to buffer input data for encryption. The channel has an upper size bound
      * so that if the input is larger than the allowed buffer, it will be broken into multiple
-     * chunks.
+     * chunks. Made non-final to enable lazy initialization, which saves memory.
      */
-    private final ByteArrayWritableChannel byteChannel;
+    private ByteArrayWritableChannel byteChannel;
 
     private ByteBuf currentHeader;
     private ByteBuffer currentChunk;
@@ -157,7 +158,7 @@ class SaslEncryption {
       this.isByteBuf = msg instanceof ByteBuf;
       this.buf = isByteBuf ? (ByteBuf) msg : null;
       this.region = isByteBuf ? null : (FileRegion) msg;
-      this.byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
+      this.maxOutboundBlockSize = maxOutboundBlockSize;
     }
 
     /**
@@ -292,6 +293,9 @@ class SaslEncryption {
     }
 
     private void nextChunk() throws IOException {
+      if (byteChannel == null) {
+        byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
+      }
       byteChannel.reset();
       if (isByteBuf) {
         int copied = byteChannel.write(buf.nioBuffer());

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org