rickyma commented on code in PR #1670:
URL: https://github.com/apache/incubator-uniffle/pull/1670#discussion_r1582494284


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java:
##########
@@ -292,20 +298,48 @@ public List<ShuffleBlockInfo> addRecord(int partitionId, Object key, Object valu
   }
 
   // transform all [partition, records] to [partition, ShuffleBlockInfo] and clear cache
-  public synchronized List<ShuffleBlockInfo> clear() {
+  public synchronized List<ShuffleBlockInfo> clear(double bufferSpillRatio) {
     List<ShuffleBlockInfo> result = Lists.newArrayList();
     long dataSize = 0;
     long memoryUsed = 0;
-    Iterator<Entry<Integer, WriterBuffer>> iterator = buffers.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<Integer, WriterBuffer> entry = iterator.next();
-      WriterBuffer wb = entry.getValue();
+    bufferSpillRatio = Math.max(0.1, Math.min(1.0, bufferSpillRatio));
+    List<Integer> partitionList =
+        new ArrayList<Integer>() {
+          {
+            addAll(buffers.keySet());

Review Comment:
   I think this can be simplified as:
   ```
   List<Integer> partitionList = new ArrayList<>(buffers.keySet());
   ```
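
For reference, a minimal sketch comparing the two forms. The class name `PartitionListCopy` and the `Map<Integer, String>` parameter are hypothetical stand-ins for `WriteBufferManager` and its `buffers` map, not code from this PR. The double-brace form creates an anonymous `ArrayList` subclass (which, inside an instance method, also keeps a reference to the enclosing object), while the copy constructor yields a plain `ArrayList` with the same elements in the same order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the relevant part of WriteBufferManager.
final class PartitionListCopy {
  // Form used in the PR: an anonymous ArrayList subclass with an initializer block.
  static List<Integer> doubleBrace(Map<Integer, String> buffers) {
    return new ArrayList<Integer>() {
      {
        addAll(buffers.keySet());
      }
    };
  }

  // Suggested form: the ArrayList copy constructor, no extra class involved.
  static List<Integer> copyConstructor(Map<Integer, String> buffers) {
    return new ArrayList<>(buffers.keySet());
  }
}
```

Both methods return lists with equal contents; only the runtime class of the result differs.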



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java:
##########
@@ -292,20 +298,48 @@ public List<ShuffleBlockInfo> addRecord(int partitionId, Object key, Object valu
   }
 
   // transform all [partition, records] to [partition, ShuffleBlockInfo] and clear cache
-  public synchronized List<ShuffleBlockInfo> clear() {
+  public synchronized List<ShuffleBlockInfo> clear(double bufferSpillRatio) {
     List<ShuffleBlockInfo> result = Lists.newArrayList();
     long dataSize = 0;
     long memoryUsed = 0;
-    Iterator<Entry<Integer, WriterBuffer>> iterator = buffers.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<Integer, WriterBuffer> entry = iterator.next();
-      WriterBuffer wb = entry.getValue();
+    bufferSpillRatio = Math.max(0.1, Math.min(1.0, bufferSpillRatio));
+    List<Integer> partitionList =
+        new ArrayList<Integer>() {
+          {
+            addAll(buffers.keySet());
+          }
+        };
+    if (bufferSpillRatio < 1.0) {
+      Collections.sort(

Review Comment:
   This can be simplified as:
   ```
   partitionList.sort(Comparator.comparingInt(o -> buffers.get(o) == null ? 0 : buffers.get(o).getMemoryUsed()).reversed());
   ```
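
A minimal sketch of the ordering this produces, under stated assumptions: the class `SpillOrder` and the `Map<Integer, Integer>` of partition id to bytes used are hypothetical and stand in for `buffers` and `WriterBuffer.getMemoryUsed()`. The lambda parameter is given an explicit `Integer` type here so the comparator's element type is unambiguous before `.reversed()` is applied; a missing value is treated as 0, as in the suggestion.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical helper: orders partition ids by memory used, largest first.
final class SpillOrder {
  static List<Integer> byMemoryUsedDescending(Map<Integer, Integer> memoryUsedByPartition) {
    List<Integer> partitionList = new ArrayList<>(memoryUsedByPartition.keySet());
    partitionList.sort(
        Comparator.comparingInt(
                (Integer p) -> {
                  // Treat a missing or null entry as 0 bytes used.
                  Integer used = memoryUsedByPartition.get(p);
                  return used == null ? 0 : used;
                })
            .reversed());
    return partitionList;
  }
}
```

With the largest buffers first, a partial spill (ratio below 1.0) can take partitions from the head of the list.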
   



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java:
##########
@@ -316,6 +350,10 @@ public synchronized List<ShuffleBlockInfo> clear() {
             + dataSize
             + "], memoryUsed["
             + memoryUsed
+            + "],number of blocks["
+            + result.size()
+            + "],flush ratio["

Review Comment:
   Nit, add a space here, for a better log output:
   ```],flush ratio[ ``` -> ```], flush ratio[```



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java:
##########
@@ -179,7 +180,12 @@ public List<ShuffleBlockInfo> addPartitionData(
 
     // check buffer size > spill threshold
     if (usedBytes.get() - inSendListBytes.get() > spillSize) {
-      List<ShuffleBlockInfo> multiSendingBlocks = clear();
+      LOG.info(
+          String.format(
+              "ShuffleBufferManager spill for buffer size exceeding spill threshold,"
+                  + "usedBytes[%d],inSendListBytes[%d],spill size threshold[%d]",

Review Comment:
   Nit, a better log output:
   ```
   " usedBytes[%d], inSendListBytes[%d], spill size threshold[%d]",
   ```



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java:
##########
@@ -316,6 +350,10 @@ public synchronized List<ShuffleBlockInfo> clear() {
             + dataSize
             + "], memoryUsed["
             + memoryUsed
+            + "],number of blocks["

Review Comment:
   Nit, add a space here, for a better log output:
   ```],number of blocks[ ``` -> ```], number of blocks[```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

