This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new d149d579 [#1064] improvement(tez): Make shuffle data send thread pool 
configurable in WriteBufferManager. (#1065)
d149d579 is described below

commit d149d5799a7e13bf5562e1a40354221cd505a826
Author: Fantasy-Jay <[email protected]>
AuthorDate: Wed Aug 2 11:18:52 2023 +0800

    [#1064] improvement(tez): Make shuffle data send thread pool configurable 
in WriteBufferManager. (#1065)
    
    ### What changes were proposed in this pull request?
    Currently the number of threads in the send data thread pool in 
WriteBufferManager is fixed at 1, making it configurable.
    
    ### Why are the changes needed?
    Fix: #[1064](https://github.com/apache/incubator-uniffle/issues/1064)
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing tests can cover it.
    
    Co-authored-by: jay.zhu <[email protected]>
---
 client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java    | 4 ++++
 .../tez/runtime/library/common/sort/buffer/WriteBufferManager.java  | 3 ++-
 .../org/apache/tez/runtime/library/common/sort/impl/RssSorter.java  | 5 +++++
 .../apache/tez/runtime/library/common/sort/impl/RssUnSorter.java    | 5 +++++
 .../runtime/library/common/sort/buffer/WriteBufferManagerTest.java  | 6 ++++++
 5 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java 
b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
index f9111e04..52f8cd36 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
@@ -96,6 +96,10 @@ public class RssTezConfig {
   public static final String RSS_CLIENT_MEMORY_THRESHOLD =
       TEZ_RSS_CONFIG_PREFIX + "rss.client.memory.threshold";
   public static final double RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD = 0.8f;
+  public static final String RSS_CLIENT_SEND_THREAD_NUM =
+      TEZ_RSS_CONFIG_PREFIX + "rss.client.send.thread.num";
+  public static final int RSS_CLIENT_DEFAULT_THREAD_NUM =
+      RssClientConfig.RSS_CLIENT_DEFAULT_SEND_NUM;
   public static final String RSS_CLIENT_SEND_THRESHOLD =
       TEZ_RSS_CONFIG_PREFIX + "rss.client.send.threshold";
   public static final double RSS_CLIENT_DEFAULT_SEND_THRESHOLD = 0.2f;
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
index 748e01d9..06e4194b 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
@@ -113,6 +113,7 @@ public class WriteBufferManager<K, V> {
       Serializer<V> valSerializer,
       long maxBufferSize,
       double memoryThreshold,
+      int sendThreadNum,
       double sendThreshold,
       int batch,
       RssConf rssConf,
@@ -152,7 +153,7 @@ public class WriteBufferManager<K, V> {
     this.isNeedSorted = isNeedSorted;
     this.mapOutputByteCounter = mapOutputByteCounter;
     this.sendExecutorService =
-        Executors.newFixedThreadPool(1, 
ThreadUtils.getThreadFactory("send-thread"));
+        Executors.newFixedThreadPool(sendThreadNum, 
ThreadUtils.getThreadFactory("send-thread"));
   }
 
   /** add record */
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
index ff1c76fc..c937770e 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
@@ -94,6 +94,9 @@ public class RssSorter extends ExternalSorter {
         conf.getDouble(
             RssTezConfig.RSS_CLIENT_MEMORY_THRESHOLD,
             RssTezConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD);
+    int sendThreadNum =
+        conf.getInt(
+            RssTezConfig.RSS_CLIENT_SEND_THREAD_NUM, 
RssTezConfig.RSS_CLIENT_DEFAULT_THREAD_NUM);
     double sendThreshold =
         conf.getDouble(
             RssTezConfig.RSS_CLIENT_SEND_THRESHOLD, 
RssTezConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD);
@@ -125,6 +128,7 @@ public class RssSorter extends ExternalSorter {
       LOG.info("maxSegmentSize is {}", maxSegmentSize);
       LOG.info("maxBufferSize is {}", maxBufferSize);
       LOG.info("memoryThreshold is {}", memoryThreshold);
+      LOG.info("sendThreadNum is {}", sendThreadNum);
       LOG.info("sendThreshold is {}", sendThreshold);
       LOG.info("batch is {}", batch);
       LOG.info("storageType is {}", storageType);
@@ -150,6 +154,7 @@ public class RssSorter extends ExternalSorter {
             valSerializer,
             maxBufferSize,
             memoryThreshold,
+            sendThreadNum,
             sendThreshold,
             batch,
             new RssConf(),
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
index 374908c1..b2073fe9 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
@@ -92,6 +92,9 @@ public class RssUnSorter extends ExternalSorter {
         conf.getDouble(
             RssTezConfig.RSS_CLIENT_MEMORY_THRESHOLD,
             RssTezConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD);
+    int sendThreadNum =
+        conf.getInt(
+            RssTezConfig.RSS_CLIENT_SEND_THREAD_NUM, 
RssTezConfig.RSS_CLIENT_DEFAULT_THREAD_NUM);
     double sendThreshold =
         conf.getDouble(
             RssTezConfig.RSS_CLIENT_SEND_THRESHOLD, 
RssTezConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD);
@@ -123,6 +126,7 @@ public class RssUnSorter extends ExternalSorter {
       LOG.info("maxSegmentSize is {}", maxSegmentSize);
       LOG.info("maxBufferSize is {}", maxBufferSize);
       LOG.info("memoryThreshold is {}", memoryThreshold);
+      LOG.info("sendThreadNum is {}", sendThreadNum);
       LOG.info("sendThreshold is {}", sendThreshold);
       LOG.info("batch is {}", batch);
       LOG.info("storageType is {}", storageType);
@@ -148,6 +152,7 @@ public class RssUnSorter extends ExternalSorter {
             valSerializer,
             maxBufferSize,
             memoryThreshold,
+            sendThreadNum,
             sendThreshold,
             batch,
             new RssConf(),
diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index 974b92cb..8f2af65b 100644
--- 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -89,6 +89,7 @@ public class WriteBufferManagerTest {
         serializationFactory.getSerializer(BytesWritable.class);
     long maxBufferSize = 14 * 1024 * 1024;
     double memoryThreshold = 0.8f;
+    int sendThreadNum = 1;
     double sendThreshold = 0.2f;
     int batch = 50;
     int numMaps = 1;
@@ -132,6 +133,7 @@ public class WriteBufferManagerTest {
             valSerializer,
             maxBufferSize,
             memoryThreshold,
+            sendThreadNum,
             sendThreshold,
             batch,
             rssConf,
@@ -185,6 +187,7 @@ public class WriteBufferManagerTest {
         serializationFactory.getSerializer(BytesWritable.class);
     long maxBufferSize = 14 * 1024 * 1024;
     double memoryThreshold = 0.8f;
+    int sendThreadNum = 1;
     double sendThreshold = 0.2f;
     int batch = 50;
     int numMaps = 1;
@@ -228,6 +231,7 @@ public class WriteBufferManagerTest {
             valSerializer,
             maxBufferSize,
             memoryThreshold,
+            sendThreadNum,
             sendThreshold,
             batch,
             rssConf,
@@ -292,6 +296,7 @@ public class WriteBufferManagerTest {
         serializationFactory.getSerializer(BytesWritable.class);
     long maxBufferSize = 14 * 1024 * 1024;
     double memoryThreshold = 0.8f;
+    int sendThreadNum = 1;
     double sendThreshold = 0.2f;
     int batch = 50;
     int numMaps = 1;
@@ -334,6 +339,7 @@ public class WriteBufferManagerTest {
             valSerializer,
             maxBufferSize,
             memoryThreshold,
+            sendThreadNum,
             sendThreshold,
             batch,
             rssConf,

Reply via email to