CodingCat commented on code in PR #2358:
URL: 
https://github.com/apache/incubator-celeborn/pull/2358#discussion_r1520370854


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -44,6 +44,73 @@
 
 public class SortBasedPusher extends MemoryConsumer {
 
+  class MemoryThresholdManager {
+
+    private long maxMemoryThresholdInBytes;
+    private long currentMemoryThresholdInBytes;
+
+    private double smallPushTolerateFactor;
+
+    MemoryThresholdManager(
+        int numPartitions,
+        long sendBufferSizeInBytes,
+        long currentMemoryThresholdInBytes,
+        double smallPushTolerateFactor) {
+      this.maxMemoryThresholdInBytes = numPartitions * sendBufferSizeInBytes;
+      this.currentMemoryThresholdInBytes = currentMemoryThresholdInBytes;
+      this.smallPushTolerateFactor = smallPushTolerateFactor;
+    }
+
+    private boolean shouldGrow() {
+      boolean enoughSpace = this.currentMemoryThresholdInBytes * 2 <= 
maxMemoryThresholdInBytes;
+      long shouldPushedBytes;
+      if (shouldPushedCount == 0) {
+        shouldPushedBytes = 0;
+      } else {
+        shouldPushedBytes =
+            (long) ((1 + smallPushTolerateFactor) * shouldPushedSizeInBytes / 
shouldPushedCount);
+      }
+      boolean tooManyPushed = pushedMemorySizeInBytes * 1.0 / pushedCount > 
shouldPushedBytes;

Review Comment:
   I updated the code, however, I think we still need to keep the tracking of 
shouldPushedSizeInBytes/shouldPushedCount, otherwise it will grow threshold 
unnecessarily (you can also check my unit test )
   
   e.g. if I have 32K buffer  and 256K data from the **same** partition, if we 
compare pushedBytes/pushedCounts with only buffer size (32K), we will still 
grow memory threshold since we would never be able to send exactly 32K for 
every push (because those extra bytes leading to that , at some point, we will 
have offset + record size > 32K), 
   
   However,  the pushes in this case are expected instead of led by too many 
small partitions (i.e. the case we want to avoid), even we use hashpartitioner, 
we still need to do such a push...we **SHOULD NOT** grow threshold in this case 
since it won't help for anything
   
   
   
   
   
    



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -44,6 +44,73 @@
 
 public class SortBasedPusher extends MemoryConsumer {
 
+  class MemoryThresholdManager {
+
+    private long maxMemoryThresholdInBytes;
+    private long currentMemoryThresholdInBytes;
+
+    private double smallPushTolerateFactor;
+
+    MemoryThresholdManager(
+        int numPartitions,
+        long sendBufferSizeInBytes,
+        long currentMemoryThresholdInBytes,
+        double smallPushTolerateFactor) {
+      this.maxMemoryThresholdInBytes = numPartitions * sendBufferSizeInBytes;
+      this.currentMemoryThresholdInBytes = currentMemoryThresholdInBytes;
+      this.smallPushTolerateFactor = smallPushTolerateFactor;
+    }
+
+    private boolean shouldGrow() {
+      boolean enoughSpace = this.currentMemoryThresholdInBytes * 2 <= 
maxMemoryThresholdInBytes;
+      long shouldPushedBytes;
+      if (shouldPushedCount == 0) {
+        shouldPushedBytes = 0;
+      } else {
+        shouldPushedBytes =
+            (long) ((1 + smallPushTolerateFactor) * shouldPushedSizeInBytes / 
shouldPushedCount);
+      }
+      boolean tooManyPushed = pushedMemorySizeInBytes * 1.0 / pushedCount > 
shouldPushedBytes;

Review Comment:
   I updated the code, however, I think we still need to keep the tracking of 
shouldPushedSizeInBytes/shouldPushedCount, otherwise it will grow threshold 
unnecessarily (you can also check my unit test )
   
   e.g. if I have 32K buffer  and 256K data from the **SAME** partition, if we 
compare pushedBytes/pushedCounts with only buffer size (32K), we will still 
grow memory threshold since we would never be able to send exactly 32K for 
every push (because those extra bytes leading to that , at some point, we will 
have offset + record size > 32K), 
   
   However,  the pushes in this case are expected instead of led by too many 
small partitions (i.e. the case we want to avoid), even we use hashpartitioner, 
we still need to do such a push...we **SHOULD NOT** grow threshold in this case 
since it won't help for anything
   
   
   
   
   
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to