SteNicholas commented on code in PR #2358:
URL:
https://github.com/apache/incubator-celeborn/pull/2358#discussion_r1520758528
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4018,6 +4022,32 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64m")
+ val CLIENT_PUSH_SORT_USE_ADAPTIVE_MEMORY_THRESHOLD: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.spark.push.sort.memory.useAdaptiveThreshold")
+ .withAlternative("celeborn.push.sortMemory.useAdaptiveThreshold")
+ .categories("client")
+ .doc("adaptively adjust threshold for sort shuffle writer's memory
threshold")
+ .version("0.5.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ val CLIENT_PUSH_SORT_SMALL_PUSH_TOLERATE_FACTOR: ConfigEntry[Double] =
+ buildConf("celeborn.client.spark.push.sort.smallPushTolerateFactor")
+ .withAlternative("celeborn.push.sortMemory.adaptiveThreshold")
+ .categories("client")
+ .doc("only be in effect when
celeborn.client.spark.push.sort.memory.adaptiveThreshold is" +
+ " turned on. It controls when to enlarge the sort shuffle writer's
memory threshold. With" +
+ " N bytes data in memory and V as the value of this config, if the
number of pushes, C," +
+ " when using sort based shuffle writer C >= (1 + V) * C' where C' is
the number of pushes" +
+ " if we were using hash based writer, we will enlarge the memory
threshold by 2X.")
+ .version("0.5.0")
+ .doubleConf
+ .checkValue(
+ v => v >= 0.0,
+ "the value of" +
Review Comment:
```suggestion
"Value must be no less than 0"
```
##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -44,6 +44,65 @@
public class SortBasedPusher extends MemoryConsumer {
+ class MemoryThresholdManager {
+
+ private long maxMemoryThresholdInBytes;
+ private double smallPushTolerateFactor;
+
+ private long sendBufferSizeInBytes;
Review Comment:
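Ditto (same as the `maxMemoryThresholdInBytes` suggestion): this field is only
assigned in the constructor, so declare it `private final`.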
##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -44,6 +44,65 @@
public class SortBasedPusher extends MemoryConsumer {
+ class MemoryThresholdManager {
+
+ private long maxMemoryThresholdInBytes;
+ private double smallPushTolerateFactor;
Review Comment:
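Ditto (same as the `maxMemoryThresholdInBytes` suggestion): this field is only
assigned in the constructor, so declare it `private final`.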
##########
client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java:
##########
@@ -38,4 +66,145 @@ protected ShuffleWriter<Integer, String>
createShuffleWriter(
return new SortBasedShuffleWriter<Integer, String, String>(
handle, context, conf, client, metrics, SendBufferPool.get(4, 30, 60));
}
+
+ private SortBasedShuffleWriter<Integer, String, String>
createShuffleWriterWithPusher(
+ CelebornShuffleHandle handle,
+ TaskContext context,
+ CelebornConf conf,
+ ShuffleClient client,
+ ShuffleWriteMetricsReporter metrics,
+ SortBasedPusher pusher)
+ throws Exception {
+ return new SortBasedShuffleWriter<Integer, String, String>(
+ handle, context, conf, client, metrics, SendBufferPool.get(4, 30, 60),
pusher);
+ }
+
+ private SortBasedPusher createSortBasedPusher(
+ CelebornConf conf,
+ File tempFile,
+ int numPartitions,
+ ShuffleWriteMetricsReporter metricsReporter)
+ throws Exception {
+ SparkConf sparkConf = new SparkConf(false).set("spark.buffer.pageSize",
"32k");
+ UnifiedMemoryManager unifiedMemoryManager =
UnifiedMemoryManager.apply(sparkConf, 1);
+ TaskMemoryManager taskMemoryManager = new
TaskMemoryManager(unifiedMemoryManager, 0);
+
+ final ShuffleClient client = new DummyShuffleClient(conf, tempFile);
+
conf.set(CelebornConf.CLIENT_PUSH_SORT_USE_ADAPTIVE_MEMORY_THRESHOLD().key(),
"true");
+ LongAdder[] mapStatusLengths = new LongAdder[numPartitions];
+ for (int i = 0; i < numPartitions; i++) {
+ mapStatusLengths[i] = new LongAdder();
+ }
+ SortBasedPusher pusher =
+ new SortBasedPusher(
+ taskMemoryManager,
+ /*shuffleClient=*/ client,
+ /*taskContext=*/ taskContext,
+ /*shuffleId=*/ 0,
+ /*mapId=*/ 0,
+ /*attemptNumber=*/ 0,
+ /*taskAttemptId=*/ 0,
+ /*numMappers=*/ 0,
+ /*numPartitions=*/ numPartitions,
+ conf,
+ metricsReporter::incBytesWritten,
+ mapStatusLengths,
+ /*pushSortMemoryThreshold=*/ Utils.byteStringAsBytes("32K"),
+ SendBufferPool.get(4, 30, 60));
+ return pusher;
+ }
+
+ private String buildRecord(int size) {
+ char[] record = new char[size];
+ for (int i = 0; i < size; i++) {
+ record[i] = 'a';
+ }
+ return new String(record);
+ }
+
+ private Iterator<Product2<Integer, UnsafeRow>> getUnsafeRowIterator(
+ final int size, int recordSize, final AtomicInteger total, int
numPartitions) {
+ int current = 0;
+ ListBuffer<Product2<Integer, UnsafeRow>> list = new ListBuffer<>();
+ while (current < size) {
+ int key = total.getAndIncrement();
+ String value = buildRecord(recordSize);
+ current += value.length();
+ ListBuffer<Object> values = new ListBuffer<>();
+ values.$plus$eq(UTF8String.fromString(value));
+
+ InternalRow row = InternalRow.apply(values.toSeq());
+ DataType[] types = new DataType[1];
+ types[0] = StringType$.MODULE$;
+ UnsafeRow unsafeRow = UnsafeProjection.create(types).apply(row);
+
+ list.$plus$eq(new Tuple2<>(key % numPartitions, unsafeRow));
+ }
+ return list.toIterator();
+ }
+
+ @Test
+ public void testAdaptiveMemoryThreshold() throws Exception {
Review Comment:
Does this improvement work for Spark 2.4?
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4018,6 +4022,32 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64m")
+ val CLIENT_PUSH_SORT_USE_ADAPTIVE_MEMORY_THRESHOLD: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.spark.push.sort.memory.useAdaptiveThreshold")
+ .withAlternative("celeborn.push.sortMemory.useAdaptiveThreshold")
+ .categories("client")
+ .doc("adaptively adjust threshold for sort shuffle writer's memory
threshold")
Review Comment:
Please follow the existing doc style: capitalize the first letter and end the
description with a period.
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4018,6 +4022,32 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64m")
+ val CLIENT_PUSH_SORT_USE_ADAPTIVE_MEMORY_THRESHOLD: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.spark.push.sort.memory.useAdaptiveThreshold")
+ .withAlternative("celeborn.push.sortMemory.useAdaptiveThreshold")
+ .categories("client")
+ .doc("adaptively adjust threshold for sort shuffle writer's memory
threshold")
+ .version("0.5.0")
+ .booleanConf
Review Comment:
Is the threshold numeric? IMO, the threshold should not be boolean.
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4018,6 +4022,32 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64m")
+ val CLIENT_PUSH_SORT_USE_ADAPTIVE_MEMORY_THRESHOLD: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.spark.push.sort.memory.useAdaptiveThreshold")
+ .withAlternative("celeborn.push.sortMemory.useAdaptiveThreshold")
+ .categories("client")
+ .doc("adaptively adjust threshold for sort shuffle writer's memory
threshold")
+ .version("0.5.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ val CLIENT_PUSH_SORT_SMALL_PUSH_TOLERATE_FACTOR: ConfigEntry[Double] =
+ buildConf("celeborn.client.spark.push.sort.smallPushTolerateFactor")
+ .withAlternative("celeborn.push.sortMemory.adaptiveThreshold")
+ .categories("client")
+ .doc("only be in effect when
celeborn.client.spark.push.sort.memory.adaptiveThreshold is" +
Review Comment:
From a user's perspective, this configuration description gives no way of
knowing how to configure this factor.
##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -44,6 +44,65 @@
public class SortBasedPusher extends MemoryConsumer {
+ class MemoryThresholdManager {
+
+ private long maxMemoryThresholdInBytes;
Review Comment:
```suggestion
private final long maxMemoryThresholdInBytes;
```
##########
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java:
##########
@@ -44,6 +44,65 @@
public class SortBasedPusher extends MemoryConsumer {
+ class MemoryThresholdManager {
+
+ private long maxMemoryThresholdInBytes;
+ private double smallPushTolerateFactor;
+
+ private long sendBufferSizeInBytes;
+
+ MemoryThresholdManager(
+ int numPartitions, long sendBufferSizeInBytes, double
smallPushTolerateFactor) {
+ this.maxMemoryThresholdInBytes = numPartitions * sendBufferSizeInBytes;
+ this.smallPushTolerateFactor = smallPushTolerateFactor;
+ this.sendBufferSizeInBytes = sendBufferSizeInBytes;
+ }
+
+ private boolean shouldGrow() {
+ boolean enoughSpace = pushSortMemoryThreshold * 2 <=
maxMemoryThresholdInBytes;
+ double expectedPushSize = Long.MAX_VALUE;
+ if (this.expectedPushedCount != 0) {
+ expectedPushSize = this.expectedPushedBytes * 1.0 /
this.expectedPushedCount;
+ }
+ boolean tooManyPushed =
+ pushedMemorySizeInBytes * 1.0 / pushedCount * (1 +
this.smallPushTolerateFactor)
+ < expectedPushSize;
+ return enoughSpace && tooManyPushed;
+ }
+
+ public void growThresholdIfNeeded() {
+ if (shouldGrow()) {
+ long oldThreshold = pushSortMemoryThreshold;
+ pushSortMemoryThreshold = pushSortMemoryThreshold * 2;
+ logger.info(
Review Comment:
Use `Utils` to format the threshold.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]