This is an automated email from the ASF dual-hosted git repository. zuston pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
     new 648931c46 [#1727] improvement(server): Introduce block num threshold for early buffer flush to mitigate GC issues (#1759)
648931c46 is described below
commit 648931c46eef79a8c40ee470348f1946cf184f16
Author: RickyMa <[email protected]>
AuthorDate: Fri Jun 7 11:29:10 2024 +0800
    [#1727] improvement(server): Introduce block num threshold for early buffer flush to mitigate GC issues (#1759)
### What changes were proposed in this pull request?
Introduce a block-number threshold for flushing a single buffer, mitigating GC/OOM issues caused by a potentially excessive number of small blocks.
### Why are the changes needed?
For: https://github.com/apache/incubator-uniffle/issues/1727.
In a production environment, the Uniffle server may run jobs with all kinds of unreasonable configurations. These jobs might have a huge number of partitions (tens of thousands, hundreds of thousands, or even millions), might have been manually configured with a very small spill size, or might be misconfigured in other ways. This can ultimately send a large number of small blocks to the server, which has no choice but to keep them in heap memory for a long time, simply because **_ [...]
In Netty mode, we use off-heap memory to store shuffle data. However, when facing jobs with extremely unreasonable configurations, the total size of the block reference objects the server maintains in heap memory may even exceed the size of the data stored off-heap. This can severely destabilize the server.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
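To illustrate the idea behind this change, here is a minimal standalone sketch of the new flush decision: a single buffer is flushed early when either its byte size or its block count crosses a threshold. The class and method names here are hypothetical illustrations, not the actual Uniffle classes; the thresholds shown are example values.

```java
// Hypothetical sketch of the dual-trigger flush condition introduced by this PR.
public class FlushTriggerSketch {
    // Flush when EITHER the size threshold (existing behavior) OR the
    // block-count threshold (new in this PR) is exceeded.
    static boolean shouldFlush(long bufferSizeBytes, int blockCount,
                               long sizeThreshold, int blocksNumThreshold) {
        return bufferSizeBytes > sizeThreshold || blockCount > blocksNumThreshold;
    }

    public static void main(String[] args) {
        long sizeThreshold = 128L * 1024 * 1024; // 128M, the size default
        int blocksThreshold = 50_000;            // example operator-chosen value

        // Small total size but a huge number of tiny blocks: now flushes early.
        System.out.println(shouldFlush(4L * 1024 * 1024, 60_000, sizeThreshold, blocksThreshold)); // true
        // Few blocks and under the size threshold: stays in memory.
        System.out.println(shouldFlush(4L * 1024 * 1024, 100, sizeThreshold, blocksThreshold));    // false
    }
}
```

The point of the second trigger is that many tiny blocks can hurt the server (heap pressure from their reference objects) even when their total byte size stays well under the size threshold.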
---
docs/server_guide.md | 79 +++++++++++-----------
.../ShuffleServerConcurrentWriteOfHadoopTest.java | 2 +-
.../apache/uniffle/server/ShuffleServerConf.java | 14 +++-
.../server/buffer/ShuffleBufferManager.java | 18 ++++-
.../server/buffer/ShuffleBufferManagerTest.java | 6 +-
5 files changed, 73 insertions(+), 46 deletions(-)
diff --git a/docs/server_guide.md b/docs/server_guide.md
index ec2d24fcc..4fecff93b 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -69,45 +69,46 @@ This document will introduce how to deploy Uniffle shuffle servers.
```
## Configuration
-| Property Name | Default | Description [...]
-|----------------------------------------------------------|------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| rss.coordinator.quorum | - | Coordinator quorum [...]
-| rss.rpc.server.type | GRPC | Shuffle server type, supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using GRPC_NETTY to enable Netty on the server side for better stability and performance. [...]
-| rss.rpc.server.port | 19999 | RPC port for Shuffle server, if set zero, grpc server start on random port. [...]
-| rss.jetty.http.port | 19998 | Http port for Shuffle server [...]
-| rss.server.netty.port | -1 | Netty port for Shuffle server, if set zero, Netty server start on random port. [...]
-| rss.server.netty.epoll.enable | false | Whether to enable epoll model with Netty server. [...]
-| rss.server.netty.accept.thread | 10 | Accept thread count in netty. [...]
-| rss.server.netty.worker.thread | 0 | Worker thread count in netty. When set to 0, the default value is dynamically set to twice the number of processor cores, but it will not be less than 100 to ensure the minimum throughput of the service. [...]
-| rss.server.netty.connect.backlog | 0 | For Netty server, requested maximum length of the queue of incoming connections. [...]
-| rss.server.netty.connect.timeout | 5000 | Timeout for connection in netty. [...]
-| rss.server.netty.receive.buf | 0 | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system automatically estimates the receive buffer size based on default settings. [...]
-| rss.server.netty.send.buf | 0 | Send buffer size (SO_SNDBUF). [...]
-| rss.server.buffer.capacity | -1 | Max memory of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio is used [...]
-| rss.server.buffer.capacity.ratio | 0.6 | when `rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio [...]
-| rss.server.memory.shuffle.highWaterMark.percentage | 75.0 | Threshold of spill data to storage, percentage of rss.server.buffer.capacity [...]
-| rss.server.memory.shuffle.lowWaterMark.percentage | 25.0 | Threshold of keep data in memory, percentage of rss.server.buffer.capacity [...]
-| rss.server.read.buffer.capacity | -1 | Max size of buffer for reading data. If negative, JVM heap size * read.buffer.ratio is used [...]
-| rss.server.read.buffer.capacity.ratio | 0.2 | when `rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio [...]
-| rss.server.heartbeat.interval | 10000 | Heartbeat interval to Coordinator (ms) [...]
-| rss.server.netty.metrics.pendingTaskNumPollingIntervalMs | 10000 | How often to collect Netty pending tasks number metrics (in milliseconds) [...]
-| rss.server.flush.localfile.threadPool.size | 10 | Thread pool for flush data to local file [...]
-| rss.server.flush.hadoop.threadPool.size | 60 | Thread pool for flush data to hadoop storage [...]
-| rss.server.commit.timeout | 600000 | Timeout when commit shuffle data (ms) [...]
-| rss.storage.type | - | Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS [...]
-| rss.server.flush.cold.storage.threshold.size | 64M | The threshold of data size for LOACALFILE and HADOOP if MEMORY_LOCALFILE_HDFS is used [...]
-| rss.server.tags | - | The comma-separated list of tags to indicate the shuffle server's attributes. It will be used as the assignment basis for the coordinator [...]
-| rss.server.single.buffer.flush.enabled | true | Whether single buffer flush when size exceeded rss.server.single.buffer.flush.threshold [...]
-| rss.server.single.buffer.flush.threshold | 128M | The threshold of single shuffle buffer flush [...]
-| rss.server.disk.capacity | -1 | Disk capacity that shuffle server can use. If negative, it will use disk whole space * ratio [...]
-| rss.server.disk.capacity.ratio | 0.9 | When `rss.server.disk.capacity` is negative, disk whole space * ratio is used [...]
-| rss.server.hybrid.storage.fallback.strategy.class | - | The fallback strategy for `MEMORY_LOCALFILE_HDFS`. Support `org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy` and `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackStrategy`. If not set, `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackS [...]
-| rss.server.leak.shuffledata.check.interval | 3600000 | The interval of leak shuffle data check (ms) [...]
-| rss.server.max.concurrency.of.per-partition.write | 30 | The max concurrency of single partition writer, the data partition file number is equal to this value. Default value is 1. This config could improve the writing speed, especially for huge partition. [...]
-| rss.server.max.concurrency.limit.of.per-partition.write | - | The limit for max concurrency per-partition write specified by client, this won't be enabled by default. [...]
-| rss.metrics.reporter.class | - | The class of metrics reporter. [...]
-| rss.server.hybrid.storage.manager.selector.class | org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is `DefaultStorageManagerSelector`, and another `HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's data to cold storage. [...]
-| rss.server.disk-capacity.watermark.check.enabled | false | If it is co-located with other services, the high-low watermark check based on the uniffle used is not correct. Due to this, the whole disk capacity watermark check is necessary, which will reuse the current watermark value. It will be disabled by default. [...]
+| Property Name | Default | Description [...]
+|----------------------------------------------------------|------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| rss.coordinator.quorum | - | Coordinator quorum [...]
+| rss.rpc.server.type | GRPC | Shuffle server type, supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using GRPC_NETTY to enable Netty on the server side for better stability and performance. [...]
+| rss.rpc.server.port | 19999 | RPC port for Shuffle server, if set zero, grpc server start on random port. [...]
+| rss.jetty.http.port | 19998 | Http port for Shuffle server [...]
+| rss.server.netty.port | -1 | Netty port for Shuffle server, if set zero, Netty server start on random port. [...]
+| rss.server.netty.epoll.enable | false | Whether to enable epoll model with Netty server. [...]
+| rss.server.netty.accept.thread | 10 | Accept thread count in netty. [...]
+| rss.server.netty.worker.thread | 0 | Worker thread count in netty. When set to 0, the default value is dynamically set to twice the number of processor cores, but it will not be less than 100 to ensure the minimum throughput of the service. [...]
+| rss.server.netty.connect.backlog | 0 | For Netty server, requested maximum length of the queue of incoming connections. [...]
+| rss.server.netty.connect.timeout | 5000 | Timeout for connection in netty. [...]
+| rss.server.netty.receive.buf | 0 | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system automatically estimates the receive buffer size based on default settings. [...]
+| rss.server.netty.send.buf | 0 | Send buffer size (SO_SNDBUF). [...]
+| rss.server.buffer.capacity | -1 | Max memory of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio is used [...]
+| rss.server.buffer.capacity.ratio | 0.6 | when `rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio [...]
+| rss.server.memory.shuffle.highWaterMark.percentage | 75.0 | Threshold of spill data to storage, percentage of rss.server.buffer.capacity [...]
+| rss.server.memory.shuffle.lowWaterMark.percentage | 25.0 | Threshold of keep data in memory, percentage of rss.server.buffer.capacity [...]
+| rss.server.read.buffer.capacity | -1 | Max size of buffer for reading data. If negative, JVM heap size * read.buffer.ratio is used [...]
+| rss.server.read.buffer.capacity.ratio | 0.2 | when `rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio [...]
+| rss.server.heartbeat.interval | 10000 | Heartbeat interval to Coordinator (ms) [...]
+| rss.server.netty.metrics.pendingTaskNumPollingIntervalMs | 10000 | How often to collect Netty pending tasks number metrics (in milliseconds) [...]
+| rss.server.flush.localfile.threadPool.size | 10 | Thread pool for flush data to local file [...]
+| rss.server.flush.hadoop.threadPool.size | 60 | Thread pool for flush data to hadoop storage [...]
+| rss.server.commit.timeout | 600000 | Timeout when commit shuffle data (ms) [...]
+| rss.storage.type | - | Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS [...]
+| rss.server.flush.cold.storage.threshold.size | 64M | The threshold of data size for LOACALFILE and HADOOP if MEMORY_LOCALFILE_HDFS is used [...]
+| rss.server.tags | - | The comma-separated list of tags to indicate the shuffle server's attributes. It will be used as the assignment basis for the coordinator [...]
+| rss.server.single.buffer.flush.enabled | true | Whether single buffer flush when size exceeded rss.server.single.buffer.flush.threshold [...]
+| rss.server.single.buffer.flush.threshold | 128M | The threshold of single shuffle buffer flush [...]
+| rss.server.single.buffer.flush.blocksNumberThreshold | - | The blocks number threshold for triggering a flush for a single shuffle buffer. This threshold is mainly used to control jobs with an excessive number of small blocks, allowing these small blocks to be flushed as much as possible, rather than being maintained in the heap and unable to be garbage collected. This can cause severe garbage collection issues on [...]
+| rss.server.disk.capacity | -1 | Disk capacity that shuffle server can use. If negative, it will use disk whole space * ratio [...]
+| rss.server.disk.capacity.ratio | 0.9 | When `rss.server.disk.capacity` is negative, disk whole space * ratio is used [...]
+| rss.server.hybrid.storage.fallback.strategy.class | - | The fallback strategy for `MEMORY_LOCALFILE_HDFS`. Support `org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy` and `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackStrategy`. If not set, `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackS [...]
+| rss.server.leak.shuffledata.check.interval | 3600000 | The interval of leak shuffle data check (ms) [...]
+| rss.server.max.concurrency.of.per-partition.write | 30 | The max concurrency of single partition writer, the data partition file number is equal to this value. Default value is 1. This config could improve the writing speed, especially for huge partition. [...]
+| rss.server.max.concurrency.limit.of.per-partition.write | - | The limit for max concurrency per-partition write specified by client, this won't be enabled by default. [...]
+| rss.metrics.reporter.class | - | The class of metrics reporter. [...]
+| rss.server.hybrid.storage.manager.selector.class | org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is `DefaultStorageManagerSelector`, and another `HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's data to cold storage. [...]
+| rss.server.disk-capacity.watermark.check.enabled | false | If it is co-located with other services, the high-low watermark check based on the uniffle used is not correct. Due to this, the whole disk capacity watermark check is necessary, which will reuse the current watermark value. It will be disabled by default. [...]
### Advanced Configurations
| Property Name | Default | Description |
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
index 7d5d11481..d4c41abff 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
@@ -88,7 +88,7 @@ public class ShuffleServerConcurrentWriteOfHadoopTest extends ShuffleServerWithH
     shuffleServerConf.setInteger(
         ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION, MAX_CONCURRENCY);
     shuffleServerConf.setBoolean(shuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED, true);
-    shuffleServerConf.setLong(shuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD, 1024 * 1024L);
+    shuffleServerConf.setLong(shuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD, 1024 * 1024L);
     return shuffleServerConf;
   }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index fe3979012..064a6b28b 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -387,12 +387,24 @@ public class ShuffleServerConf extends RssBaseConf {
           .withDescription(
               "Whether single buffer flush when size exceeded rss.server.single.buffer.flush.threshold");
-  public static final ConfigOption<Long> SINGLE_BUFFER_FLUSH_THRESHOLD =
+  public static final ConfigOption<Long> SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD =
       ConfigOptions.key("rss.server.single.buffer.flush.threshold")
           .longType()
           .defaultValue(128 * 1024 * 1024L)
           .withDescription("The threshold of single shuffle buffer flush");
+  public static final ConfigOption<Integer> SINGLE_BUFFER_FLUSH_BLOCKS_NUM_THRESHOLD =
+      ConfigOptions.key("rss.server.single.buffer.flush.blocksNumberThreshold")
+          .intType()
+          .defaultValue(Integer.MAX_VALUE)
+          .withDescription(
+              "The blocks number threshold for triggering a flush for a single shuffle buffer. "
+                  + "This threshold is mainly used to control jobs with an excessive number of small blocks, "
+                  + "allowing these small blocks to be flushed as much as possible, "
+                  + "rather than being maintained in the heap and unable to be garbage collected. "
+                  + "This can cause severe garbage collection issues on the server side, and may even lead to out-of-heap-memory errors. "
+                  + "If the threshold is set too high, it becomes meaningless. It won't be enabled by default.");
+
   public static final ConfigOption<Long> SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL =
       ConfigOptions.key("rss.server.leak.shuffledata.check.interval")
           .longType()
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 6ec8c13e6..e85d2eae4 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -65,6 +65,7 @@ public class ShuffleBufferManager {
   private long lowWaterMark;
   private boolean bufferFlushEnabled;
   private long bufferFlushThreshold;
+  private long bufferFlushBlocksNumThreshold;
   // when shuffle buffer manager flushes data, shuffles with data size < shuffleFlushThreshold is
   // kept in memory to
   // reduce small I/Os to persistent storage, especially for local HDDs.
@@ -124,7 +125,9 @@ public class ShuffleBufferManager {
             * conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE));
     this.bufferFlushEnabled = conf.getBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED);
     this.bufferFlushThreshold =
-        conf.getSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD);
+        conf.getSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD);
+    this.bufferFlushBlocksNumThreshold =
+        conf.getInteger(ShuffleServerConf.SINGLE_BUFFER_FLUSH_BLOCKS_NUM_THRESHOLD);
     this.shuffleFlushThreshold =
         conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD);
     this.hugePartitionSizeThresholdRef =
@@ -266,7 +269,18 @@ public class ShuffleBufferManager {
     // When we use multi storage and trigger single buffer flush, the buffer size should be bigger
     // than rss.server.flush.cold.storage.threshold.size, otherwise cold storage will be useless.
     if ((isHugePartition || this.bufferFlushEnabled)
-        && buffer.getSize() > this.bufferFlushThreshold) {
+        && (buffer.getSize() > this.bufferFlushThreshold
+            || buffer.getBlocks().size() > bufferFlushBlocksNumThreshold)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Start to flush single buffer. Details - shuffleId:{}, startPartition:{}, endPartition:{}, isHugePartition:{}, bufferSize:{}, blocksNum:{}",
+            shuffleId,
+            startPartition,
+            endPartition,
+            isHugePartition,
+            buffer.getSize(),
+            buffer.getBlocks().size());
+      }
       flushBuffer(buffer, appId, shuffleId, startPartition, endPartition, isHugePartition);
       return;
     }
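A note on the default in the hunk above: since the block count is an `int` and the config defaults to `Integer.MAX_VALUE`, the strict `>` comparison can never be true, so the new trigger is effectively disabled until an operator sets a real threshold. A tiny standalone illustration (class and method names are hypothetical):

```java
// Hypothetical sketch showing why Integer.MAX_VALUE disables the block-count trigger.
public class DefaultThresholdSketch {
    // Mirrors the strict comparison used by the new flush condition.
    static boolean blocksTriggerFires(int blockCount, int threshold) {
        return blockCount > threshold;
    }

    public static void main(String[] args) {
        // An int can never strictly exceed Integer.MAX_VALUE: always false.
        System.out.println(blocksTriggerFires(Integer.MAX_VALUE, Integer.MAX_VALUE)); // false
        // Once an operator configures a finite threshold, the trigger can fire.
        System.out.println(blocksTriggerFires(100_000, 50_000)); // true
    }
}
```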
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 8e958b22b..007c36290 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -477,7 +477,7 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
     shuffleConf.set(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO, 0.1);
     shuffleConf.set(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD, 100L);
     shuffleConf.set(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED, false);
-    shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD, 64L);
+    shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD, 64L);
     ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
     StorageManager storageManager =
@@ -537,7 +537,7 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
     shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 80.0);
     shuffleConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
     shuffleConf.setBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED, true);
-    shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD, 128L);
+    shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD, 128L);
     ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
     StorageManager storageManager =
@@ -705,7 +705,7 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
     shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 80.0);
     shuffleConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
     shuffleConf.setBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED, true);
-    shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD, 16L);
+    shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD, 16L);
     shuffleConf.setSizeAsBytes(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 16L);
     shuffleConf.setString(
         ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE_HDFS.name());