This is an automated email from the ASF dual-hosted git repository.
xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9a1bc07e [#642]feat(server): better default options for shuffle server
(#662)
9a1bc07e is described below
commit 9a1bc07ee48aec7b92d6fd9175e90d65e0e8310e
Author: advancedxy <[email protected]>
AuthorDate: Mon Feb 27 21:42:46 2023 +0800
[#642]feat(server): better default options for shuffle server (#662)
### What changes were proposed in this pull request?
1. `rss.server.buffer.capacity` uses JVM heap size * ratio(0.6) by default
2. `rss.server.read.buffer.capacity` uses JVM heap size * ratio(0.2) by
default
3. `rss.server.disk.capacity` uses disk space * ratio(0.9) by default
### Why are the changes needed?
Fix: #642
### Does this PR introduce _any_ user-facing change?
Yes. Three new configurations are introduced; users can specify ratio values
for buffer, read buffer, and disk capacity
### How was this patch tested?
New UTs.
---
docs/server_guide.md | 47 ++++++++++++----------
.../apache/uniffle/server/ShuffleServerConf.java | 25 +++++++++++-
.../server/buffer/ShuffleBufferManager.java | 13 ++++++
.../server/storage/LocalStorageManager.java | 2 +
.../server/buffer/ShuffleBufferManagerTest.java | 18 +++++++++
.../uniffle/storage/common/LocalStorage.java | 13 ++++--
.../uniffle/storage/common/LocalStorageTest.java | 13 ++++++
7 files changed, 104 insertions(+), 27 deletions(-)
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 0f2cc017..fe20f342 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -64,28 +64,31 @@ This document will introduce how to deploy Uniffle shuffle
servers.
```
## Configuration
-|Property Name|Default|Description|
-|---|---|---|
-|rss.coordinator.quorum|-|Coordinator quorum|
-|rss.rpc.server.port|-|RPC port for Shuffle server|
-|rss.jetty.http.port|-|Http port for Shuffle server|
-|rss.server.buffer.capacity|-|Max memory of buffer manager for shuffle server|
-|rss.server.memory.shuffle.highWaterMark.percentage|75.0|Threshold of spill
data to storage, percentage of rss.server.buffer.capacity|
-|rss.server.memory.shuffle.lowWaterMark.percentage|25.0|Threshold of keep data
in memory, percentage of rss.server.buffer.capacity|
-|rss.server.read.buffer.capacity|-|Max size of buffer for reading data|
-|rss.server.heartbeat.interval|10000|Heartbeat interval to Coordinator (ms)|
-|rss.server.flush.threadPool.size|10|Thread pool for flush data to file|
-|rss.server.commit.timeout|600000|Timeout when commit shuffle data (ms)|
-|rss.storage.type|-|Supports MEMORY_LOCALFILE, MEMORY_HDFS,
MEMORY_LOCALFILE_HDFS|
-|rss.server.flush.cold.storage.threshold.size|64M| The threshold of data size
for LOACALFILE and HDFS if MEMORY_LOCALFILE_HDFS is used|
-|rss.server.tags|-|The comma-separated list of tags to indicate the shuffle
server's attributes. It will be used as the assignment basis for the
coordinator|
-|rss.server.single.buffer.flush.enabled|false|Whether single buffer flush when
size exceeded rss.server.single.buffer.flush.threshold|
-|rss.server.single.buffer.flush.threshold|64M|The threshold of single shuffle
buffer flush|
-|rss.server.disk.capacity|-1|Disk capacity that shuffle server can use. If
it's negative, it will use the default disk whole space|
-|rss.server.multistorage.fallback.strategy.class|-|The fallback strategy for
`MEMORY_LOCALFILE_HDFS`. Support
`org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy`
and `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy`. If
not set, `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy`
will be used.|
-|rss.server.leak.shuffledata.check.interval|3600000|The interval of leak
shuffle data check (ms)|
-|rss.server.max.concurrency.of.single.partition.writer|1|The max concurrency
of single partition writer, the data partition file number is equal to this
value. Default value is 1. This config could improve the writing speed,
especially for huge partition.|
-|rss.metrics.reporter.class|-|The class of metrics reporter.|
+| Property Name | Default |
Description
|
+|-------------------------------------------------------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| rss.coordinator.quorum | - |
Coordinator quorum
|
+| rss.rpc.server.port | - | RPC port
for Shuffle server
|
+| rss.jetty.http.port | - | Http port
for Shuffle server
|
+| rss.server.buffer.capacity | -1 | Max memory
of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio
is used
|
+| rss.server.buffer.capacity.ratio | 0.6 | when
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size *
ratio
|
+| rss.server.memory.shuffle.highWaterMark.percentage | 75.0 | Threshold
of spill data to storage, percentage of rss.server.buffer.capacity
|
+| rss.server.memory.shuffle.lowWaterMark.percentage | 25.0 | Threshold
of keep data in memory, percentage of rss.server.buffer.capacity
|
+| rss.server.read.buffer.capacity | -1 | Max size
of buffer for reading data. If negative, JVM heap size * read.buffer.ratio is
used
|
+| rss.server.read.buffer.capacity.ratio | 0.2 | when
`rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap
size * ratio
|
+| rss.server.heartbeat.interval | 10000 | Heartbeat
interval to Coordinator (ms)
|
+| rss.server.flush.threadPool.size | 10 | Thread
pool for flush data to file
|
+| rss.server.commit.timeout | 600000 | Timeout
when commit shuffle data (ms)
|
+| rss.storage.type | - | Supports
MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS
|
+| rss.server.flush.cold.storage.threshold.size | 64M | The
threshold of data size for LOCALFILE and HDFS if MEMORY_LOCALFILE_HDFS is used
|
+| rss.server.tags | - | The
comma-separated list of tags to indicate the shuffle server's attributes. It
will be used as the assignment basis for the coordinator
|
+| rss.server.single.buffer.flush.enabled | false | Whether
single buffer flush when size exceeded rss.server.single.buffer.flush.threshold
|
+| rss.server.single.buffer.flush.threshold | 64M | The
threshold of single shuffle buffer flush
|
+| rss.server.disk.capacity | -1 | Disk
capacity that shuffle server can use. If negative, it will use disk whole space
* ratio
|
+| rss.server.disk.capacity.ratio | 0.9 | When
`rss.server.disk.capacity` is negative, disk whole space * ratio is used
|
+| rss.server.multistorage.fallback.strategy.class | - | The
fallback strategy for `MEMORY_LOCALFILE_HDFS`. Support
`org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy`
and `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy`. If
not set, `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy`
will be used. |
+| rss.server.leak.shuffledata.check.interval | 3600000 | The
interval of leak shuffle data check (ms)
|
+| rss.server.max.concurrency.of.single.partition.writer | 1 | The max
concurrency of single partition writer, the data partition file number is equal
to this value. Default value is 1. This config could improve the writing speed,
especially for huge partition.
|
+| rss.metrics.reporter.class | - | The class
of metrics reporter.
|
### Advanced Configurations
|Property Name|Default| Description
|
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 6d501f18..5cf52049 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -35,15 +35,29 @@ public class ShuffleServerConf extends RssBaseConf {
public static final ConfigOption<Long> SERVER_BUFFER_CAPACITY = ConfigOptions
.key("rss.server.buffer.capacity")
.longType()
- .noDefaultValue()
+ .defaultValue(-1L)
.withDescription("Max memory of buffer manager for shuffle server");
+ public static final ConfigOption<Double> SERVER_BUFFER_CAPACITY_RATIO =
ConfigOptions
+ .key("rss.server.buffer.capacity.ratio")
+ .doubleType()
+ .defaultValue(0.6)
+ .withDescription("JVM heap size * ratio for the maximum memory of
buffer manager for shuffle server, this "
+ + "is only effective when `rss.server.buffer.capacity` is not
explicitly set");
+
public static final ConfigOption<Long> SERVER_READ_BUFFER_CAPACITY =
ConfigOptions
.key("rss.server.read.buffer.capacity")
.longType()
- .defaultValue(10000L)
+ .defaultValue(-1L)
.withDescription("Max size of buffer for reading data");
+ public static final ConfigOption<Double> SERVER_READ_BUFFER_CAPACITY_RATIO =
ConfigOptions
+ .key("rss.server.read.buffer.capacity.ratio")
+ .doubleType()
+ .defaultValue(0.2)
+ .withDescription("JVM heap size * ratio for read buffer size, this
is only effective when "
+ + "`rss.server.read.buffer.capacity` is not explicitly
set");
+
public static final ConfigOption<Long> SERVER_HEARTBEAT_DELAY = ConfigOptions
.key("rss.server.heartbeat.delay")
.longType()
@@ -181,6 +195,13 @@ public class ShuffleServerConf extends RssBaseConf {
.withDescription("Disk capacity that shuffle server can use. "
+ "If it's negative, it will use the default whole space");
+ public static final ConfigOption<Double> DISK_CAPACITY_RATIO = ConfigOptions
+ .key("rss.server.disk.capacity.ratio")
+ .doubleType()
+ .defaultValue(0.9)
+ .withDescription("The maximum ratio of disk that could be used as
shuffle server. This is only effective "
+ + "when `rss.server.disk.capacity` is not explicitly set");
+
public static final ConfigOption<Long> SHUFFLE_EXPIRED_TIMEOUT_MS =
ConfigOptions
.key("rss.server.shuffle.expired.timeout.ms")
.longType()
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 4b4fc22a..497b3430 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -77,8 +77,16 @@ public class ShuffleBufferManager {
protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap =
Maps.newConcurrentMap();
public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager
shuffleFlushManager) {
+ long heapSize = Runtime.getRuntime().maxMemory();
this.capacity =
conf.getSizeAsBytes(ShuffleServerConf.SERVER_BUFFER_CAPACITY);
+ if (this.capacity < 0) {
+ this.capacity = (long) (heapSize *
conf.getDouble(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO));
+ }
this.readCapacity =
conf.getSizeAsBytes(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY);
+ if (this.readCapacity < 0) {
+ this.readCapacity = (long) (heapSize *
conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO));
+ }
+ LOG.info("Init shuffle buffer manager with capacity: {}, read buffer
capacity: {}.", capacity, readCapacity);
this.shuffleFlushManager = shuffleFlushManager;
this.bufferPool = new ConcurrentHashMap<>();
this.retryNum =
conf.getInteger(ShuffleServerConf.SERVER_MEMORY_REQUEST_RETRY_MAX);
@@ -437,6 +445,11 @@ public class ShuffleBufferManager {
return capacity;
}
+ @VisibleForTesting
+ public long getReadCapacity() {
+ return readCapacity;
+ }
+
@VisibleForTesting
public void resetSize() {
usedMemory = new AtomicLong(0L);
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 6ea3ef97..53f1a7ef 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -92,6 +92,7 @@ public class LocalStorageManager extends SingleStorageManager
{
this.partitionsOfStorage = Maps.newConcurrentMap();
long shuffleExpiredTimeoutMs =
conf.get(ShuffleServerConf.SHUFFLE_EXPIRED_TIMEOUT_MS);
long capacity = conf.getSizeAsBytes(ShuffleServerConf.DISK_CAPACITY);
+ double ratio = conf.getDouble(ShuffleServerConf.DISK_CAPACITY_RATIO);
double highWaterMarkOfWrite =
conf.get(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE);
double lowWaterMarkOfWrite =
conf.get(ShuffleServerConf.LOW_WATER_MARK_OF_WRITE);
if (highWaterMarkOfWrite < lowWaterMarkOfWrite) {
@@ -117,6 +118,7 @@ public class LocalStorageManager extends
SingleStorageManager {
localStorageArray[idx] = LocalStorage.newBuilder()
.basePath(storagePath)
.capacity(capacity)
+ .ratio(ratio)
.lowWaterMarkOfWrite(lowWaterMarkOfWrite)
.highWaterMarkOfWrite(highWaterMarkOfWrite)
.shuffleExpiredTimeoutMs(shuffleExpiredTimeoutMs)
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 670e684e..a5faffa6 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -629,4 +629,22 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
// `shuffleBufferManager.getUsedMemory()` and
`shuffleBufferManager.getInFlushSize()`.
Awaitility.await().atMost(Duration.ofSeconds(5)).until(() ->
shuffleBufferManager.getUsedMemory() == 0);
}
+
+ @Test
+ public void bufferManagerInitTest() {
+ ShuffleServerConf serverConf = new ShuffleServerConf();
+ shuffleBufferManager = new ShuffleBufferManager(serverConf,
mockShuffleFlushManager);
+ double ratio =
ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO.defaultValue();
+ double readRatio =
ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO.defaultValue();
+ assertEquals((long) (Runtime.getRuntime().maxMemory() * ratio),
shuffleBufferManager.getCapacity());
+ assertEquals((long) (Runtime.getRuntime().maxMemory() * readRatio),
shuffleBufferManager.getReadCapacity());
+ ratio = 0.6;
+ readRatio = 0.1;
+ serverConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO, ratio);
+ serverConf.set(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO,
readRatio);
+ shuffleBufferManager = new ShuffleBufferManager(serverConf,
mockShuffleFlushManager);
+ assertEquals((long) (Runtime.getRuntime().maxMemory() * ratio),
shuffleBufferManager.getCapacity());
+ assertEquals((long) (Runtime.getRuntime().maxMemory() * readRatio),
shuffleBufferManager.getReadCapacity());
+
+ }
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index 1def5bfc..44c08082 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -87,9 +87,10 @@ public class LocalStorage extends AbstractStorage {
throw new RuntimeException(ioe);
}
if (capacity < 0L) {
- this.capacity = baseFolder.getTotalSpace();
- LOG.info("Make the disk capacity the total space when
\"rss.server.disk.capacity\" is not specified "
- + "or less than 0");
+ long totalSpace = baseFolder.getTotalSpace();
+ this.capacity = (long) (totalSpace * builder.ratio);
+ LOG.info("The `rss.server.disk.capacity` is not specified or is negative,
the "
+ + "ratio(`rss.server.disk.capacity.ratio`:{}) * disk space({}) is
used, ", builder.ratio, totalSpace);
} else {
long freeSpace = baseFolder.getFreeSpace();
if (freeSpace < capacity) {
@@ -336,6 +337,7 @@ public class LocalStorage extends AbstractStorage {
public static class Builder {
private long capacity;
+ private double ratio;
private double lowWaterMarkOfWrite;
private double highWaterMarkOfWrite;
private double cleanupThreshold;
@@ -352,6 +354,11 @@ public class LocalStorage extends AbstractStorage {
return this;
}
+ public Builder ratio(double ratio) {
+ this.ratio = ratio;
+ return this;
+ }
+
public Builder lowWaterMarkOfWrite(double lowWaterMarkOfWrite) {
this.lowWaterMarkOfWrite = lowWaterMarkOfWrite;
return this;
diff --git
a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
index 574ffc06..4aa7e2e3 100644
---
a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
+++
b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
@@ -94,6 +94,19 @@ public class LocalStorageTest {
assertTrue(item.canWrite());
}
+ @Test
+ public void getCapacityInitTest() {
+ LocalStorage item =
LocalStorage.newBuilder().basePath(testBaseDir.getAbsolutePath())
+ .cleanupThreshold(50)
+ .highWaterMarkOfWrite(95)
+ .lowWaterMarkOfWrite(80)
+ .capacity(-1)
+ .ratio(0.1)
+ .cleanIntervalMs(5000)
+ .build();
+ assertEquals((long) (testBaseDir.getTotalSpace() * 0.1),
item.getCapacity());
+ }
+
@Test
public void baseDirectoryInitTest() throws IOException {
// empty and writable base dir