This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new d149d579 [#1064] improvement(tez): Make shuffle data send thread pool
configurable in WriteBufferManager. (#1065)
d149d579 is described below
commit d149d5799a7e13bf5562e1a40354221cd505a826
Author: Fantasy-Jay <[email protected]>
AuthorDate: Wed Aug 2 11:18:52 2023 +0800
[#1064] improvement(tez): Make shuffle data send thread pool configurable
in WriteBufferManager. (#1065)
### What changes were proposed in this pull request?
Currently, the number of threads in WriteBufferManager's data-sending
thread pool is hard-coded to 1. This change makes it configurable.
### Why are the changes needed?
Fix: [#1064](https://github.com/apache/incubator-uniffle/issues/1064)
### Does this PR introduce _any_ user-facing change?
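Yes. It adds a new optional configuration key,
`RssTezConfig.RSS_CLIENT_SEND_THREAD_NUM`
(`TEZ_RSS_CONFIG_PREFIX + "rss.client.send.thread.num"`), which sets the size of
the send thread pool. When the key is unset, the default is taken from
`RssClientConfig.RSS_CLIENT_DEFAULT_SEND_NUM`. That default may differ from the
previously hard-coded value of 1 (not verified here).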
### How was this patch tested?
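Covered by existing tests. The WriteBufferManagerTest call sites were updated
to pass the new `sendThreadNum` argument (set to 1).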
Co-authored-by: jay.zhu <[email protected]>
---
client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java | 4 ++++
.../tez/runtime/library/common/sort/buffer/WriteBufferManager.java | 3 ++-
.../org/apache/tez/runtime/library/common/sort/impl/RssSorter.java | 5 +++++
.../apache/tez/runtime/library/common/sort/impl/RssUnSorter.java | 5 +++++
.../runtime/library/common/sort/buffer/WriteBufferManagerTest.java | 6 ++++++
5 files changed, 22 insertions(+), 1 deletion(-)
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
index f9111e04..52f8cd36 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
@@ -96,6 +96,10 @@ public class RssTezConfig {
public static final String RSS_CLIENT_MEMORY_THRESHOLD =
TEZ_RSS_CONFIG_PREFIX + "rss.client.memory.threshold";
public static final double RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD = 0.8f;
+ public static final String RSS_CLIENT_SEND_THREAD_NUM =
+ TEZ_RSS_CONFIG_PREFIX + "rss.client.send.thread.num";
+ public static final int RSS_CLIENT_DEFAULT_THREAD_NUM =
+ RssClientConfig.RSS_CLIENT_DEFAULT_SEND_NUM;
public static final String RSS_CLIENT_SEND_THRESHOLD =
TEZ_RSS_CONFIG_PREFIX + "rss.client.send.threshold";
public static final double RSS_CLIENT_DEFAULT_SEND_THRESHOLD = 0.2f;
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
index 748e01d9..06e4194b 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
@@ -113,6 +113,7 @@ public class WriteBufferManager<K, V> {
Serializer<V> valSerializer,
long maxBufferSize,
double memoryThreshold,
+ int sendThreadNum,
double sendThreshold,
int batch,
RssConf rssConf,
@@ -152,7 +153,7 @@ public class WriteBufferManager<K, V> {
this.isNeedSorted = isNeedSorted;
this.mapOutputByteCounter = mapOutputByteCounter;
this.sendExecutorService =
- Executors.newFixedThreadPool(1,
ThreadUtils.getThreadFactory("send-thread"));
+ Executors.newFixedThreadPool(sendThreadNum,
ThreadUtils.getThreadFactory("send-thread"));
}
/** add record */
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
index ff1c76fc..c937770e 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
@@ -94,6 +94,9 @@ public class RssSorter extends ExternalSorter {
conf.getDouble(
RssTezConfig.RSS_CLIENT_MEMORY_THRESHOLD,
RssTezConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD);
+ int sendThreadNum =
+ conf.getInt(
+ RssTezConfig.RSS_CLIENT_SEND_THREAD_NUM,
RssTezConfig.RSS_CLIENT_DEFAULT_THREAD_NUM);
double sendThreshold =
conf.getDouble(
RssTezConfig.RSS_CLIENT_SEND_THRESHOLD,
RssTezConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD);
@@ -125,6 +128,7 @@ public class RssSorter extends ExternalSorter {
LOG.info("maxSegmentSize is {}", maxSegmentSize);
LOG.info("maxBufferSize is {}", maxBufferSize);
LOG.info("memoryThreshold is {}", memoryThreshold);
+ LOG.info("sendThreadNum is {}", sendThreadNum);
LOG.info("sendThreshold is {}", sendThreshold);
LOG.info("batch is {}", batch);
LOG.info("storageType is {}", storageType);
@@ -150,6 +154,7 @@ public class RssSorter extends ExternalSorter {
valSerializer,
maxBufferSize,
memoryThreshold,
+ sendThreadNum,
sendThreshold,
batch,
new RssConf(),
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
index 374908c1..b2073fe9 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
@@ -92,6 +92,9 @@ public class RssUnSorter extends ExternalSorter {
conf.getDouble(
RssTezConfig.RSS_CLIENT_MEMORY_THRESHOLD,
RssTezConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD);
+ int sendThreadNum =
+ conf.getInt(
+ RssTezConfig.RSS_CLIENT_SEND_THREAD_NUM,
RssTezConfig.RSS_CLIENT_DEFAULT_THREAD_NUM);
double sendThreshold =
conf.getDouble(
RssTezConfig.RSS_CLIENT_SEND_THRESHOLD,
RssTezConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD);
@@ -123,6 +126,7 @@ public class RssUnSorter extends ExternalSorter {
LOG.info("maxSegmentSize is {}", maxSegmentSize);
LOG.info("maxBufferSize is {}", maxBufferSize);
LOG.info("memoryThreshold is {}", memoryThreshold);
+ LOG.info("sendThreadNum is {}", sendThreadNum);
LOG.info("sendThreshold is {}", sendThreshold);
LOG.info("batch is {}", batch);
LOG.info("storageType is {}", storageType);
@@ -148,6 +152,7 @@ public class RssUnSorter extends ExternalSorter {
valSerializer,
maxBufferSize,
memoryThreshold,
+ sendThreadNum,
sendThreshold,
batch,
new RssConf(),
diff --git
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index 974b92cb..8f2af65b 100644
---
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -89,6 +89,7 @@ public class WriteBufferManagerTest {
serializationFactory.getSerializer(BytesWritable.class);
long maxBufferSize = 14 * 1024 * 1024;
double memoryThreshold = 0.8f;
+ int sendThreadNum = 1;
double sendThreshold = 0.2f;
int batch = 50;
int numMaps = 1;
@@ -132,6 +133,7 @@ public class WriteBufferManagerTest {
valSerializer,
maxBufferSize,
memoryThreshold,
+ sendThreadNum,
sendThreshold,
batch,
rssConf,
@@ -185,6 +187,7 @@ public class WriteBufferManagerTest {
serializationFactory.getSerializer(BytesWritable.class);
long maxBufferSize = 14 * 1024 * 1024;
double memoryThreshold = 0.8f;
+ int sendThreadNum = 1;
double sendThreshold = 0.2f;
int batch = 50;
int numMaps = 1;
@@ -228,6 +231,7 @@ public class WriteBufferManagerTest {
valSerializer,
maxBufferSize,
memoryThreshold,
+ sendThreadNum,
sendThreshold,
batch,
rssConf,
@@ -292,6 +296,7 @@ public class WriteBufferManagerTest {
serializationFactory.getSerializer(BytesWritable.class);
long maxBufferSize = 14 * 1024 * 1024;
double memoryThreshold = 0.8f;
+ int sendThreadNum = 1;
double sendThreshold = 0.2f;
int batch = 50;
int numMaps = 1;
@@ -334,6 +339,7 @@ public class WriteBufferManagerTest {
valSerializer,
maxBufferSize,
memoryThreshold,
+ sendThreadNum,
sendThreshold,
batch,
rssConf,