Repository: spark
Updated Branches:
  refs/heads/master 3c614d056 -> 92fd7f321


[SPARK-25115][CORE] Eliminate extra memory copy done when a ByteBuf is used 
that is backed by > 1 ByteBuffer.

## What changes were proposed in this pull request?

Check how many ByteBuffers back the ByteBuf and, depending on the count, call 
either nioBuffer(...) or nioBuffers(...) to eliminate extra memory copies.

This is related to netty/netty#8176.

## How was this patch tested?

Unit tests added.

Closes #22105 from normanmaurer/composite_byte_buf_mem_copy.

Authored-by: Norman Maurer <norman_mau...@apple.com>
Signed-off-by: DB Tsai <d_t...@apple.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92fd7f32
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92fd7f32
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92fd7f32

Branch: refs/heads/master
Commit: 92fd7f321c4c1c58e07e74ddaaa4932c7c27bcf4
Parents: 3c614d0
Author: Norman Maurer <norman_mau...@apple.com>
Authored: Wed Aug 15 00:02:46 2018 +0000
Committer: DB Tsai <d_t...@apple.com>
Committed: Wed Aug 15 00:02:46 2018 +0000

----------------------------------------------------------------------
 .../network/protocol/MessageWithHeader.java     | 20 ++++++++++--
 .../protocol/MessageWithHeaderSuite.java        | 32 +++++++++++++++++++-
 2 files changed, 49 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/92fd7f32/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
 
b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
index e7b66a6..b81c25a 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
@@ -140,8 +140,24 @@ class MessageWithHeader extends AbstractFileRegion {
     // SPARK-24578: cap the sub-region's size of returned nio buffer to 
improve the performance
     // for the case that the passed-in buffer has too many components.
     int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
-    ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
-    int written = target.write(buffer);
+    // If the ByteBuf holds more than one ByteBuffer, we should call 
nioBuffers(...)
+    // to eliminate extra memory copies.
+    int written = 0;
+    if (buf.nioBufferCount() == 1) {
+      ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
+      written = target.write(buffer);
+    } else {
+      ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), length);
+      for (ByteBuffer buffer: buffers) {
+        int remaining = buffer.remaining();
+        int w = target.write(buffer);
+        written += w;
+        if (w < remaining) {
+          // Could not write all, we need to break now.
+          break;
+        }
+      }
+    }
     buf.skipBytes(written);
     return written;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/92fd7f32/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java
 
b/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java
index ecb66fc..3bff34e 100644
--- 
a/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java
+++ 
b/common/network-common/src/test/java/org/apache/spark/network/protocol/MessageWithHeaderSuite.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
 import org.apache.spark.network.util.AbstractFileRegion;
 import org.junit.Test;
@@ -48,7 +49,36 @@ public class MessageWithHeaderSuite {
 
   @Test
   public void testByteBufBody() throws Exception {
+    testByteBufBody(Unpooled.copyLong(42));
+  }
+
+  @Test
+  public void testCompositeByteBufBodySingleBuffer() throws Exception {
+    ByteBuf header = Unpooled.copyLong(42);
+    CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
+    compositeByteBuf.addComponent(true, header);
+    assertEquals(1, compositeByteBuf.nioBufferCount());
+    testByteBufBody(compositeByteBuf);
+  }
+
+  @Test
+  public void testCompositeByteBufBodyMultipleBuffers() throws Exception {
     ByteBuf header = Unpooled.copyLong(42);
+    CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
+    compositeByteBuf.addComponent(true, header.retainedSlice(0, 4));
+    compositeByteBuf.addComponent(true, header.slice(4, 4));
+    assertEquals(2, compositeByteBuf.nioBufferCount());
+    testByteBufBody(compositeByteBuf);
+  }
+
+  /**
+   * Test writing a {@link MessageWithHeader} using the given {@link ByteBuf} 
as header.
+   *
+   * @param header the header to use.
+   * @throws Exception thrown on error.
+   */
+  private void testByteBufBody(ByteBuf header) throws Exception {
+    long expectedHeaderValue = header.getLong(header.readerIndex());
     ByteBuf bodyPassedToNettyManagedBuffer = Unpooled.copyLong(84);
     assertEquals(1, header.refCnt());
     assertEquals(1, bodyPassedToNettyManagedBuffer.refCnt());
@@ -61,7 +91,7 @@ public class MessageWithHeaderSuite {
     MessageWithHeader msg = new MessageWithHeader(managedBuf, header, body, 
managedBuf.size());
     ByteBuf result = doWrite(msg, 1);
     assertEquals(msg.count(), result.readableBytes());
-    assertEquals(42, result.readLong());
+    assertEquals(expectedHeaderValue, result.readLong());
     assertEquals(84, result.readLong());
 
     assertTrue(msg.release());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to