CodingCat commented on code in PR #2358:
URL:
https://github.com/apache/incubator-celeborn/pull/2358#discussion_r1520868865
##########
client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriterSuiteJ.java:
##########
@@ -38,4 +66,145 @@ protected ShuffleWriter<Integer, String>
createShuffleWriter(
return new SortBasedShuffleWriter<Integer, String, String>(
handle, context, conf, client, metrics, SendBufferPool.get(4, 30, 60));
}
+
+ private SortBasedShuffleWriter<Integer, String, String>
createShuffleWriterWithPusher(
+ CelebornShuffleHandle handle,
+ TaskContext context,
+ CelebornConf conf,
+ ShuffleClient client,
+ ShuffleWriteMetricsReporter metrics,
+ SortBasedPusher pusher)
+ throws Exception {
+ return new SortBasedShuffleWriter<Integer, String, String>(
+ handle, context, conf, client, metrics, SendBufferPool.get(4, 30, 60),
pusher);
+ }
+
+ private SortBasedPusher createSortBasedPusher(
+ CelebornConf conf,
+ File tempFile,
+ int numPartitions,
+ ShuffleWriteMetricsReporter metricsReporter)
+ throws Exception {
+ SparkConf sparkConf = new SparkConf(false).set("spark.buffer.pageSize",
"32k");
+ UnifiedMemoryManager unifiedMemoryManager =
UnifiedMemoryManager.apply(sparkConf, 1);
+ TaskMemoryManager taskMemoryManager = new
TaskMemoryManager(unifiedMemoryManager, 0);
+
+ final ShuffleClient client = new DummyShuffleClient(conf, tempFile);
+
conf.set(CelebornConf.CLIENT_PUSH_SORT_USE_ADAPTIVE_MEMORY_THRESHOLD().key(),
"true");
+ LongAdder[] mapStatusLengths = new LongAdder[numPartitions];
+ for (int i = 0; i < numPartitions; i++) {
+ mapStatusLengths[i] = new LongAdder();
+ }
+ SortBasedPusher pusher =
+ new SortBasedPusher(
+ taskMemoryManager,
+ /*shuffleClient=*/ client,
+ /*taskContext=*/ taskContext,
+ /*shuffleId=*/ 0,
+ /*mapId=*/ 0,
+ /*attemptNumber=*/ 0,
+ /*taskAttemptId=*/ 0,
+ /*numMappers=*/ 0,
+ /*numPartitions=*/ numPartitions,
+ conf,
+ metricsReporter::incBytesWritten,
+ mapStatusLengths,
+ /*pushSortMemoryThreshold=*/ Utils.byteStringAsBytes("32K"),
+ SendBufferPool.get(4, 30, 60));
+ return pusher;
+ }
+
+ private String buildRecord(int size) {
+ char[] record = new char[size];
+ for (int i = 0; i < size; i++) {
+ record[i] = 'a';
+ }
+ return new String(record);
+ }
+
+ private Iterator<Product2<Integer, UnsafeRow>> getUnsafeRowIterator(
+ final int size, int recordSize, final AtomicInteger total, int
numPartitions) {
+ int current = 0;
+ ListBuffer<Product2<Integer, UnsafeRow>> list = new ListBuffer<>();
+ while (current < size) {
+ int key = total.getAndIncrement();
+ String value = buildRecord(recordSize);
+ current += value.length();
+ ListBuffer<Object> values = new ListBuffer<>();
+ values.$plus$eq(UTF8String.fromString(value));
+
+ InternalRow row = InternalRow.apply(values.toSeq());
+ DataType[] types = new DataType[1];
+ types[0] = StringType$.MODULE$;
+ UnsafeRow unsafeRow = UnsafeProjection.create(types).apply(row);
+
+ list.$plus$eq(new Tuple2<>(key % numPartitions, unsafeRow));
+ }
+ return list.toIterator();
+ }
+
+ @Test
+ public void testAdaptiveMemoryThreshold() throws Exception {
Review Comment:
It should work. What's our policy on supporting EOL Spark versions?
Is it "we should cover as many versions as possible", or "we should try to add
features to newer versions and make it easier to drop support for old
versions in the future"?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]