waitinfuture commented on code in PR #2358:
URL: https://github.com/apache/incubator-celeborn/pull/2358#discussion_r1519728678


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -44,6 +44,73 @@
 
 public class SortBasedPusher extends MemoryConsumer {
 
+  class MemoryThresholdManager {
+
+    private long maxMemoryThresholdInBytes;
+    private long currentMemoryThresholdInBytes;
+
+    private double smallPushTolerateFactor;
+
+    MemoryThresholdManager(
+        int numPartitions,
+        long sendBufferSizeInBytes,
+        long currentMemoryThresholdInBytes,
+        double smallPushTolerateFactor) {
+      this.maxMemoryThresholdInBytes = numPartitions * sendBufferSizeInBytes;
+      this.currentMemoryThresholdInBytes = currentMemoryThresholdInBytes;
+      this.smallPushTolerateFactor = smallPushTolerateFactor;
+    }
+
+    private boolean shouldGrow() {
+      boolean enoughSpace = this.currentMemoryThresholdInBytes * 2 <= maxMemoryThresholdInBytes;
+      long shouldPushedBytes;
+      if (shouldPushedCount == 0) {
+        shouldPushedBytes = 0;
+      } else {
+        shouldPushedBytes =
+            (long) ((1 + smallPushTolerateFactor) * shouldPushedSizeInBytes / shouldPushedCount);
+      }
+      boolean tooManyPushed = pushedMemorySizeInBytes * 1.0 / pushedCount > shouldPushedBytes;

Review Comment:
   `pushedMemorySizeInBytes * 1.0 / pushedCount` is the average size of all 
pushes, while `shouldPushedSizeInBytes / shouldPushedCount` is the average size 
of only those pushes whose size is close or equal to `pushBufferMaxSize`. The 
latter is therefore almost always larger than the former, even before it is 
multiplied by (1 + `smallPushTolerateFactor`).
   
   Maybe we can simplify the logic: check whether the average size of all 
pushes is smaller than `pushBufferMaxSize / (1 + factor)` and grow if so. Then 
we don't need `shouldPushedSizeInBytes / shouldPushedCount`.
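
   A minimal sketch of the simplified check suggested above, not the PR's 
actual code. The class name `SimplifiedGrowCheck`, its constructor, and the 
single-argument `updateStats` are hypothetical; the field names are borrowed 
from the diff. It assumes growth should be skipped until at least one push has 
been recorded.

```java
/** Hypothetical sketch of the simplified grow check; not the PR's actual code. */
final class SimplifiedGrowCheck {
  private final long maxMemoryThresholdInBytes;
  private final long pushBufferMaxSize;
  private final double smallPushTolerateFactor;
  private long currentMemoryThresholdInBytes;
  private long pushedCount = 0;
  private long pushedMemorySizeInBytes = 0;

  SimplifiedGrowCheck(
      long maxMemoryThresholdInBytes,
      long currentMemoryThresholdInBytes,
      long pushBufferMaxSize,
      double smallPushTolerateFactor) {
    this.maxMemoryThresholdInBytes = maxMemoryThresholdInBytes;
    this.currentMemoryThresholdInBytes = currentMemoryThresholdInBytes;
    this.pushBufferMaxSize = pushBufferMaxSize;
    this.smallPushTolerateFactor = smallPushTolerateFactor;
  }

  /** Records one push of the given size. */
  void updateStats(long pushedBytes) {
    pushedCount++;
    pushedMemorySizeInBytes += pushedBytes;
  }

  /** Grow only if doubling fits under the maximum and pushes are small on average. */
  boolean shouldGrow() {
    if (pushedCount == 0) {
      return false; // assumption: with no pushes observed there is nothing to judge
    }
    boolean enoughSpace = currentMemoryThresholdInBytes * 2 <= maxMemoryThresholdInBytes;
    double averagePushSize = (double) pushedMemorySizeInBytes / pushedCount;
    boolean pushesTooSmall =
        averagePushSize < pushBufferMaxSize / (1 + smallPushTolerateFactor);
    return enoughSpace && pushesTooSmall;
  }

  public static void main(String[] args) {
    SimplifiedGrowCheck check = new SimplifiedGrowCheck(1024L, 256L, 64L, 0.5);
    check.updateStats(10);
    check.updateStats(20);
    System.out.println("shouldGrow = " + check.shouldGrow());
  }
}
```

   With this shape the `shouldPushed*` counters and the `updateExpected` flag 
would no longer be needed, since only the overall average is compared against 
a bound derived from `pushBufferMaxSize`.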



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -697,6 +697,10 @@ private[deploy] class Controller(
       val releaseReplicaLocations =
         partitionLocationInfo.removeReplicaPartitions(shuffleKey, replicaLocations)
       workerInfo.releaseSlots(shuffleKey, releaseReplicaLocations._1)
+      workerSource.incCounter(
+        WorkerSource.SLOTS_ALLOCATED,
+        -(primaryLocations.size() + replicaLocations.size() - failedPrimaries.size()

Review Comment:
   I'm not sure if we should decrease `SLOTS_ALLOCATED`. Currently this metric 
only increases. If we decrease here, should we also decrease everywhere 
`releaseSlots` is called? cc @pan3793 @cfmcgrady @AngersZhuuuu 



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -218,7 +307,12 @@ public boolean insertRecord(
       required = recordSize + UAO_SIZE;
     }
 
-    if (getUsed() > pushSortMemoryThreshold
+    long threshold =

Review Comment:
   We can update the actual threshold only when `growThresholdIfNeeded()` is 
called and returns true, instead of recomputing it on every insertion.
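
   One way this could look, as a sketch under assumptions rather than the PR's 
code: `growThresholdIfNeeded()` is changed to return a boolean, and the pusher 
caches the threshold in a plain field that the insert path reads directly. 
`CachedThresholdSketch`, `ThresholdManager`, and the placeholder grow rule 
(grow whenever doubling still fits under the maximum) are hypothetical.

```java
/** Hypothetical sketch: refresh the cached threshold only when the manager grows it. */
final class CachedThresholdSketch {

  /** Stand-in for the PR's MemoryThresholdManager; the grow rule is a placeholder. */
  static final class ThresholdManager {
    private final long maxMemoryThresholdInBytes;
    private long currentMemoryThresholdInBytes;

    ThresholdManager(long maxMemoryThresholdInBytes, long currentMemoryThresholdInBytes) {
      this.maxMemoryThresholdInBytes = maxMemoryThresholdInBytes;
      this.currentMemoryThresholdInBytes = currentMemoryThresholdInBytes;
    }

    /** Doubles the threshold if it still fits; returns true only when it changed. */
    boolean growThresholdIfNeeded() {
      if (currentMemoryThresholdInBytes * 2 <= maxMemoryThresholdInBytes) {
        currentMemoryThresholdInBytes *= 2;
        return true;
      }
      return false;
    }

    long getCurrentMemoryThresholdInBytes() {
      return currentMemoryThresholdInBytes;
    }
  }

  private final ThresholdManager manager;
  private long pushSortMemoryThreshold; // cached value read on every insertion
  private long used = 0;

  CachedThresholdSketch(ThresholdManager manager) {
    this.manager = manager;
    this.pushSortMemoryThreshold = manager.getCurrentMemoryThresholdInBytes();
  }

  /** Insert path: a single comparison against the cached field. Returns true if it pushed. */
  boolean insertRecord(long recordSize) {
    boolean pushed = false;
    if (used > pushSortMemoryThreshold) {
      pushData();
      pushed = true;
    }
    used += recordSize;
    return pushed;
  }

  /** Push path: the only place where the cached threshold is refreshed. */
  void pushData() {
    used = 0;
    if (manager.growThresholdIfNeeded()) {
      pushSortMemoryThreshold = manager.getCurrentMemoryThresholdInBytes();
    }
  }

  long getPushSortMemoryThreshold() {
    return pushSortMemoryThreshold;
  }

  public static void main(String[] args) {
    CachedThresholdSketch pusher = new CachedThresholdSketch(new ThresholdManager(400L, 100L));
    for (int i = 0; i < 3; i++) {
      pusher.insertRecord(60L);
    }
    System.out.println("threshold = " + pusher.getPushSortMemoryThreshold());
  }
}
```

   The insert path then keeps the original `getUsed() > pushSortMemoryThreshold` 
comparison, and the adaptive logic only runs after a push.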



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -132,14 +207,22 @@ public SortBasedPusher(
     }
 
     pushBufferMaxSize = conf.clientPushBufferMaxSize();
+    adaptiveThreshold = conf.clientPushSortMemoryAdaptiveThreshold();

Review Comment:
   Better to rename this to `useAdaptiveThreshold`.



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -44,6 +44,73 @@
 
 public class SortBasedPusher extends MemoryConsumer {
 
+  class MemoryThresholdManager {
+
+    private long maxMemoryThresholdInBytes;
+    private long currentMemoryThresholdInBytes;
+
+    private double smallPushTolerateFactor;
+
+    MemoryThresholdManager(
+        int numPartitions,
+        long sendBufferSizeInBytes,
+        long currentMemoryThresholdInBytes,
+        double smallPushTolerateFactor) {
+      this.maxMemoryThresholdInBytes = numPartitions * sendBufferSizeInBytes;
+      this.currentMemoryThresholdInBytes = currentMemoryThresholdInBytes;
+      this.smallPushTolerateFactor = smallPushTolerateFactor;
+    }
+
+    private boolean shouldGrow() {
+      boolean enoughSpace = this.currentMemoryThresholdInBytes * 2 <= maxMemoryThresholdInBytes;
+      long shouldPushedBytes;
+      if (shouldPushedCount == 0) {
+        shouldPushedBytes = 0;
+      } else {
+        shouldPushedBytes =
+            (long) ((1 + smallPushTolerateFactor) * shouldPushedSizeInBytes / shouldPushedCount);
+      }
+      boolean tooManyPushed = pushedMemorySizeInBytes * 1.0 / pushedCount > shouldPushedBytes;
+      return enoughSpace && tooManyPushed;
+    }
+
+    public void growThresholdIfNeeded() {
+      if (shouldGrow()) {
+        long oldThreshold = this.currentMemoryThresholdInBytes;
+        this.currentMemoryThresholdInBytes = this.currentMemoryThresholdInBytes * 2;
+        logger.info(
+            "grow memory threshold from "
+                + oldThreshold / 1024 / 1024
+                + "Mb to "
+                + this.currentMemoryThresholdInBytes / 1024 / 1024
+                + " Mb");
+        pushedCount = 0;
+        pushedMemorySizeInBytes = 0;
+        shouldPushedCount = 0;
+        shouldPushedSizeInBytes = 0;
+      }
+    }
+
+    public long getCurrentMemoryThresholdInBytes() {
+      return currentMemoryThresholdInBytes;
+    }
+
+    long pushedCount = 0;
+    long pushedMemorySizeInBytes = 0;
+
+    long shouldPushedCount = 0;
+    long shouldPushedSizeInBytes = 0;
+
+    public void updateStats(long pushedBytes, boolean updateExpected) {
+      memoryThresholdManager.pushedMemorySizeInBytes += pushedBytes;

Review Comment:
   It seems `this` can be used here instead of `memoryThresholdManager`?
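
   For illustration, a small sketch of the inner-class method updating its own 
fields through `this` rather than going through the outer instance's field. 
`OuterPusherSketch` and `Manager` are hypothetical stand-ins, and the sketch 
assumes the outer field always refers to the same manager instance the method 
is called on.

```java
/** Hypothetical stand-in for SortBasedPusher with an inner stats holder. */
final class OuterPusherSketch {

  /** Inner class: its methods can reach their own fields via `this`. */
  final class Manager {
    long pushedCount = 0;
    long pushedMemorySizeInBytes = 0;

    void updateStats(long pushedBytes) {
      // Equivalent to memoryThresholdManager.pushedMemorySizeInBytes += pushedBytes
      // as long as memoryThresholdManager is this very instance.
      this.pushedMemorySizeInBytes += pushedBytes;
      this.pushedCount++;
    }
  }

  final Manager memoryThresholdManager = new Manager();

  public static void main(String[] args) {
    OuterPusherSketch pusher = new OuterPusherSketch();
    pusher.memoryThresholdManager.updateStats(128L);
    System.out.println("pushed bytes = " + pusher.memoryThresholdManager.pushedMemorySizeInBytes);
  }
}
```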



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
