This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 44fa8fb9 [#133] feat(netty): local shuffle read support zero-copy. (#1047)
44fa8fb9 is described below
commit 44fa8fb92150f61ed36fec44b4cf6d017409111d
Author: Xianming Lei <[email protected]>
AuthorDate: Fri Jul 28 09:44:31 2023 +0800
[#133] feat(netty): local shuffle read support zero-copy. (#1047)
### What changes were proposed in this pull request?
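Support zero-copy for local shuffle reads: the server now returns local data and index file segments as `FileSegmentManagedBuffer`s instead of first reading them into heap byte arrays.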
### Why are the changes needed?
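For #133. Reading each file segment into a byte array before sending it adds an extra copy per request; handing the file segment to the Netty channel avoids that copy.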
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
Co-authored-by: leixianming <[email protected]>
---
.../apache/uniffle/common/ShuffleIndexResult.java | 36 ++++++++++++--
.../netty/buffer/FileSegmentManagedBuffer.java | 11 ++--
.../protocol/GetLocalShuffleIndexResponse.java | 47 +++++++++++++-----
.../uniffle/common/ShuffleIndexResultTest.java | 2 +-
.../handler/impl/LocalFileServerReadHandler.java | 58 ++++++----------------
5 files changed, 89 insertions(+), 65 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
index adf28e2a..56b9befb 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleIndexResult.java
@@ -19,21 +19,47 @@ package org.apache.uniffle.common;
import java.nio.ByteBuffer;
+import io.netty.buffer.Unpooled;
+
+import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
+import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
+import org.apache.uniffle.common.util.ByteBufUtils;
+
public class ShuffleIndexResult {
- private final ByteBuffer indexData;
+ private final ManagedBuffer buffer;
private long dataFileLen;
public ShuffleIndexResult() {
this(ByteBuffer.wrap(new byte[0]), -1);
}
- public ShuffleIndexResult(ByteBuffer bytes, long dataFileLen) {
- this.indexData = bytes;
+ public ShuffleIndexResult(byte[] data, long dataFileLen) {
+ this(data != null ? ByteBuffer.wrap(data) : null, dataFileLen);
+ }
+
+ public ShuffleIndexResult(ByteBuffer data, long dataFileLen) {
+ this.buffer =
+ new NettyManagedBuffer(data != null ? Unpooled.wrappedBuffer(data) :
Unpooled.EMPTY_BUFFER);
this.dataFileLen = dataFileLen;
}
+ public ShuffleIndexResult(ManagedBuffer buffer, long dataFileLen) {
+ this.buffer = buffer;
+ this.dataFileLen = dataFileLen;
+ }
+
+ public byte[] getData() {
+ if (buffer == null) {
+ return null;
+ }
+ if (buffer.nioByteBuffer().hasArray()) {
+ return buffer.nioByteBuffer().array();
+ }
+ return ByteBufUtils.readBytes(buffer.byteBuf());
+ }
+
public ByteBuffer getIndexData() {
- return indexData;
+ return buffer.nioByteBuffer();
}
public long getDataFileLen() {
@@ -41,6 +67,6 @@ public class ShuffleIndexResult {
}
public boolean isEmpty() {
- return indexData == null || indexData.remaining() == 0;
+ return buffer == null || buffer.size() == 0;
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
b/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
index af101fd3..a4975bde 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
@@ -27,17 +27,20 @@ import java.nio.file.StandardOpenOption;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.DefaultFileRegion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.JavaUtils;
public class FileSegmentManagedBuffer extends ManagedBuffer {
+ private static final Logger LOG =
LoggerFactory.getLogger(FileSegmentManagedBuffer.class);
private final File file;
- private final int offset;
+ private final long offset;
private final int length;
- public FileSegmentManagedBuffer(File file, int offset, int length) {
+ public FileSegmentManagedBuffer(File file, long offset, int length) {
this.file = file;
this.offset = offset;
this.length = length;
@@ -80,7 +83,9 @@ public class FileSegmentManagedBuffer extends ManagedBuffer {
} catch (IOException ignored) {
// ignore
}
- throw new RssException(errorMessage, e);
+
+ LOG.error(errorMessage, e);
+ return ByteBuffer.allocate(0);
} finally {
JavaUtils.closeQuietly(channel);
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java
index 4bb63d30..d6179086 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexResponse.java
@@ -19,45 +19,62 @@ package org.apache.uniffle.common.netty.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import org.apache.uniffle.common.netty.buffer.FileSegmentManagedBuffer;
+import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
+import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ByteBufUtils;
-public class GetLocalShuffleIndexResponse extends RpcResponse {
- private ByteBuf indexData;
+public class GetLocalShuffleIndexResponse extends RpcResponse implements
Transferable {
+ private ManagedBuffer buffer;
private long fileLength;
public GetLocalShuffleIndexResponse(
- long requestId, StatusCode statusCode, byte[] indexData, long
fileLength) {
- this(requestId, statusCode, null, indexData, fileLength);
+ long requestId, StatusCode statusCode, String retMessage, byte[]
indexData, long fileLength) {
+ this(
+ requestId,
+ statusCode,
+ retMessage,
+ indexData == null ? Unpooled.EMPTY_BUFFER :
Unpooled.wrappedBuffer(indexData),
+ fileLength);
}
public GetLocalShuffleIndexResponse(
- long requestId, StatusCode statusCode, String retMessage, byte[]
indexData, long fileLength) {
- this(requestId, statusCode, retMessage, Unpooled.wrappedBuffer(indexData),
fileLength);
+ long requestId,
+ StatusCode statusCode,
+ String retMessage,
+ ByteBuf indexData,
+ long fileLength) {
+ this(requestId, statusCode, retMessage, new NettyManagedBuffer(indexData),
fileLength);
}
public GetLocalShuffleIndexResponse(
long requestId,
StatusCode statusCode,
String retMessage,
- ByteBuf indexData,
+ ManagedBuffer indexData,
long fileLength) {
super(requestId, statusCode, retMessage);
- this.indexData = indexData;
+ this.buffer = indexData;
this.fileLength = fileLength;
}
@Override
public int encodedLength() {
- return super.encodedLength() + Integer.BYTES + indexData.readableBytes() +
Long.BYTES;
+ return super.encodedLength() + Integer.BYTES + buffer.size() + Long.BYTES;
}
@Override
public void encode(ByteBuf buf) {
super.encode(buf);
- ByteBufUtils.copyByteBuf(indexData, buf);
- indexData.release();
+ if (buffer instanceof FileSegmentManagedBuffer) {
+ buf.writeInt(buffer.size());
+ } else {
+ ByteBufUtils.copyByteBuf(buffer.byteBuf(), buf);
+ buffer.release();
+ }
buf.writeLong(fileLength);
}
@@ -77,10 +94,16 @@ public class GetLocalShuffleIndexResponse extends
RpcResponse {
}
public ByteBuf getIndexData() {
- return indexData;
+ return buffer.byteBuf();
}
public long getFileLength() {
return fileLength;
}
+
+ @Override
+ public void transferTo(Channel channel) {
+ channel.write(buffer.convertToNetty());
+ buffer.release();
+ }
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java
b/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java
index 740fbe08..3637029f 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShuffleIndexResultTest.java
@@ -26,6 +26,6 @@ public class ShuffleIndexResultTest {
@Test
public void testEmpty() {
assertTrue(new ShuffleIndexResult().isEmpty());
- assertTrue(new ShuffleIndexResult(null, -1).isEmpty());
+ assertTrue(new ShuffleIndexResult((byte[]) null, -1).isEmpty());
}
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
index e4a9b7b4..b211fb4e 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandler.java
@@ -19,7 +19,6 @@ package org.apache.uniffle.storage.handler.impl;
import java.io.File;
import java.io.FilenameFilter;
-import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +27,7 @@ import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.FileNotFoundException;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.netty.buffer.FileSegmentManagedBuffer;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import org.apache.uniffle.storage.handler.api.ServerReadHandler;
@@ -128,55 +128,25 @@ public class LocalFileServerReadHandler implements
ServerReadHandler {
return fileName.substring(0, point);
}
- private LocalFileReader createFileReader(String path) throws Exception {
- return new LocalFileReader(path);
- }
-
@Override
public ShuffleDataResult getShuffleData(long offset, int length) {
- byte[] readBuffer = new byte[0];
-
- try {
- long start = System.currentTimeMillis();
- try (LocalFileReader reader = createFileReader(dataFileName)) {
- readBuffer = reader.read(offset, length);
- }
- LOG.debug(
- "Read File segment: {}, offset[{}], length[{}], cost: {} ms, for
appId[{}], shuffleId[{}], partitionId[{}]",
- dataFileName,
- offset,
- length,
- System.currentTimeMillis() - start,
- appId,
- shuffleId,
- partitionId);
- } catch (Exception e) {
- LOG.warn("Can't read data for{}, offset[{}], length[{}]", dataFileName,
offset, length);
- }
-
- return new ShuffleDataResult(readBuffer);
+ return new ShuffleDataResult(
+ new FileSegmentManagedBuffer(new File(dataFileName), offset, length));
}
@Override
public ShuffleIndexResult getShuffleIndex() {
- int indexNum = 0;
- int len = 0;
- try (LocalFileReader reader = createFileReader(indexFileName)) {
- long indexFileSize = new File(indexFileName).length();
- indexNum = (int) (indexFileSize / FileBasedShuffleSegment.SEGMENT_SIZE);
- len = indexNum * FileBasedShuffleSegment.SEGMENT_SIZE;
- if (indexFileSize != len) {
- LOG.warn(
- "Maybe the index file: {} is being written due to the
shuffle-buffer flushing.",
- indexFileName);
- }
- byte[] indexData = reader.read(0, len);
- // get dataFileSize for read segment generation in
DataSkippableReadHandler#readShuffleData
- long dataFileSize = new File(dataFileName).length();
- return new ShuffleIndexResult(ByteBuffer.wrap(indexData), dataFileSize);
- } catch (Exception e) {
- LOG.error("Fail to read index file {} indexNum {} len {}",
indexFileName, indexNum, len);
- return new ShuffleIndexResult();
+ File indexFile = new File(indexFileName);
+ long indexFileSize = indexFile.length();
+ int indexNum = (int) (indexFileSize /
FileBasedShuffleSegment.SEGMENT_SIZE);
+ int len = indexNum * FileBasedShuffleSegment.SEGMENT_SIZE;
+ if (indexFileSize != len) {
+ LOG.warn(
+ "Maybe the index file: {} is being written due to the shuffle-buffer
flushing.",
+ indexFileName);
}
+ // get dataFileSize for read segment generation in
DataSkippableReadHandler#readShuffleData
+ long dataFileSize = new File(dataFileName).length();
+ return new ShuffleIndexResult(new FileSegmentManagedBuffer(indexFile, 0,
len), dataFileSize);
}
}