This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new c6cde5d3 [#596] feat(netty): Use off heap memory to read HDFS data (#806)
c6cde5d3 is described below

commit c6cde5d39dcec6321ec8bce31a16527966aa863a
Author: roryqi <[email protected]>
AuthorDate: Thu Apr 13 16:36:19 2023 +0800

    [#596] feat(netty): Use off heap memory to read HDFS data (#806)
    
    ### What changes were proposed in this pull request?
    1. Use off-heap memory to read HDFS data.
    2. Remove some unused code.
    (TODO: also use off-heap memory to read HDFS index data.)
    
    ### Why are the changes needed?
    Fix: #596
    
    ### Does this PR introduce _any_ user-facing change?
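    Yes. It adds a new client option, `rss.client.off.heap.memory.enable` (exposed to Spark as `spark.rss.client.off.heap.memory.enable`, default `false`), and documents it in `docs/client_guide.md`.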
    
    ### How was this patch tested?
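    Passed the existing tests. `RepartitionWithHdfsMultiStorageRssTest` now randomly enables or disables off-heap memory on each run.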
---
 .../hadoop/mapreduce/task/reduce/RssShuffle.java   |  2 +-
 .../org/apache/spark/shuffle/RssSparkConfig.java   |  5 ++
 .../shuffle/reader/RssShuffleDataIterator.java     | 30 +++++++++---
 .../spark/shuffle/reader/RssShuffleReader.java     |  3 +-
 .../spark/shuffle/reader/RssShuffleReader.java     |  3 +-
 .../client/factory/ShuffleClientFactory.java       |  3 +-
 .../uniffle/client/impl/ShuffleReadClientImpl.java | 26 +++++++---
 .../request/CreateShuffleReadClientRequest.java    | 55 +++++++++++++---------
 .../apache/uniffle/common/ShuffleDataResult.java   | 24 ++++++++--
 .../uniffle/common/config/RssClientConf.java       |  6 +++
 .../org/apache/uniffle/common/util/RssUtils.java   |  9 ++++
 .../uniffle/common/ShuffleDataResultTest.java      |  3 +-
 docs/client_guide.md                               |  1 +
 .../RepartitionWithHdfsMultiStorageRssTest.java    | 14 +++++-
 .../org/apache/uniffle/storage/api/FileReader.java | 32 +++++++++++++
 .../storage/factory/ShuffleHandlerFactory.java     |  4 +-
 .../handler/impl/HdfsClientReadHandler.java        |  9 ++--
 .../storage/handler/impl/HdfsFileReader.java       | 39 +++++++++++++++
 .../handler/impl/HdfsShuffleReadHandler.java       | 36 ++++++++++----
 .../storage/handler/impl/LocalFileReader.java      | 11 +++++
 .../request/CreateShuffleReadHandlerRequest.java   |  9 ++++
 .../uniffle/storage/util/ShuffleStorageUtils.java  | 12 -----
 22 files changed, 265 insertions(+), 71 deletions(-)

diff --git 
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
 
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
index 372a265e..d08a5b2b 100644
--- 
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
+++ 
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
@@ -195,7 +195,7 @@ public class RssShuffle<K, V> implements 
ShuffleConsumerPlugin<K, V>, ExceptionR
       CreateShuffleReadClientRequest request = new 
CreateShuffleReadClientRequest(
           appId, 0, reduceId.getTaskID().getId(), storageType, basePath, 
indexReadLimit, readBufferSize,
           partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, 
serverInfoList,
-          readerJobConf, new MRIdHelper(), expectedTaskIdsBitmapFilterEnable);
+          readerJobConf, new MRIdHelper(), expectedTaskIdsBitmapFilterEnable, 
false);
       ShuffleReadClient shuffleReadClient = 
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
       RssFetcher fetcher = new RssFetcher(mrJobConf, reduceId, taskStatus, 
merger, copyPhase, reporter, metrics,
           shuffleReadClient, blockIdBitmap.getLongCardinality(), 
RssMRConfig.toRssConf(rssJobConf));
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index e3c31555..20974e73 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -231,6 +231,11 @@ public class RssSparkConfig {
               + "whether this conf is set or not"))
       .createWithDefault("");
 
+  public static final ConfigEntry<Boolean> RSS_CLIENT_OFF_HEAP_MEMORY_ENABLE = 
createBooleanBuilder(
+      new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + 
RssClientConf.OFF_HEAP_MEMORY_ENABLE.key())
+        .doc(RssClientConf.OFF_HEAP_MEMORY_ENABLE.description()))
+      .createWithDefault(RssClientConf.OFF_HEAP_MEMORY_ENABLE.defaultValue());
+
   public static final ConfigEntry<Integer> 
RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER = createIntegerBuilder(
       new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + 
RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER))
       
.createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE);
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index cb24be53..c905d27c 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -41,6 +41,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.RssUtils;
 
 public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, 
C>> {
 
@@ -74,9 +75,14 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
     this.codec = compress ? Codec.newInstance(rssConf) : null;
   }
 
-  public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data, 
int size) {
+  public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data) {
     clearDeserializationStream();
-    byteBufInputStream = new 
ByteBufInputStream(Unpooled.wrappedBuffer(data.array(), data.position(), size), 
true);
+    // Unpooled.wrapperBuffer will return a ByteBuf, but this ByteBuf won't 
release direct/heap memory
+    // when the ByteBuf is released. This is because the 
UnpooledDirectByteBuf's doFree is false 
+    // when it is constructed from user provided ByteBuffer. 
+    // The `releaseOnClose` parameter doesn't take effect, we would release 
the data ByteBuffer 
+    // manually.
+    byteBufInputStream = new ByteBufInputStream(Unpooled.wrappedBuffer(data), 
true);
     deserializationStream = 
serializerInstance.deserializeStream(byteBufInputStream);
     return deserializationStream.asKeyValueIterator();
   }
@@ -89,6 +95,7 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
         LOG.warn("Can't close ByteBufInputStream, memory may be leaked.");
       }
     }
+
     if (deserializationStream != null) {
       deserializationStream.close();
     }
@@ -109,10 +116,10 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
       long fetchDuration = System.currentTimeMillis() - startFetch;
       shuffleReadMetrics.incFetchWaitTime(fetchDuration);
       if (rawData != null) {
-        int uncompressedLen = uncompress(rawBlock, rawData);
+        uncompress(rawBlock, rawData);
         // create new iterator for shuffle data
         long startSerialization = System.currentTimeMillis();
-        recordsIterator = createKVIterator(uncompressedData, uncompressedLen);
+        recordsIterator = createKVIterator(uncompressedData);
         long serializationDuration = System.currentTimeMillis() - 
startSerialization;
         readTime += fetchDuration;
         serializeTime += serializationDuration;
@@ -139,8 +146,11 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
     int uncompressedLen = rawBlock.getUncompressLength();
     if (codec != null) {
       if (uncompressedData == null || uncompressedData.capacity() < 
uncompressedLen) {
-        // todo: support off-heap bytebuffer
-        uncompressedData = ByteBuffer.allocate(uncompressedLen);
+        if (uncompressedData != null) {
+          RssUtils.releaseByteBuffer(uncompressedData);
+        }
+        uncompressedData = rawData.isDirect()
+            ? ByteBuffer.allocateDirect(uncompressedLen) : 
ByteBuffer.allocate(uncompressedLen);
       }
       uncompressedData.clear();
       long startDecompress = System.currentTimeMillis();
@@ -148,6 +158,9 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
       unCompressedBytesLength += uncompressedLen;
       long decompressDuration = System.currentTimeMillis() - startDecompress;
       decompressTime += decompressDuration;
+      // uncompressedData's limit is not updated by `codec.decompress`, 
however this information is used
+      // by `createKVIterator`. Update limit here.
+      uncompressedData.limit(uncompressedData.position() + uncompressedLen);
     } else {
       uncompressedData = rawData;
     }
@@ -162,6 +175,11 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
 
   public BoxedUnit cleanup() {
     clearDeserializationStream();
+    // Uncompressed data is released in this class, Compressed data is release 
in the class ShuffleReadClientImpl
+    // So if codec is null, we don't release the data when the stream is closed
+    if (codec != null) {
+      RssUtils.releaseByteBuffer(uncompressedData);
+    }
     if (shuffleReadClient != null) {
       shuffleReadClient.close();
     }
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 96252f44..ec976113 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -121,7 +121,8 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
     CreateShuffleReadClientRequest request = new 
CreateShuffleReadClientRequest(
         appId, shuffleId, startPartition, storageType, basePath, 
indexReadLimit, readBufferSize,
         partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
-        shuffleServerInfoList, hadoopConf, expectedTaskIdsBitmapFilterEnable);
+        shuffleServerInfoList, hadoopConf, expectedTaskIdsBitmapFilterEnable,
+        rssConf.getBoolean(RssClientConf.OFF_HEAP_MEMORY_ENABLE));
     ShuffleReadClient shuffleReadClient = 
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
     RssShuffleDataIterator rssShuffleDataIterator = new 
RssShuffleDataIterator<K, C>(
         shuffleDependency.serializer(), shuffleReadClient,
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 4bd44d1b..7cba5051 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -227,7 +227,8 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
         CreateShuffleReadClientRequest request = new 
CreateShuffleReadClientRequest(
             appId, shuffleId, partition, storageType, basePath, 
indexReadLimit, readBufferSize,
             1, partitionNum, partitionToExpectBlocks.get(partition), 
taskIdBitmap, shuffleServerInfoList,
-            hadoopConf, dataDistributionType, 
expectedTaskIdsBitmapFilterEnable);
+            hadoopConf, dataDistributionType, 
expectedTaskIdsBitmapFilterEnable,
+            rssConf.getBoolean(RssClientConf.OFF_HEAP_MEMORY_ENABLE));
         ShuffleReadClient shuffleReadClient = 
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
         RssShuffleDataIterator<K, C> iterator = new RssShuffleDataIterator<>(
             shuffleDependency.serializer(), shuffleReadClient,
diff --git 
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
 
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index 39d9ded0..710d9a13 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -87,7 +87,8 @@ public class ShuffleClientFactory {
         request.getHadoopConf(),
         request.getIdHelper(),
         request.getShuffleDataDistributionType(),
-        request.isExpectedTaskIdsBitmapFilterEnable()
+        request.isExpectedTaskIdsBitmapFilterEnable(),
+        request.isOffHeapEnabled()
     );
   }
 }
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 852a253f..4773e30e 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -50,7 +50,7 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
   private final List<ShuffleServerInfo> shuffleServerInfoList;
   private int shuffleId;
   private int partitionId;
-  private byte[] readBuffer;
+  private ByteBuffer readBuffer;
   private Roaring64NavigableMap blockIdBitmap;
   private Roaring64NavigableMap taskIdBitmap;
   private Roaring64NavigableMap pendingBlockIds;
@@ -78,7 +78,8 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
       Configuration hadoopConf,
       IdHelper idHelper,
       ShuffleDataDistributionType dataDistributionType,
-      boolean expectedTaskIdsBitmapFilterEnable) {
+      boolean expectedTaskIdsBitmapFilterEnable,
+      boolean offHeapEnabled) {
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.blockIdBitmap = blockIdBitmap;
@@ -106,6 +107,9 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
     if (expectedTaskIdsBitmapFilterEnable) {
       request.useExpectedTaskIdsBitmapFilter();
     }
+    if (offHeapEnabled) {
+      request.enableOffHeap();
+    }
 
     List<Long> removeBlockIds = Lists.newArrayList();
     blockIdBitmap.forEach(bid -> {
@@ -142,7 +146,7 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
     this(storageType, appId, shuffleId, partitionId, indexReadLimit,
         partitionNumPerRange, partitionNum, readBufferSize, storageBasePath,
         blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf,
-        idHelper, ShuffleDataDistributionType.NORMAL, false);
+        idHelper, ShuffleDataDistributionType.NORMAL, false, false);
   }
 
   @Override
@@ -219,8 +223,10 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
     }
 
     if (bs != null) {
-      return new CompressedShuffleBlock(ByteBuffer.wrap(readBuffer,
-          bs.getOffset(), bs.getLength()), bs.getUncompressLength());
+      ByteBuffer compressedBuffer = readBuffer.duplicate();
+      compressedBuffer.position(bs.getOffset());
+      compressedBuffer.limit(bs.getOffset() + bs.getLength());
+      return new CompressedShuffleBlock(compressedBuffer, 
bs.getUncompressLength());
     }
     // current segment hasn't data, try next segment
     return readShuffleBlockData();
@@ -238,8 +244,11 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
     if (sdr == null) {
       return 0;
     }
-    readBuffer = sdr.getData();
-    if (readBuffer == null || readBuffer.length == 0) {
+    if (readBuffer != null) {
+      RssUtils.releaseByteBuffer(readBuffer);
+    }
+    readBuffer = sdr.getDataBuffer();
+    if (readBuffer == null || readBuffer.capacity() == 0) {
       return 0;
     }
     bufferSegmentQueue.addAll(sdr.getBufferSegments());
@@ -253,6 +262,9 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
 
   @Override
   public void close() {
+    if (readBuffer != null) {
+      RssUtils.releaseByteBuffer(readBuffer);
+    }
     if (clientReadHandler != null) {
       clientReadHandler.close();
     }
diff --git 
a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
 
b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
index a4b4a325..a6f676fe 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
@@ -45,6 +45,7 @@ public class CreateShuffleReadClientRequest {
   private IdHelper idHelper;
   private ShuffleDataDistributionType shuffleDataDistributionType = 
ShuffleDataDistributionType.NORMAL;
   private boolean expectedTaskIdsBitmapFilterEnable = false;
+  private boolean offHeapEnabled = false;
 
   public CreateShuffleReadClientRequest(
       String appId,
@@ -61,33 +62,14 @@ public class CreateShuffleReadClientRequest {
       List<ShuffleServerInfo> shuffleServerInfoList,
       Configuration hadoopConf,
       ShuffleDataDistributionType dataDistributionType,
-      boolean expectedTaskIdsBitmapFilterEnable) {
+      boolean expectedTaskIdsBitmapFilterEnable,
+      boolean offHeapEnabled) {
     this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, 
readBufferSize,
         partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, 
shuffleServerInfoList,
-        hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable);
+        hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable, 
offHeapEnabled);
     this.shuffleDataDistributionType = dataDistributionType;
   }
 
-  public CreateShuffleReadClientRequest(
-      String appId,
-      int shuffleId,
-      int partitionId,
-      String storageType,
-      String basePath,
-      int indexReadLimit,
-      int readBufferSize,
-      int partitionNumPerRange,
-      int partitionNum,
-      Roaring64NavigableMap blockIdBitmap,
-      Roaring64NavigableMap taskIdBitmap,
-      List<ShuffleServerInfo> shuffleServerInfoList,
-      Configuration hadoopConf,
-      boolean expectedTaskIdsBitmapFilterEnable) {
-    this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, 
readBufferSize,
-        partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, 
shuffleServerInfoList,
-        hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable);
-  }
-
   public CreateShuffleReadClientRequest(
       String appId,
       int shuffleId,
@@ -103,7 +85,8 @@ public class CreateShuffleReadClientRequest {
       List<ShuffleServerInfo> shuffleServerInfoList,
       Configuration hadoopConf,
       IdHelper idHelper,
-      boolean expectedTaskIdsBitmapFilterEnable) {
+      boolean expectedTaskIdsBitmapFilterEnable,
+      boolean offHeapEnabled) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
@@ -119,6 +102,28 @@ public class CreateShuffleReadClientRequest {
     this.hadoopConf = hadoopConf;
     this.idHelper = idHelper;
     this.expectedTaskIdsBitmapFilterEnable = expectedTaskIdsBitmapFilterEnable;
+    this.offHeapEnabled = offHeapEnabled;
+  }
+
+  public CreateShuffleReadClientRequest(
+      String appId,
+      int shuffleId,
+      int partitionId,
+      String storageType,
+      String basePath,
+      int indexReadLimit,
+      int readBufferSize,
+      int partitionNumPerRange,
+      int partitionNum,
+      Roaring64NavigableMap blockIdBitmap,
+      Roaring64NavigableMap taskIdBitmap,
+      List<ShuffleServerInfo> shuffleServerInfoList,
+      Configuration hadoopConf,
+      boolean expectedTaskIdsBitmapFilterEnable,
+      boolean offHeapEnabled) {
+    this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, 
readBufferSize,
+        partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, 
shuffleServerInfoList,
+        hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable, 
offHeapEnabled);
   }
 
   public String getAppId() {
@@ -184,4 +189,8 @@ public class CreateShuffleReadClientRequest {
   public boolean isExpectedTaskIdsBitmapFilterEnable() {
     return expectedTaskIdsBitmapFilterEnable;
   }
+
+  public boolean isOffHeapEnabled() {
+    return offHeapEnabled;
+  }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java 
b/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
index 1e8109e4..19e93dd1 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
@@ -17,13 +17,14 @@
 
 package org.apache.uniffle.common;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
 import com.google.common.collect.Lists;
 
 public class ShuffleDataResult {
 
-  private final byte[] data;
+  private final ByteBuffer data;
   private final List<BufferSegment> bufferSegments;
 
   public ShuffleDataResult() {
@@ -34,12 +35,29 @@ public class ShuffleDataResult {
     this(data, Lists.newArrayList());
   }
 
-  public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
+  public ShuffleDataResult(ByteBuffer data, List<BufferSegment> 
bufferSegments) {
     this.data = data;
     this.bufferSegments = bufferSegments;
   }
 
+  public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
+    this(data != null ? ByteBuffer.wrap(data) : null, bufferSegments);
+  }
+
   public byte[] getData() {
+    if (data == null) {
+      return null;
+    }
+    if (data.hasArray()) {
+      return data.array();
+    }
+    ByteBuffer dataBuffer = data.duplicate();
+    byte[] byteArray = new byte[dataBuffer.remaining()];
+    dataBuffer.get(byteArray);
+    return byteArray;
+  }
+
+  public ByteBuffer getDataBuffer() {
     return data;
   }
 
@@ -48,7 +66,7 @@ public class ShuffleDataResult {
   }
 
   public boolean isEmpty() {
-    return bufferSegments == null || bufferSegments.isEmpty() || data == null 
|| data.length == 0;
+    return bufferSegments == null || bufferSegments.isEmpty() || data == null 
|| data.capacity() == 0;
   }
 
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 33d20ddf..35d8172c 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -102,4 +102,10 @@ public class RssClientConf {
       .intType()
       .noDefaultValue()
       .withDescription("internal configuration to indicate which port is 
actually bind for shuffle manager service.");
+
+  public static final ConfigOption<Boolean> OFF_HEAP_MEMORY_ENABLE = 
ConfigOptions
+      .key("rss.client.off.heap.memory.enable")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Client can use off heap memory");
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 92688083..3935ab2f 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -31,6 +31,7 @@ import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.InterfaceAddress;
 import java.net.NetworkInterface;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Enumeration;
@@ -47,6 +48,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.net.InetAddresses;
 import io.netty.channel.unix.Errors;
+import io.netty.util.internal.PlatformDependent;
 import org.eclipse.jetty.util.MultiException;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
@@ -367,4 +369,11 @@ public class RssUtils {
       return conf.get(RssBaseConf.RSS_STORAGE_BASE_PATH);
     }
   }
+
+  public static void releaseByteBuffer(ByteBuffer byteBuffer) {
+    if (byteBuffer == null || !byteBuffer.isDirect()) {
+      return;
+    }
+    PlatformDependent.freeDirectBuffer(byteBuffer);
+  }
 }
diff --git 
a/common/src/test/java/org/apache/uniffle/common/ShuffleDataResultTest.java 
b/common/src/test/java/org/apache/uniffle/common/ShuffleDataResultTest.java
index 526a4e84..5affe92a 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShuffleDataResultTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShuffleDataResultTest.java
@@ -30,9 +30,10 @@ public class ShuffleDataResultTest {
   @Test
   public void testEmpty() {
     List<BufferSegment> segments = Collections.singletonList(new 
BufferSegment(1, 2, 3, 4, 5, 6));
+    byte[] bytes = null;
     assertTrue(new ShuffleDataResult().isEmpty());
     assertTrue(new ShuffleDataResult(new byte[1]).isEmpty());
-    assertTrue(new ShuffleDataResult(null, segments).isEmpty());
+    assertTrue(new ShuffleDataResult(bytes, segments).isEmpty());
     assertTrue(new ShuffleDataResult(new byte[0], segments).isEmpty());
     assertTrue(new ShuffleDataResult(new byte[1], null).isEmpty());
     assertTrue(new ShuffleDataResult(new byte[1], 
Collections.emptyList()).isEmpty());
diff --git a/docs/client_guide.md b/docs/client_guide.md
index 413a5334..43bbceb6 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -191,6 +191,7 @@ The important configuration is listed as following.
 |spark.rss.client.send.size.limit|16m|The max data size sent to shuffle server|
 |spark.rss.client.unregister.thread.pool.size|10|The max size of thread pool 
of unregistering|
 |spark.rss.client.unregister.request.timeout.sec|10|The max timeout sec when 
doing unregister to remote shuffle-servers|
+|spark.rss.client.off.heap.memory.enable|false|The client use off heap memory 
to process data|
 
 
 ### MapReduce Specialized Setting
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java
index 43b4a7c0..9bbc0e28 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java
@@ -20,24 +20,36 @@ package org.apache.uniffle.test;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Random;
 
 import com.google.common.collect.Maps;
 import org.apache.spark.SparkConf;
 import org.apache.spark.shuffle.RssSparkConfig;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;
 
 public class RepartitionWithHdfsMultiStorageRssTest extends RepartitionTest {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RepartitionWithHdfsMultiStorageRssTest.class);
+
   @BeforeAll
   public static void setupServers(@TempDir File tmpDir) throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), 
HDFS_URI + "rss/test");
     dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), 
StorageType.LOCALFILE_HDFS.name());
+    Random random = new Random();
+    // todo: we should use parameterized test to modify here when we could 
solve the issue that
+    //  the test case use too long time.
+    boolean useOffHeap = random.nextInt() % 2 == 0;
+    LOG.info("use off heap: " + useOffHeap);
+    dynamicConf.put(RssSparkConfig.RSS_CLIENT_OFF_HEAP_MEMORY_ENABLE.key(), 
String.valueOf(useOffHeap));
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
     addDynamicConf(coordinatorConf, dynamicConf);
     createCoordinatorServer(coordinatorConf);
 
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/api/FileReader.java 
b/storage/src/main/java/org/apache/uniffle/storage/api/FileReader.java
index b3be22bb..18341e25 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/api/FileReader.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/api/FileReader.java
@@ -17,9 +17,41 @@
 
 package org.apache.uniffle.storage.api;
 
+import java.nio.ByteBuffer;
+
 public interface FileReader {
 
+  /**
+   * This method will return a byte array, will read
+   * the length of this file data from offset position.
+   *
+   * @param offset the file offset which we start to read
+   * @param length the data length which we need to read
+   * @return file data
+   */
   byte[] read(long offset, int length);
 
+  /**
+   *  This method will return a byte array, will read
+   *  the data from current position to the end of file
+   * @return file data
+   */
   byte[] read();
+
+  /**
+   * This method will return a direct byte buffer, will read
+   * the length of this file data from offset position.
+   *
+   * @param offset the file offset which we start to read
+   * @param length the data length which we need to read
+   * @return file data
+   */
+  ByteBuffer readAsByteBuffer(long offset, int length);
+
+  /**
+   *  This method will return a direct byte buffer, will read
+   *  the data from current position to the end of file
+   * @return file data
+   */
+  ByteBuffer readAsByteBuffer();
 }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
 
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index 08dd548c..efcc8fd6 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -163,8 +163,8 @@ public class ShuffleHandlerFactory {
         request.getHadoopConf(),
         request.getDistributionType(),
         request.getExpectTaskIds(),
-        ssi.getId()
-    );
+        ssi.getId(),
+        request.isOffHeapEnabled());
   }
 
   public ShuffleDeleteHandler 
createShuffleDeleteHandler(CreateShuffleDeleteHandlerRequest request) {
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
index a8ac6425..003d03e4 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
@@ -54,6 +54,7 @@ public class HdfsClientReadHandler extends 
AbstractClientReadHandler {
   private int readHandlerIndex;
   private ShuffleDataDistributionType distributionType;
   private Roaring64NavigableMap expectTaskIds;
+  private boolean offHeapEnable = false;
 
   public HdfsClientReadHandler(
       String appId,
@@ -69,7 +70,8 @@ public class HdfsClientReadHandler extends 
AbstractClientReadHandler {
       Configuration hadoopConf,
       ShuffleDataDistributionType distributionType,
       Roaring64NavigableMap expectTaskIds,
-      String shuffleServerId) {
+      String shuffleServerId,
+      boolean offHeapEnable) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
@@ -84,6 +86,7 @@ public class HdfsClientReadHandler extends 
AbstractClientReadHandler {
     this.distributionType = distributionType;
     this.expectTaskIds = expectTaskIds;
     this.shuffleServerId = shuffleServerId;
+    this.offHeapEnable = offHeapEnable;
   }
 
   // Only for test
@@ -101,7 +104,7 @@ public class HdfsClientReadHandler extends 
AbstractClientReadHandler {
       Configuration hadoopConf) {
     this(appId, shuffleId, partitionId, indexReadLimit, partitionNumPerRange, 
partitionNum, readBufferSize,
         expectBlockIds, processBlockIds, storageBasePath, hadoopConf, 
ShuffleDataDistributionType.NORMAL,
-        Roaring64NavigableMap.bitmapOf(), null);
+        Roaring64NavigableMap.bitmapOf(), null, false);
   }
 
   protected void init(String fullShufflePath) {
@@ -139,7 +142,7 @@ public class HdfsClientReadHandler extends 
AbstractClientReadHandler {
           HdfsShuffleReadHandler handler = new HdfsShuffleReadHandler(
               appId, shuffleId, partitionId, filePrefix,
               readBufferSize, expectBlockIds, processBlockIds, hadoopConf,
-              distributionType, expectTaskIds);
+              distributionType, expectTaskIds, offHeapEnable);
           readHandlers.add(handler);
         } catch (Exception e) {
           LOG.warn("Can't create ShuffleReaderHandler for " + filePrefix, e);
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
index 38d6439e..1c9cb41a 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.storage.handler.impl;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -79,6 +80,35 @@ public class HdfsFileReader implements FileReader, Closeable 
{
     }
   }
 
+  @Override
+  public ByteBuffer readAsByteBuffer(long offset, int length) {
+    try {
+      fsDataInputStream.seek(offset);
+      ByteBuffer directBuffer = ByteBuffer.allocateDirect(length);
+      readFully(directBuffer);
+      directBuffer.flip(); // limit = bytes actually read, which is < length if EOF came first
+      return directBuffer;
+    } catch (Exception e) {
+      LOG.warn("Can't read buffer data for path:" + path + " with offset[" + offset + "], length[" + length + "]", e);
+      return ByteBuffer.allocateDirect(0);
+    }
+  }
+
+  @Override
+  public ByteBuffer readAsByteBuffer() {
+    try {
+      long remaining = getFileLen() - fsDataInputStream.getPos(); // bytes from current position to EOF
+      if (remaining > Integer.MAX_VALUE) {
+        LOG.warn("File " + path + " length is too long");
+        return ByteBuffer.allocateDirect(0);
+      }
+      return readAsByteBuffer(fsDataInputStream.getPos(), (int) remaining);
+    } catch (Exception e) {
+      LOG.warn("Can't read buffer data for path:" + path, e);
+      return ByteBuffer.allocateDirect(0);
+    }
+  }
+
   public long getOffset() throws IOException {
     return fsDataInputStream.getPos();
   }
@@ -90,6 +120,15 @@ public class HdfsFileReader implements FileReader, 
Closeable {
     }
   }
 
+  /** Reads from the stream into {@code buffer} until it is full or end of stream is reached. */
+  private void readFully(ByteBuffer buffer) throws IOException {
+    int bytesRead = 0;
+    // A negative count means end of stream: stop and leave the buffer partially filled.
+    while (bytesRead >= 0 && buffer.hasRemaining()) {
+      bytesRead = fsDataInputStream.read(buffer);
+    }
+  }
+
   public Path getPath() {
     return path;
   }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
index 3e749679..0be589b6 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
@@ -18,6 +18,7 @@
 package org.apache.uniffle.storage.handler.impl;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +44,7 @@ public class HdfsShuffleReadHandler extends 
DataSkippableReadHandler {
   protected final String filePrefix;
   protected final HdfsFileReader indexReader;
   protected final HdfsFileReader dataReader;
+  protected final boolean offHeapEnabled;
 
   public HdfsShuffleReadHandler(
       String appId,
@@ -54,12 +56,14 @@ public class HdfsShuffleReadHandler extends 
DataSkippableReadHandler {
       Roaring64NavigableMap processBlockIds,
       Configuration conf,
       ShuffleDataDistributionType distributionType,
-      Roaring64NavigableMap expectTaskIds) throws Exception {
+      Roaring64NavigableMap expectTaskIds,
+      boolean offHeapEnabled) throws Exception {
     super(appId, shuffleId, partitionId, readBufferSize, expectBlockIds, 
processBlockIds,
         distributionType, expectTaskIds);
     this.filePrefix = filePrefix;
     this.indexReader = 
createHdfsReader(ShuffleStorageUtils.generateIndexFileName(filePrefix), conf);
     this.dataReader = 
createHdfsReader(ShuffleStorageUtils.generateDataFileName(filePrefix), conf);
+    this.offHeapEnabled = offHeapEnabled;
   }
 
   // Only for test
@@ -73,7 +77,7 @@ public class HdfsShuffleReadHandler extends 
DataSkippableReadHandler {
       Roaring64NavigableMap processBlockIds,
       Configuration conf) throws Exception {
     this(appId, shuffleId, partitionId, filePrefix, readBufferSize, 
expectBlockIds,
-        processBlockIds, conf, ShuffleDataDistributionType.NORMAL, 
Roaring64NavigableMap.bitmapOf());
+        processBlockIds, conf, ShuffleDataDistributionType.NORMAL, 
Roaring64NavigableMap.bitmapOf(), false);
   }
 
   @Override
@@ -106,17 +110,23 @@ public class HdfsShuffleReadHandler extends 
DataSkippableReadHandler {
       return null;
     }
 
-    byte[] data = readShuffleData(shuffleDataSegment.getOffset(), 
expectedLength);
-    if (data.length == 0) {
+    ByteBuffer data;
+    if (offHeapEnabled) {
+      data = readShuffleDataByteBuffer(shuffleDataSegment.getOffset(), 
expectedLength);
+    } else {
+      data = ByteBuffer.wrap(readShuffleData(shuffleDataSegment.getOffset(), 
expectedLength));
+    }
+    int length = data.limit() - data.position();
+    if (length == 0) {
       LOG.warn("Fail to read expected[{}] data, actual[{}] and segment is {} 
from file {}.data",
-          expectedLength, data.length, shuffleDataSegment, filePrefix);
+          expectedLength, length, shuffleDataSegment, filePrefix);
       return null;
     }
 
     ShuffleDataResult shuffleDataResult = new ShuffleDataResult(data, 
shuffleDataSegment.getBufferSegments());
     if (shuffleDataResult.isEmpty()) {
       LOG.warn("Shuffle data is empty, expected length {}, data length {}, 
segment {} in file {}.data",
-          expectedLength, data.length, shuffleDataSegment, filePrefix);
+          expectedLength, length, shuffleDataSegment, filePrefix);
       return null;
     }
 
@@ -133,6 +143,17 @@ public class HdfsShuffleReadHandler extends 
DataSkippableReadHandler {
     return data;
   }
 
+  private ByteBuffer readShuffleDataByteBuffer(long offset, int expectedLength) {
+    ByteBuffer buffer = dataReader.readAsByteBuffer(offset, expectedLength);
+    if (buffer.remaining() == expectedLength) {
+      return buffer;
+    }
+    // Short or failed reads become an empty buffer; the caller treats a zero length as a miss.
+    LOG.warn("Fail to read byte buffer expected[{}] data, actual[{}] from file {}.data",
+        expectedLength, buffer.remaining(), filePrefix);
+    return ByteBuffer.allocateDirect(0);
+  }
+
   private long getDataFileLen() {
     try {
       return dataReader.getFileLen();
@@ -168,7 +189,4 @@ public class HdfsShuffleReadHandler extends 
DataSkippableReadHandler {
     return shuffleDataSegments;
   }
 
-  public String getFilePrefix() {
-    return filePrefix;
-  }
 }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java
index 99eb0cd0..cceab2e7 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
@@ -76,6 +77,16 @@ public class LocalFileReader implements FileReader, 
Closeable {
     }
   }
 
+  @Override
+  public ByteBuffer readAsByteBuffer(long offset, int length) { // always throws; see HdfsFileReader
+    throw new UnsupportedOperationException("Local file reader don't support off heap read now");
+  }
+
+  @Override
+  public ByteBuffer readAsByteBuffer() { // always throws; see HdfsFileReader
+    throw new UnsupportedOperationException("Local file reader don't support off heap read now");
+  }
+
   @Override
   public synchronized void close() {
     if (dataInputStream != null) {
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
 
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
index 0801893a..a3aaab72 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
@@ -46,6 +46,7 @@ public class CreateShuffleReadHandlerRequest {
   private ShuffleDataDistributionType distributionType;
   private Roaring64NavigableMap expectTaskIds;
   private boolean expectedTaskIdsBitmapFilterEnable;
+  private boolean offHeapEnabled;
 
   private IdHelper idHelper;
 
@@ -195,4 +196,12 @@ public class CreateShuffleReadHandlerRequest {
   public void setIdHelper(IdHelper idHelper) {
     this.idHelper = idHelper;
   }
+
+  public void enableOffHeap() {
+    offHeapEnabled = true;
+  }
+
+  public boolean isOffHeapEnabled() {
+    return this.offHeapEnabled;
+  }
 }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
 
b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
index e9abec15..d268775c 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
@@ -120,14 +120,6 @@ public class ShuffleStorageUtils {
         String.join(HDFS_DIRNAME_SEPARATOR, String.valueOf(start), 
String.valueOf(end)));
   }
 
-  public static String getUploadShuffleDataPath(String appId, int shuffleId, 
int partitionId) {
-    return String.join(
-        HDFS_PATH_SEPARATOR,
-        appId,
-        String.valueOf(shuffleId),
-        String.valueOf(partitionId));
-  }
-
   public static String getCombineDataPath(String appId, int shuffleId) {
     return String.join(
         HDFS_PATH_SEPARATOR,
@@ -193,10 +185,6 @@ public class ShuffleStorageUtils {
     }
   }
 
-  // index file header is $PartitionNum | [($PartitionId | 
$PartitionFileLength | $PartitionDataFileLength), ] | $CRC
-  public static long getIndexFileHeaderLen(int partitionNum) {
-    return 4 + (4 + 8 + 8) * (long) partitionNum + 8;
-  }
 
   public static long uploadFile(File file, HdfsFileWriter writer, int 
bufferSize) throws IOException {
     try (FileInputStream inputStream = new FileInputStream(file)) {


Reply via email to