This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 0125fe38 [#596][FOLLOWUP] Index data support offheap read (#852)
0125fe38 is described below
commit 0125fe380d16fdfc626b4f3eb1ef4fd2b4af936c
Author: roryqi <[email protected]>
AuthorDate: Wed May 10 09:41:45 2023 +0800
[#596][FOLLOWUP] Index data support offheap read (#852)
### What changes were proposed in this pull request?
We support to read off heap data in the #806, we support to read index in
this pr.
### Why are the changes needed?
#596 follow up pr
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA passed.
---
.../storage/handler/impl/HdfsShuffleReadHandler.java | 18 +++++++++++-------
1 file changed, 11 insertions(+), 7 deletions(-)
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
index 8e84f963..56151198 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
@@ -84,18 +84,22 @@ public class HdfsShuffleReadHandler extends
DataSkippableReadHandler {
protected ShuffleIndexResult readShuffleIndex() {
long start = System.currentTimeMillis();
try {
- byte[] indexData = indexReader.read();
- int segmentNumber = indexData.length /
FileBasedShuffleSegment.SEGMENT_SIZE;
+ ByteBuffer indexData = null;
+ if (offHeapEnabled) {
+ indexData = indexReader.readAsByteBuffer();
+ } else {
+ indexData = ByteBuffer.wrap(indexReader.read());
+ }
+ int indexDataLength = indexData.limit() - indexData.position();
+ int segmentNumber = indexDataLength /
FileBasedShuffleSegment.SEGMENT_SIZE;
int expectedLen = segmentNumber * FileBasedShuffleSegment.SEGMENT_SIZE;
- if (indexData.length != expectedLen) {
+ if (indexDataLength != expectedLen) {
LOG.warn("Maybe the index file: {} is being written due to the
shuffle-buffer flushing.", filePrefix);
- byte[] indexNewData = new byte[expectedLen];
- System.arraycopy(indexData, 0, indexNewData, 0, expectedLen);
- indexData = indexNewData;
+ indexData.limit(expectedLen);
}
long dateFileLen = getDataFileLen();
LOG.info("Read index files {}.index for {} ms", filePrefix,
System.currentTimeMillis() - start);
- return new ShuffleIndexResult(ByteBuffer.wrap(indexData), dateFileLen);
+ return new ShuffleIndexResult(indexData, dateFileLen);
} catch (Exception e) {
LOG.info("Fail to read index files {}.index", filePrefix, e);
}