This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 44fa8fb9 [#133] feat(netty): local shuffle read support zero-copy. 
(#1047)
44fa8fb9 is described below

commit 44fa8fb92150f61ed36fec44b4cf6d017409111d
Author: Xianming Lei <[email protected]>
AuthorDate: Fri Jul 28 09:44:31 2023 +0800

    [#133] feat(netty): local shuffle read support zero-copy. (#1047)
    
    ### What changes were proposed in this pull request?
    
    Add zero-copy support for local shuffle reads.
    
    ### Why are the changes needed?
    
    For #133.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Covered by existing unit tests (UTs).
    
    Co-authored-by: leixianming <[email protected]>
---
 .../apache/uniffle/common/ShuffleIndexResult.java  | 36 ++++++++++++--
 .../netty/buffer/FileSegmentManagedBuffer.java     | 11 ++--
 .../protocol/GetLocalShuffleIndexResponse.java     | 47 +++++++++++++-----
 .../uniffle/common/ShuffleIndexResultTest.java     |  2 +-
 .../handler/impl/LocalFileServerReadHandler.java   | 58 ++++++----------------
 5 files changed, 89 insertions(+), 65 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java 
b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
index adf28e2a..56b9befb 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
@@ -19,21 +19,47 @@ package org.apache.uniffle.common;
 
 import java.nio.ByteBuffer;
 
+import io.netty.buffer.Unpooled;
+
+import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
+import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
+import org.apache.uniffle.common.util.ByteBufUtils;
+
 public class ShuffleIndexResult {
-  private final ByteBuffer indexData;
+  private final ManagedBuffer buffer;
   private long dataFileLen;
 
   public ShuffleIndexResult() {
     this(ByteBuffer.wrap(new byte[0]), -1);
   }
 
-  public ShuffleIndexResult(ByteBuffer bytes, long dataFileLen) {
-    this.indexData = bytes;
+  public ShuffleIndexResult(byte[] data, long dataFileLen) {
+    this(data != null ? ByteBuffer.wrap(data) : null, dataFileLen);
+  }
+
+  public ShuffleIndexResult(ByteBuffer data, long dataFileLen) {
+    this.buffer =
+        new NettyManagedBuffer(data != null ? Unpooled.wrappedBuffer(data) : 
Unpooled.EMPTY_BUFFER);
     this.dataFileLen = dataFileLen;
   }
 
+  public ShuffleIndexResult(ManagedBuffer buffer, long dataFileLen) {
+    this.buffer = buffer;
+    this.dataFileLen = dataFileLen;
+  }
+
+  public byte[] getData() {
+    if (buffer == null) {
+      return null;
+    }
+    if (buffer.nioByteBuffer().hasArray()) {
+      return buffer.nioByteBuffer().array();
+    }
+    return ByteBufUtils.readBytes(buffer.byteBuf());
+  }
+
   public ByteBuffer getIndexData() {
-    return indexData;
+    return buffer.nioByteBuffer();
   }
 
   public long getDataFileLen() {
@@ -41,6 +67,6 @@ public class ShuffleIndexResult {
   }
 
   public boolean isEmpty() {
-    return indexData == null || indexData.remaining() == 0;
+    return buffer == null || buffer.size() == 0;
   }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
index af101fd3..a4975bde 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
@@ -27,17 +27,20 @@ import java.nio.file.StandardOpenOption;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.DefaultFileRegion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.JavaUtils;
 
 public class FileSegmentManagedBuffer extends ManagedBuffer {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileSegmentManagedBuffer.class);
   private final File file;
-  private final int offset;
+  private final long offset;
   private final int length;
 
-  public FileSegmentManagedBuffer(File file, int offset, int length) {
+  public FileSegmentManagedBuffer(File file, long offset, int length) {
     this.file = file;
     this.offset = offset;
     this.length = length;
@@ -80,7 +83,9 @@ public class FileSegmentManagedBuffer extends ManagedBuffer {
       } catch (IOException ignored) {
         // ignore
       }
-      throw new RssException(errorMessage, e);
+
+      LOG.error(errorMessage, e);
+      return ByteBuffer.allocate(0);
     } finally {
       JavaUtils.closeQuietly(channel);
     }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java
 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java
index 4bb63d30..d6179086 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java
@@ -19,45 +19,62 @@ package org.apache.uniffle.common.netty.protocol;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
 
+import org.apache.uniffle.common.netty.buffer.FileSegmentManagedBuffer;
+import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
+import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.ByteBufUtils;
 
-public class GetLocalShuffleIndexResponse extends RpcResponse {
-  private ByteBuf indexData;
+public class GetLocalShuffleIndexResponse extends RpcResponse implements 
Transferable {
+  private ManagedBuffer buffer;
   private long fileLength;
 
   public GetLocalShuffleIndexResponse(
-      long requestId, StatusCode statusCode, byte[] indexData, long 
fileLength) {
-    this(requestId, statusCode, null, indexData, fileLength);
+      long requestId, StatusCode statusCode, String retMessage, byte[] 
indexData, long fileLength) {
+    this(
+        requestId,
+        statusCode,
+        retMessage,
+        indexData == null ? Unpooled.EMPTY_BUFFER : 
Unpooled.wrappedBuffer(indexData),
+        fileLength);
   }
 
   public GetLocalShuffleIndexResponse(
-      long requestId, StatusCode statusCode, String retMessage, byte[] 
indexData, long fileLength) {
-    this(requestId, statusCode, retMessage, Unpooled.wrappedBuffer(indexData), 
fileLength);
+      long requestId,
+      StatusCode statusCode,
+      String retMessage,
+      ByteBuf indexData,
+      long fileLength) {
+    this(requestId, statusCode, retMessage, new NettyManagedBuffer(indexData), 
fileLength);
   }
 
   public GetLocalShuffleIndexResponse(
       long requestId,
       StatusCode statusCode,
       String retMessage,
-      ByteBuf indexData,
+      ManagedBuffer indexData,
       long fileLength) {
     super(requestId, statusCode, retMessage);
-    this.indexData = indexData;
+    this.buffer = indexData;
     this.fileLength = fileLength;
   }
 
   @Override
   public int encodedLength() {
-    return super.encodedLength() + Integer.BYTES + indexData.readableBytes() + 
Long.BYTES;
+    return super.encodedLength() + Integer.BYTES + buffer.size() + Long.BYTES;
   }
 
   @Override
   public void encode(ByteBuf buf) {
     super.encode(buf);
-    ByteBufUtils.copyByteBuf(indexData, buf);
-    indexData.release();
+    if (buffer instanceof FileSegmentManagedBuffer) {
+      buf.writeInt(buffer.size());
+    } else {
+      ByteBufUtils.copyByteBuf(buffer.byteBuf(), buf);
+      buffer.release();
+    }
     buf.writeLong(fileLength);
   }
 
@@ -77,10 +94,16 @@ public class GetLocalShuffleIndexResponse extends 
RpcResponse {
   }
 
   public ByteBuf getIndexData() {
-    return indexData;
+    return buffer.byteBuf();
   }
 
   public long getFileLength() {
     return fileLength;
   }
+
+  @Override
+  public void transferTo(Channel channel) {
+    channel.write(buffer.convertToNetty());
+    buffer.release();
+  }
 }
diff --git 
a/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java 
b/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java
index 740fbe08..3637029f 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java
@@ -26,6 +26,6 @@ public class ShuffleIndexResultTest {
   @Test
   public void testEmpty() {
     assertTrue(new ShuffleIndexResult().isEmpty());
-    assertTrue(new ShuffleIndexResult(null, -1).isEmpty());
+    assertTrue(new ShuffleIndexResult((byte[]) null, -1).isEmpty());
   }
 }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
index e4a9b7b4..b211fb4e 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
@@ -19,7 +19,6 @@ package org.apache.uniffle.storage.handler.impl;
 
 import java.io.File;
 import java.io.FilenameFilter;
-import java.nio.ByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,6 +27,7 @@ import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.exception.FileNotFoundException;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.netty.buffer.FileSegmentManagedBuffer;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
 import org.apache.uniffle.storage.handler.api.ServerReadHandler;
@@ -128,55 +128,25 @@ public class LocalFileServerReadHandler implements 
ServerReadHandler {
     return fileName.substring(0, point);
   }
 
-  private LocalFileReader createFileReader(String path) throws Exception {
-    return new LocalFileReader(path);
-  }
-
   @Override
   public ShuffleDataResult getShuffleData(long offset, int length) {
-    byte[] readBuffer = new byte[0];
-
-    try {
-      long start = System.currentTimeMillis();
-      try (LocalFileReader reader = createFileReader(dataFileName)) {
-        readBuffer = reader.read(offset, length);
-      }
-      LOG.debug(
-          "Read File segment: {}, offset[{}], length[{}], cost: {} ms, for 
appId[{}], shuffleId[{}], partitionId[{}]",
-          dataFileName,
-          offset,
-          length,
-          System.currentTimeMillis() - start,
-          appId,
-          shuffleId,
-          partitionId);
-    } catch (Exception e) {
-      LOG.warn("Can't read data for{}, offset[{}], length[{}]", dataFileName, 
offset, length);
-    }
-
-    return new ShuffleDataResult(readBuffer);
+    return new ShuffleDataResult(
+        new FileSegmentManagedBuffer(new File(dataFileName), offset, length));
   }
 
   @Override
   public ShuffleIndexResult getShuffleIndex() {
-    int indexNum = 0;
-    int len = 0;
-    try (LocalFileReader reader = createFileReader(indexFileName)) {
-      long indexFileSize = new File(indexFileName).length();
-      indexNum = (int) (indexFileSize / FileBasedShuffleSegment.SEGMENT_SIZE);
-      len = indexNum * FileBasedShuffleSegment.SEGMENT_SIZE;
-      if (indexFileSize != len) {
-        LOG.warn(
-            "Maybe the index file: {} is being written due to the 
shuffle-buffer flushing.",
-            indexFileName);
-      }
-      byte[] indexData = reader.read(0, len);
-      // get dataFileSize for read segment generation in 
DataSkippableReadHandler#readShuffleData
-      long dataFileSize = new File(dataFileName).length();
-      return new ShuffleIndexResult(ByteBuffer.wrap(indexData), dataFileSize);
-    } catch (Exception e) {
-      LOG.error("Fail to read index file {} indexNum {} len {}", 
indexFileName, indexNum, len);
-      return new ShuffleIndexResult();
+    File indexFile = new File(indexFileName);
+    long indexFileSize = indexFile.length();
+    int indexNum = (int) (indexFileSize / 
FileBasedShuffleSegment.SEGMENT_SIZE);
+    int len = indexNum * FileBasedShuffleSegment.SEGMENT_SIZE;
+    if (indexFileSize != len) {
+      LOG.warn(
+          "Maybe the index file: {} is being written due to the shuffle-buffer 
flushing.",
+          indexFileName);
     }
+    // get dataFileSize for read segment generation in 
DataSkippableReadHandler#readShuffleData
+    long dataFileSize = new File(dataFileName).length();
+    return new ShuffleIndexResult(new FileSegmentManagedBuffer(indexFile, 0, 
len), dataFileSize);
   }
 }

Reply via email to