This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new c6cde5d3 [#596] feat(netty): Use off heap memory to read HDFS data
(#806)
c6cde5d3 is described below
commit c6cde5d39dcec6321ec8bce31a16527966aa863a
Author: roryqi <[email protected]>
AuthorDate: Thu Apr 13 16:36:19 2023 +0800
[#596] feat(netty): Use off heap memory to read HDFS data (#806)
### What changes were proposed in this pull request?
1. Use off heap memory to read HDFS data
2. Remove some unused code
(TODO: use off heap memory to read HDFS index data)
### Why are the changes needed?
Fix: #596
### Does this PR introduce _any_ user-facing change?
Yes, documentation is added for the new configuration option.
### How was this patch tested?
Pass origin tests.
---
.../hadoop/mapreduce/task/reduce/RssShuffle.java | 2 +-
.../org/apache/spark/shuffle/RssSparkConfig.java | 5 ++
.../shuffle/reader/RssShuffleDataIterator.java | 30 +++++++++---
.../spark/shuffle/reader/RssShuffleReader.java | 3 +-
.../spark/shuffle/reader/RssShuffleReader.java | 3 +-
.../client/factory/ShuffleClientFactory.java | 3 +-
.../uniffle/client/impl/ShuffleReadClientImpl.java | 26 +++++++---
.../request/CreateShuffleReadClientRequest.java | 55 +++++++++++++---------
.../apache/uniffle/common/ShuffleDataResult.java | 24 ++++++++--
.../uniffle/common/config/RssClientConf.java | 6 +++
.../org/apache/uniffle/common/util/RssUtils.java | 9 ++++
.../uniffle/common/ShuffleDataResultTest.java | 3 +-
docs/client_guide.md | 1 +
.../RepartitionWithHdfsMultiStorageRssTest.java | 14 +++++-
.../org/apache/uniffle/storage/api/FileReader.java | 32 +++++++++++++
.../storage/factory/ShuffleHandlerFactory.java | 4 +-
.../handler/impl/HdfsClientReadHandler.java | 9 ++--
.../storage/handler/impl/HdfsFileReader.java | 39 +++++++++++++++
.../handler/impl/HdfsShuffleReadHandler.java | 36 ++++++++++----
.../storage/handler/impl/LocalFileReader.java | 11 +++++
.../request/CreateShuffleReadHandlerRequest.java | 9 ++++
.../uniffle/storage/util/ShuffleStorageUtils.java | 12 -----
22 files changed, 265 insertions(+), 71 deletions(-)
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
index 372a265e..d08a5b2b 100644
---
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
+++
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
@@ -195,7 +195,7 @@ public class RssShuffle<K, V> implements
ShuffleConsumerPlugin<K, V>, ExceptionR
CreateShuffleReadClientRequest request = new
CreateShuffleReadClientRequest(
appId, 0, reduceId.getTaskID().getId(), storageType, basePath,
indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
serverInfoList,
- readerJobConf, new MRIdHelper(), expectedTaskIdsBitmapFilterEnable);
+ readerJobConf, new MRIdHelper(), expectedTaskIdsBitmapFilterEnable,
false);
ShuffleReadClient shuffleReadClient =
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssFetcher fetcher = new RssFetcher(mrJobConf, reduceId, taskStatus,
merger, copyPhase, reporter, metrics,
shuffleReadClient, blockIdBitmap.getLongCardinality(),
RssMRConfig.toRssConf(rssJobConf));
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index e3c31555..20974e73 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -231,6 +231,11 @@ public class RssSparkConfig {
+ "whether this conf is set or not"))
.createWithDefault("");
+ public static final ConfigEntry<Boolean> RSS_CLIENT_OFF_HEAP_MEMORY_ENABLE =
createBooleanBuilder(
+ new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX +
RssClientConf.OFF_HEAP_MEMORY_ENABLE.key())
+ .doc(RssClientConf.OFF_HEAP_MEMORY_ENABLE.description()))
+ .createWithDefault(RssClientConf.OFF_HEAP_MEMORY_ENABLE.defaultValue());
+
public static final ConfigEntry<Integer>
RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER = createIntegerBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER))
.createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE);
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index cb24be53..c905d27c 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -41,6 +41,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.RssUtils;
public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K,
C>> {
@@ -74,9 +75,14 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
this.codec = compress ? Codec.newInstance(rssConf) : null;
}
- public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data,
int size) {
+ public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data) {
clearDeserializationStream();
- byteBufInputStream = new
ByteBufInputStream(Unpooled.wrappedBuffer(data.array(), data.position(), size),
true);
+ // Unpooled.wrappedBuffer will return a ByteBuf, but this ByteBuf won't
release direct/heap memory
+ // when the ByteBuf is released. This is because the
UnpooledDirectByteBuf's doFree is false
+ // when it is constructed from user provided ByteBuffer.
+ // The `releaseOnClose` parameter doesn't take effect, we would release
the data ByteBuffer
+ // manually.
+ byteBufInputStream = new ByteBufInputStream(Unpooled.wrappedBuffer(data),
true);
deserializationStream =
serializerInstance.deserializeStream(byteBufInputStream);
return deserializationStream.asKeyValueIterator();
}
@@ -89,6 +95,7 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
LOG.warn("Can't close ByteBufInputStream, memory may be leaked.");
}
}
+
if (deserializationStream != null) {
deserializationStream.close();
}
@@ -109,10 +116,10 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
long fetchDuration = System.currentTimeMillis() - startFetch;
shuffleReadMetrics.incFetchWaitTime(fetchDuration);
if (rawData != null) {
- int uncompressedLen = uncompress(rawBlock, rawData);
+ uncompress(rawBlock, rawData);
// create new iterator for shuffle data
long startSerialization = System.currentTimeMillis();
- recordsIterator = createKVIterator(uncompressedData, uncompressedLen);
+ recordsIterator = createKVIterator(uncompressedData);
long serializationDuration = System.currentTimeMillis() -
startSerialization;
readTime += fetchDuration;
serializeTime += serializationDuration;
@@ -139,8 +146,11 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
int uncompressedLen = rawBlock.getUncompressLength();
if (codec != null) {
if (uncompressedData == null || uncompressedData.capacity() <
uncompressedLen) {
- // todo: support off-heap bytebuffer
- uncompressedData = ByteBuffer.allocate(uncompressedLen);
+ if (uncompressedData != null) {
+ RssUtils.releaseByteBuffer(uncompressedData);
+ }
+ uncompressedData = rawData.isDirect()
+ ? ByteBuffer.allocateDirect(uncompressedLen) :
ByteBuffer.allocate(uncompressedLen);
}
uncompressedData.clear();
long startDecompress = System.currentTimeMillis();
@@ -148,6 +158,9 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
unCompressedBytesLength += uncompressedLen;
long decompressDuration = System.currentTimeMillis() - startDecompress;
decompressTime += decompressDuration;
+ // uncompressedData's limit is not updated by `codec.decompress`,
however this information is used
+ // by `createKVIterator`. Update limit here.
+ uncompressedData.limit(uncompressedData.position() + uncompressedLen);
} else {
uncompressedData = rawData;
}
@@ -162,6 +175,11 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
public BoxedUnit cleanup() {
clearDeserializationStream();
+ // Uncompressed data is released in this class; compressed data is released
in the class ShuffleReadClientImpl.
+ // So if codec is null, we don't release the data when the stream is closed
+ if (codec != null) {
+ RssUtils.releaseByteBuffer(uncompressedData);
+ }
if (shuffleReadClient != null) {
shuffleReadClient.close();
}
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 96252f44..ec976113 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -121,7 +121,8 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
CreateShuffleReadClientRequest request = new
CreateShuffleReadClientRequest(
appId, shuffleId, startPartition, storageType, basePath,
indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
- shuffleServerInfoList, hadoopConf, expectedTaskIdsBitmapFilterEnable);
+ shuffleServerInfoList, hadoopConf, expectedTaskIdsBitmapFilterEnable,
+ rssConf.getBoolean(RssClientConf.OFF_HEAP_MEMORY_ENABLE));
ShuffleReadClient shuffleReadClient =
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssShuffleDataIterator rssShuffleDataIterator = new
RssShuffleDataIterator<K, C>(
shuffleDependency.serializer(), shuffleReadClient,
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 4bd44d1b..7cba5051 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -227,7 +227,8 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
CreateShuffleReadClientRequest request = new
CreateShuffleReadClientRequest(
appId, shuffleId, partition, storageType, basePath,
indexReadLimit, readBufferSize,
1, partitionNum, partitionToExpectBlocks.get(partition),
taskIdBitmap, shuffleServerInfoList,
- hadoopConf, dataDistributionType,
expectedTaskIdsBitmapFilterEnable);
+ hadoopConf, dataDistributionType,
expectedTaskIdsBitmapFilterEnable,
+ rssConf.getBoolean(RssClientConf.OFF_HEAP_MEMORY_ENABLE));
ShuffleReadClient shuffleReadClient =
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
RssShuffleDataIterator<K, C> iterator = new RssShuffleDataIterator<>(
shuffleDependency.serializer(), shuffleReadClient,
diff --git
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index 39d9ded0..710d9a13 100644
---
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -87,7 +87,8 @@ public class ShuffleClientFactory {
request.getHadoopConf(),
request.getIdHelper(),
request.getShuffleDataDistributionType(),
- request.isExpectedTaskIdsBitmapFilterEnable()
+ request.isExpectedTaskIdsBitmapFilterEnable(),
+ request.isOffHeapEnabled()
);
}
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 852a253f..4773e30e 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -50,7 +50,7 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
private final List<ShuffleServerInfo> shuffleServerInfoList;
private int shuffleId;
private int partitionId;
- private byte[] readBuffer;
+ private ByteBuffer readBuffer;
private Roaring64NavigableMap blockIdBitmap;
private Roaring64NavigableMap taskIdBitmap;
private Roaring64NavigableMap pendingBlockIds;
@@ -78,7 +78,8 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
Configuration hadoopConf,
IdHelper idHelper,
ShuffleDataDistributionType dataDistributionType,
- boolean expectedTaskIdsBitmapFilterEnable) {
+ boolean expectedTaskIdsBitmapFilterEnable,
+ boolean offHeapEnabled) {
this.shuffleId = shuffleId;
this.partitionId = partitionId;
this.blockIdBitmap = blockIdBitmap;
@@ -106,6 +107,9 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
if (expectedTaskIdsBitmapFilterEnable) {
request.useExpectedTaskIdsBitmapFilter();
}
+ if (offHeapEnabled) {
+ request.enableOffHeap();
+ }
List<Long> removeBlockIds = Lists.newArrayList();
blockIdBitmap.forEach(bid -> {
@@ -142,7 +146,7 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
this(storageType, appId, shuffleId, partitionId, indexReadLimit,
partitionNumPerRange, partitionNum, readBufferSize, storageBasePath,
blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf,
- idHelper, ShuffleDataDistributionType.NORMAL, false);
+ idHelper, ShuffleDataDistributionType.NORMAL, false, false);
}
@Override
@@ -219,8 +223,10 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
}
if (bs != null) {
- return new CompressedShuffleBlock(ByteBuffer.wrap(readBuffer,
- bs.getOffset(), bs.getLength()), bs.getUncompressLength());
+ ByteBuffer compressedBuffer = readBuffer.duplicate();
+ compressedBuffer.position(bs.getOffset());
+ compressedBuffer.limit(bs.getOffset() + bs.getLength());
+ return new CompressedShuffleBlock(compressedBuffer,
bs.getUncompressLength());
}
// current segment hasn't data, try next segment
return readShuffleBlockData();
@@ -238,8 +244,11 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
if (sdr == null) {
return 0;
}
- readBuffer = sdr.getData();
- if (readBuffer == null || readBuffer.length == 0) {
+ if (readBuffer != null) {
+ RssUtils.releaseByteBuffer(readBuffer);
+ }
+ readBuffer = sdr.getDataBuffer();
+ if (readBuffer == null || readBuffer.capacity() == 0) {
return 0;
}
bufferSegmentQueue.addAll(sdr.getBufferSegments());
@@ -253,6 +262,9 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
@Override
public void close() {
+ if (readBuffer != null) {
+ RssUtils.releaseByteBuffer(readBuffer);
+ }
if (clientReadHandler != null) {
clientReadHandler.close();
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
index a4b4a325..a6f676fe 100644
---
a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
+++
b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
@@ -45,6 +45,7 @@ public class CreateShuffleReadClientRequest {
private IdHelper idHelper;
private ShuffleDataDistributionType shuffleDataDistributionType =
ShuffleDataDistributionType.NORMAL;
private boolean expectedTaskIdsBitmapFilterEnable = false;
+ private boolean offHeapEnabled = false;
public CreateShuffleReadClientRequest(
String appId,
@@ -61,33 +62,14 @@ public class CreateShuffleReadClientRequest {
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
ShuffleDataDistributionType dataDistributionType,
- boolean expectedTaskIdsBitmapFilterEnable) {
+ boolean expectedTaskIdsBitmapFilterEnable,
+ boolean offHeapEnabled) {
this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit,
readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
shuffleServerInfoList,
- hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable);
+ hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable,
offHeapEnabled);
this.shuffleDataDistributionType = dataDistributionType;
}
- public CreateShuffleReadClientRequest(
- String appId,
- int shuffleId,
- int partitionId,
- String storageType,
- String basePath,
- int indexReadLimit,
- int readBufferSize,
- int partitionNumPerRange,
- int partitionNum,
- Roaring64NavigableMap blockIdBitmap,
- Roaring64NavigableMap taskIdBitmap,
- List<ShuffleServerInfo> shuffleServerInfoList,
- Configuration hadoopConf,
- boolean expectedTaskIdsBitmapFilterEnable) {
- this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit,
readBufferSize,
- partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
shuffleServerInfoList,
- hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable);
- }
-
public CreateShuffleReadClientRequest(
String appId,
int shuffleId,
@@ -103,7 +85,8 @@ public class CreateShuffleReadClientRequest {
List<ShuffleServerInfo> shuffleServerInfoList,
Configuration hadoopConf,
IdHelper idHelper,
- boolean expectedTaskIdsBitmapFilterEnable) {
+ boolean expectedTaskIdsBitmapFilterEnable,
+ boolean offHeapEnabled) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
@@ -119,6 +102,28 @@ public class CreateShuffleReadClientRequest {
this.hadoopConf = hadoopConf;
this.idHelper = idHelper;
this.expectedTaskIdsBitmapFilterEnable = expectedTaskIdsBitmapFilterEnable;
+ this.offHeapEnabled = offHeapEnabled;
+ }
+
+ public CreateShuffleReadClientRequest(
+ String appId,
+ int shuffleId,
+ int partitionId,
+ String storageType,
+ String basePath,
+ int indexReadLimit,
+ int readBufferSize,
+ int partitionNumPerRange,
+ int partitionNum,
+ Roaring64NavigableMap blockIdBitmap,
+ Roaring64NavigableMap taskIdBitmap,
+ List<ShuffleServerInfo> shuffleServerInfoList,
+ Configuration hadoopConf,
+ boolean expectedTaskIdsBitmapFilterEnable,
+ boolean offHeapEnabled) {
+ this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit,
readBufferSize,
+ partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap,
shuffleServerInfoList,
+ hadoopConf, new DefaultIdHelper(), expectedTaskIdsBitmapFilterEnable,
offHeapEnabled);
}
public String getAppId() {
@@ -184,4 +189,8 @@ public class CreateShuffleReadClientRequest {
public boolean isExpectedTaskIdsBitmapFilterEnable() {
return expectedTaskIdsBitmapFilterEnable;
}
+
+ public boolean isOffHeapEnabled() {
+ return offHeapEnabled;
+ }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
b/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
index 1e8109e4..19e93dd1 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
@@ -17,13 +17,14 @@
package org.apache.uniffle.common;
+import java.nio.ByteBuffer;
import java.util.List;
import com.google.common.collect.Lists;
public class ShuffleDataResult {
- private final byte[] data;
+ private final ByteBuffer data;
private final List<BufferSegment> bufferSegments;
public ShuffleDataResult() {
@@ -34,12 +35,29 @@ public class ShuffleDataResult {
this(data, Lists.newArrayList());
}
- public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
+ public ShuffleDataResult(ByteBuffer data, List<BufferSegment>
bufferSegments) {
this.data = data;
this.bufferSegments = bufferSegments;
}
+ public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
+ this(data != null ? ByteBuffer.wrap(data) : null, bufferSegments);
+ }
+
public byte[] getData() {
+ if (data == null) {
+ return null;
+ }
+ if (data.hasArray()) {
+ return data.array();
+ }
+ ByteBuffer dataBuffer = data.duplicate();
+ byte[] byteArray = new byte[dataBuffer.remaining()];
+ dataBuffer.get(byteArray);
+ return byteArray;
+ }
+
+ public ByteBuffer getDataBuffer() {
return data;
}
@@ -48,7 +66,7 @@ public class ShuffleDataResult {
}
public boolean isEmpty() {
- return bufferSegments == null || bufferSegments.isEmpty() || data == null
|| data.length == 0;
+ return bufferSegments == null || bufferSegments.isEmpty() || data == null
|| data.capacity() == 0;
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 33d20ddf..35d8172c 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -102,4 +102,10 @@ public class RssClientConf {
.intType()
.noDefaultValue()
.withDescription("internal configuration to indicate which port is
actually bind for shuffle manager service.");
+
+ public static final ConfigOption<Boolean> OFF_HEAP_MEMORY_ENABLE =
ConfigOptions
+ .key("rss.client.off.heap.memory.enable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Client can use off heap memory");
}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 92688083..3935ab2f 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -31,6 +31,7 @@ import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InterfaceAddress;
import java.net.NetworkInterface;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Enumeration;
@@ -47,6 +48,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.net.InetAddresses;
import io.netty.channel.unix.Errors;
+import io.netty.util.internal.PlatformDependent;
import org.eclipse.jetty.util.MultiException;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
@@ -367,4 +369,11 @@ public class RssUtils {
return conf.get(RssBaseConf.RSS_STORAGE_BASE_PATH);
}
}
+
+ public static void releaseByteBuffer(ByteBuffer byteBuffer) {
+ if (byteBuffer == null || !byteBuffer.isDirect()) {
+ return;
+ }
+ PlatformDependent.freeDirectBuffer(byteBuffer);
+ }
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/ShuffleDataResultTest.java
b/common/src/test/java/org/apache/uniffle/common/ShuffleDataResultTest.java
index 526a4e84..5affe92a 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShuffleDataResultTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShuffleDataResultTest.java
@@ -30,9 +30,10 @@ public class ShuffleDataResultTest {
@Test
public void testEmpty() {
List<BufferSegment> segments = Collections.singletonList(new
BufferSegment(1, 2, 3, 4, 5, 6));
+ byte[] bytes = null;
assertTrue(new ShuffleDataResult().isEmpty());
assertTrue(new ShuffleDataResult(new byte[1]).isEmpty());
- assertTrue(new ShuffleDataResult(null, segments).isEmpty());
+ assertTrue(new ShuffleDataResult(bytes, segments).isEmpty());
assertTrue(new ShuffleDataResult(new byte[0], segments).isEmpty());
assertTrue(new ShuffleDataResult(new byte[1], null).isEmpty());
assertTrue(new ShuffleDataResult(new byte[1],
Collections.emptyList()).isEmpty());
diff --git a/docs/client_guide.md b/docs/client_guide.md
index 413a5334..43bbceb6 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -191,6 +191,7 @@ The important configuration is listed as following.
|spark.rss.client.send.size.limit|16m|The max data size sent to shuffle server|
|spark.rss.client.unregister.thread.pool.size|10|The max size of thread pool
of unregistering|
|spark.rss.client.unregister.request.timeout.sec|10|The max timeout sec when
doing unregister to remote shuffle-servers|
+|spark.rss.client.off.heap.memory.enable|false|The client uses off-heap memory
to process data|
### MapReduce Specialized Setting
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java
index 43b4a7c0..9bbc0e28 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java
@@ -20,24 +20,36 @@ package org.apache.uniffle.test;
import java.io.File;
import java.util.Arrays;
import java.util.Map;
+import java.util.Random;
import com.google.common.collect.Maps;
import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.RssSparkConfig;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
public class RepartitionWithHdfsMultiStorageRssTest extends RepartitionTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RepartitionWithHdfsMultiStorageRssTest.class);
+
@BeforeAll
public static void setupServers(@TempDir File tmpDir) throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE_HDFS.name());
+ Random random = new Random();
+ // todo: we should use parameterized test to modify here when we could
solve the issue that
+ // the test case use too long time.
+ boolean useOffHeap = random.nextInt() % 2 == 0;
+ LOG.info("use off heap: " + useOffHeap);
+ dynamicConf.put(RssSparkConfig.RSS_CLIENT_OFF_HEAP_MEMORY_ENABLE.key(),
String.valueOf(useOffHeap));
+ CoordinatorConf coordinatorConf = getCoordinatorConf();
addDynamicConf(coordinatorConf, dynamicConf);
createCoordinatorServer(coordinatorConf);
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/api/FileReader.java
b/storage/src/main/java/org/apache/uniffle/storage/api/FileReader.java
index b3be22bb..18341e25 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/api/FileReader.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/api/FileReader.java
@@ -17,9 +17,41 @@
package org.apache.uniffle.storage.api;
+import java.nio.ByteBuffer;
+
public interface FileReader {
+ /**
+ * This method will return a byte array, will read
+ * the length of this file data from offset position.
+ *
+ * @param offset the file offset which we start to read
+ * @param length the data length which we need to read
+ * @return file data
+ */
byte[] read(long offset, int length);
+ /**
+ * This method will return a byte array, will read
+ * the data from current position to the end of file
+ * @return file data
+ */
byte[] read();
+
+ /**
+ * This method will return a direct byte buffer, will read
+ * the length of this file data from offset position.
+ *
+ * @param offset the file offset which we start to read
+ * @param length the data length which we need to read
+ * @return file data
+ */
+ ByteBuffer readAsByteBuffer(long offset, int length);
+
+ /**
+ * This method will return a direct byte buffer, will read
+ * the data from current position to the end of file
+ * @return file data
+ */
+ ByteBuffer readAsByteBuffer();
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index 08dd548c..efcc8fd6 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -163,8 +163,8 @@ public class ShuffleHandlerFactory {
request.getHadoopConf(),
request.getDistributionType(),
request.getExpectTaskIds(),
- ssi.getId()
- );
+ ssi.getId(),
+ request.isOffHeapEnabled());
}
public ShuffleDeleteHandler
createShuffleDeleteHandler(CreateShuffleDeleteHandlerRequest request) {
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
index a8ac6425..003d03e4 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
@@ -54,6 +54,7 @@ public class HdfsClientReadHandler extends
AbstractClientReadHandler {
private int readHandlerIndex;
private ShuffleDataDistributionType distributionType;
private Roaring64NavigableMap expectTaskIds;
+ private boolean offHeapEnable = false;
public HdfsClientReadHandler(
String appId,
@@ -69,7 +70,8 @@ public class HdfsClientReadHandler extends
AbstractClientReadHandler {
Configuration hadoopConf,
ShuffleDataDistributionType distributionType,
Roaring64NavigableMap expectTaskIds,
- String shuffleServerId) {
+ String shuffleServerId,
+ boolean offHeapEnable) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
@@ -84,6 +86,7 @@ public class HdfsClientReadHandler extends
AbstractClientReadHandler {
this.distributionType = distributionType;
this.expectTaskIds = expectTaskIds;
this.shuffleServerId = shuffleServerId;
+ this.offHeapEnable = offHeapEnable;
}
// Only for test
@@ -101,7 +104,7 @@ public class HdfsClientReadHandler extends
AbstractClientReadHandler {
Configuration hadoopConf) {
this(appId, shuffleId, partitionId, indexReadLimit, partitionNumPerRange,
partitionNum, readBufferSize,
expectBlockIds, processBlockIds, storageBasePath, hadoopConf,
ShuffleDataDistributionType.NORMAL,
- Roaring64NavigableMap.bitmapOf(), null);
+ Roaring64NavigableMap.bitmapOf(), null, false);
}
protected void init(String fullShufflePath) {
@@ -139,7 +142,7 @@ public class HdfsClientReadHandler extends
AbstractClientReadHandler {
HdfsShuffleReadHandler handler = new HdfsShuffleReadHandler(
appId, shuffleId, partitionId, filePrefix,
readBufferSize, expectBlockIds, processBlockIds, hadoopConf,
- distributionType, expectTaskIds);
+ distributionType, expectTaskIds, offHeapEnable);
readHandlers.add(handler);
} catch (Exception e) {
LOG.warn("Can't create ShuffleReaderHandler for " + filePrefix, e);
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
index 38d6439e..1c9cb41a 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.storage.handler.impl;
import java.io.Closeable;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
@@ -79,6 +80,35 @@ public class HdfsFileReader implements FileReader, Closeable
{
}
}
+ @Override
+ public ByteBuffer readAsByteBuffer(long offset, int length) {
+ try {
+ fsDataInputStream.seek(offset);
+ ByteBuffer buffer = ByteBuffer.allocateDirect(length);
+ readFully(buffer);
+ buffer.flip();
+ return buffer;
+ } catch (Exception e) {
+ LOG.warn("Can't read buffer data for path:" + path + " with offset[" +
offset + "], length[" + length + "]", e);
+ return ByteBuffer.allocateDirect(0);
+ }
+ }
+
+ @Override
+ public ByteBuffer readAsByteBuffer() {
+ try {
+ long length = getFileLen();
+ if (length - fsDataInputStream.getPos() > Integer.MAX_VALUE) {
+ LOG.warn("File " + path + " length is too long");
+ return ByteBuffer.allocateDirect(0);
+ }
+ // NOTE(review): the guard above checks (length - getPos()), so the read size
+ // should be the remaining bytes, not the whole file length — passing
+ // (int) length over-allocates and requests bytes past EOF when getPos() > 0.
+ return readAsByteBuffer(fsDataInputStream.getPos(), (int) (length - fsDataInputStream.getPos()));
+ } catch (Exception e) {
+ LOG.warn("Can't read buffer data for path:" + path, e);
+ return ByteBuffer.allocateDirect(0);
+ }
+ }
+
public long getOffset() throws IOException {
return fsDataInputStream.getPos();
}
@@ -90,6 +120,15 @@ public class HdfsFileReader implements FileReader,
Closeable {
}
}
+ private void readFully(ByteBuffer buffer) throws IOException {
+ while (buffer.hasRemaining()) {
+ int result = fsDataInputStream.read(buffer);
+ if (result < 0) {
+ return;
+ }
+ }
+ }
+
public Path getPath() {
return path;
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
index 3e749679..0be589b6 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.storage.handler.impl;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -43,6 +44,7 @@ public class HdfsShuffleReadHandler extends
DataSkippableReadHandler {
protected final String filePrefix;
protected final HdfsFileReader indexReader;
protected final HdfsFileReader dataReader;
+ protected final boolean offHeapEnabled;
public HdfsShuffleReadHandler(
String appId,
@@ -54,12 +56,14 @@ public class HdfsShuffleReadHandler extends
DataSkippableReadHandler {
Roaring64NavigableMap processBlockIds,
Configuration conf,
ShuffleDataDistributionType distributionType,
- Roaring64NavigableMap expectTaskIds) throws Exception {
+ Roaring64NavigableMap expectTaskIds,
+ boolean offHeapEnabled) throws Exception {
super(appId, shuffleId, partitionId, readBufferSize, expectBlockIds,
processBlockIds,
distributionType, expectTaskIds);
this.filePrefix = filePrefix;
this.indexReader =
createHdfsReader(ShuffleStorageUtils.generateIndexFileName(filePrefix), conf);
this.dataReader =
createHdfsReader(ShuffleStorageUtils.generateDataFileName(filePrefix), conf);
+ this.offHeapEnabled = offHeapEnabled;
}
// Only for test
@@ -73,7 +77,7 @@ public class HdfsShuffleReadHandler extends
DataSkippableReadHandler {
Roaring64NavigableMap processBlockIds,
Configuration conf) throws Exception {
this(appId, shuffleId, partitionId, filePrefix, readBufferSize,
expectBlockIds,
- processBlockIds, conf, ShuffleDataDistributionType.NORMAL,
Roaring64NavigableMap.bitmapOf());
+ processBlockIds, conf, ShuffleDataDistributionType.NORMAL,
Roaring64NavigableMap.bitmapOf(), false);
}
@Override
@@ -106,17 +110,23 @@ public class HdfsShuffleReadHandler extends
DataSkippableReadHandler {
return null;
}
- byte[] data = readShuffleData(shuffleDataSegment.getOffset(),
expectedLength);
- if (data.length == 0) {
+ ByteBuffer data;
+ if (offHeapEnabled) {
+ data = readShuffleDataByteBuffer(shuffleDataSegment.getOffset(),
expectedLength);
+ } else {
+ data = ByteBuffer.wrap(readShuffleData(shuffleDataSegment.getOffset(),
expectedLength));
+ }
+ int length = data.limit() - data.position();
+ if (length == 0) {
LOG.warn("Fail to read expected[{}] data, actual[{}] and segment is {}
from file {}.data",
- expectedLength, data.length, shuffleDataSegment, filePrefix);
+ expectedLength, length, shuffleDataSegment, filePrefix);
return null;
}
ShuffleDataResult shuffleDataResult = new ShuffleDataResult(data,
shuffleDataSegment.getBufferSegments());
if (shuffleDataResult.isEmpty()) {
LOG.warn("Shuffle data is empty, expected length {}, data length {},
segment {} in file {}.data",
- expectedLength, data.length, shuffleDataSegment, filePrefix);
+ expectedLength, length, shuffleDataSegment, filePrefix);
return null;
}
@@ -133,6 +143,17 @@ public class HdfsShuffleReadHandler extends
DataSkippableReadHandler {
return data;
}
+ private ByteBuffer readShuffleDataByteBuffer(long offset, int
expectedLength) {
+ ByteBuffer data = dataReader.readAsByteBuffer(offset, expectedLength);
+ int length = data.limit() - data.position();
+ if (length != expectedLength) {
+ LOG.warn("Fail to read byte buffer expected[{}] data, actual[{}] from
file {}.data",
+ expectedLength, length, filePrefix);
+ return ByteBuffer.allocateDirect(0);
+ }
+ return data;
+ }
+
private long getDataFileLen() {
try {
return dataReader.getFileLen();
@@ -168,7 +189,4 @@ public class HdfsShuffleReadHandler extends
DataSkippableReadHandler {
return shuffleDataSegments;
}
- public String getFilePrefix() {
- return filePrefix;
- }
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java
index 99eb0cd0..cceab2e7 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileReader.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
@@ -76,6 +77,16 @@ public class LocalFileReader implements FileReader,
Closeable {
}
}
+ @Override
+ public ByteBuffer readAsByteBuffer(long offset, int length) {
+ throw new UnsupportedOperationException("Local file reader doesn't support
off heap read now");
+ }
+
+ @Override
+ public ByteBuffer readAsByteBuffer() {
+ throw new UnsupportedOperationException("Local file reader doesn't support
off heap read now");
+ }
+
@Override
public synchronized void close() {
if (dataInputStream != null) {
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
index 0801893a..a3aaab72 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
@@ -46,6 +46,7 @@ public class CreateShuffleReadHandlerRequest {
private ShuffleDataDistributionType distributionType;
private Roaring64NavigableMap expectTaskIds;
private boolean expectedTaskIdsBitmapFilterEnable;
+ private boolean offHeapEnabled;
private IdHelper idHelper;
@@ -195,4 +196,12 @@ public class CreateShuffleReadHandlerRequest {
public void setIdHelper(IdHelper idHelper) {
this.idHelper = idHelper;
}
+
+ public void enableOffHeap() {
+ this.offHeapEnabled = true;
+ }
+
+ public boolean isOffHeapEnabled() {
+ return offHeapEnabled;
+ }
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
index e9abec15..d268775c 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
@@ -120,14 +120,6 @@ public class ShuffleStorageUtils {
String.join(HDFS_DIRNAME_SEPARATOR, String.valueOf(start),
String.valueOf(end)));
}
- public static String getUploadShuffleDataPath(String appId, int shuffleId,
int partitionId) {
- return String.join(
- HDFS_PATH_SEPARATOR,
- appId,
- String.valueOf(shuffleId),
- String.valueOf(partitionId));
- }
-
public static String getCombineDataPath(String appId, int shuffleId) {
return String.join(
HDFS_PATH_SEPARATOR,
@@ -193,10 +185,6 @@ public class ShuffleStorageUtils {
}
}
- // index file header is $PartitionNum | [($PartitionId |
$PartitionFileLength | $PartitionDataFileLength), ] | $CRC
- public static long getIndexFileHeaderLen(int partitionNum) {
- return 4 + (4 + 8 + 8) * (long) partitionNum + 8;
- }
public static long uploadFile(File file, HdfsFileWriter writer, int
bufferSize) throws IOException {
try (FileInputStream inputStream = new FileInputStream(file)) {