This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 27669080f [CELEBORN-925][TEST] Refine PartitionFilesSorterSuiteJ
27669080f is described below
commit 27669080fbfb009cbe9b0f3bce555561acfadb59
Author: Fu Chen <[email protected]>
AuthorDate: Tue Aug 29 18:16:02 2023 +0800
[CELEBORN-925][TEST] Refine PartitionFilesSorterSuiteJ
### What changes were proposed in this pull request?
As the title says: refine `PartitionFilesSorterSuiteJ`.
### Why are the changes needed?
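This PR:
1. Strengthens the assertion conditions: the sorted result is now checked against the expected chunk count and the expected total size of the fetched map range.
2. Enables the previously ignored `testLargeFile` scenario.
3. Changes `FileInfo.updateBytesFlushed` to take a `long` instead of an `int`, so the flushed byte count no longer overflows when a file exceeds `Integer.MAX_VALUE` bytes (the large-file test can produce such a file).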
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes GitHub Actions (GA) CI.
Closes #1848 from cfmcgrady/refine-partition-files-sorter-suite.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/common/meta/FileInfo.java | 2 +-
.../worker/storage/PartitionFilesSorterSuiteJ.java | 98 ++++++++++++----------
2 files changed, 55 insertions(+), 45 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
index 426f92850..3b9a540d2 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@@ -114,7 +114,7 @@ public class FileInfo {
return bytesFlushed;
}
- public long updateBytesFlushed(int numBytes) {
+ public long updateBytesFlushed(long numBytes) {
bytesFlushed += numBytes;
return bytesFlushed;
}
diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
index ab480065f..0c121eedc 100644
--- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
+++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
@@ -29,7 +29,6 @@ import java.util.Map;
import java.util.Random;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
@@ -40,6 +39,7 @@ import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.FileInfo;
import org.apache.celeborn.common.unsafe.Platform;
import org.apache.celeborn.common.util.CelebornExitKind;
+import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.service.deploy.worker.WorkerSource;
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
@@ -48,18 +48,19 @@ public class PartitionFilesSorterSuiteJ {
private static Logger logger =
LoggerFactory.getLogger(PartitionFilesSorterSuiteJ.class);
+ private Random random = new Random();
private File shuffleFile;
private FileInfo fileInfo;
- public final int CHUNK_SIZE = 8 * 1024 * 1024;
private String originFileName;
private long originFileLen;
private FileWriter fileWriter;
- private long sortTimeout = 16 * 1000;
private UserIdentifier userIdentifier = new UserIdentifier("mock-tenantId",
"mock-name");
- public void prepare(boolean largefile) throws IOException {
+ private static final int MAX_MAP_ID = 50;
+
+ public long[] prepare(int mapCount) throws IOException {
+ long[] partitionSize = new long[MAX_MAP_ID];
byte[] batchHeader = new byte[16];
- Random random = new Random();
shuffleFile = File.createTempFile("Celeborn", "sort-suite");
originFileName = shuffleFile.getAbsolutePath();
@@ -68,13 +69,8 @@ public class PartitionFilesSorterSuiteJ {
FileChannel channel = fileOutputStream.getChannel();
Map<Integer, Integer> batchIds = new HashMap<>();
- int maxMapId = 50;
- int mapCount = 1000;
- if (largefile) {
- mapCount = 15000;
- }
for (int i = 0; i < mapCount; i++) {
- int mapId = random.nextInt(maxMapId);
+ int mapId = random.nextInt(MAX_MAP_ID);
int currentAttemptId = 0;
int batchId =
batchIds.compute(
@@ -87,6 +83,7 @@ public class PartitionFilesSorterSuiteJ {
}
return v;
});
+ // [63.9k, 192k + 63.9k]
int dataSize = random.nextInt(192 * 1024) + 65525;
byte[] mockedData = new byte[dataSize];
Platform.putInt(batchHeader, Platform.BYTE_ARRAY_OFFSET, mapId);
@@ -102,15 +99,12 @@ public class PartitionFilesSorterSuiteJ {
while (buf2.hasRemaining()) {
channel.write(buf2);
}
+ partitionSize[mapId] = partitionSize[mapId] + batchHeader.length +
mockedData.length;
}
originFileLen = channel.size();
fileInfo.getChunkOffsets().add(originFileLen);
- fileInfo.updateBytesFlushed((int) originFileLen);
- System.out.println(
- shuffleFile.getAbsolutePath()
- + " filelen "
- + (double) originFileLen / 1024 / 1024.0
- + "MB");
+ fileInfo.updateBytesFlushed(originFileLen);
+ logger.info(shuffleFile.getAbsolutePath() + " filelen: " +
Utils.bytesToString(originFileLen));
CelebornConf conf = new CelebornConf();
conf.set(CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(),
"0.8");
@@ -126,41 +120,57 @@ public class PartitionFilesSorterSuiteJ {
fileWriter = Mockito.mock(FileWriter.class);
when(fileWriter.getFile()).thenAnswer(i -> shuffleFile);
when(fileWriter.getFileInfo()).thenAnswer(i -> fileInfo);
+ return partitionSize;
+ }
+
+ public void clean() throws IOException {
+ // origin file
+ JavaUtils.deleteRecursively(shuffleFile);
+ // sorted file
+ JavaUtils.deleteRecursively(new File(shuffleFile.getPath() + ".sorted"));
+ // index file
+ JavaUtils.deleteRecursively(new File(shuffleFile.getPath() + ".index"));
}
- public void clean() {
- shuffleFile.delete();
+ private void check(int mapCount, int startMapIndex, int endMapIndex) throws
IOException {
+ try {
+ long[] partitionSize = prepare(mapCount);
+ CelebornConf conf = new CelebornConf();
+ conf.set(CelebornConf.SHUFFLE_CHUNK_SIZE().key(), "8m");
+ PartitionFilesSorter partitionFilesSorter =
+ new PartitionFilesSorter(MemoryManager.instance(), conf, new
WorkerSource(conf));
+ FileInfo info =
+ partitionFilesSorter.getSortedFileInfo(
+ "application-1",
+ originFileName,
+ fileWriter.getFileInfo(),
+ startMapIndex,
+ endMapIndex);
+ long totalSizeToFetch = 0;
+ for (int i = startMapIndex; i < endMapIndex; i++) {
+ totalSizeToFetch += partitionSize[i];
+ }
+ long numChunks = totalSizeToFetch / conf.shuffleChunkSize() + 1;
+ Assert.assertTrue(0 < info.numChunks() && info.numChunks() <= numChunks);
+ long actualTotalChunkSize = info.getLastChunkOffset() -
info.getChunkOffsets().get(0);
+ Assert.assertTrue(totalSizeToFetch == actualTotalChunkSize);
+ } finally {
+ clean();
+ }
}
@Test
- public void testSmallFile() throws InterruptedException, IOException {
- prepare(false);
- CelebornConf conf = new CelebornConf();
- PartitionFilesSorter partitionFilesSorter =
- new PartitionFilesSorter(MemoryManager.instance(), conf, new
WorkerSource(conf));
- FileInfo info =
- partitionFilesSorter.getSortedFileInfo(
- "application-1", originFileName, fileWriter.getFileInfo(), 5, 10);
- Thread.sleep(1000);
- System.out.println(info.toString());
- Assert.assertTrue(info.numChunks() > 0);
- clean();
+ public void testSmallFile() throws IOException {
+ int startMapIndex = random.nextInt(5);
+ int endMapIndex = startMapIndex + random.nextInt(5) + 5;
+ check(1000, startMapIndex, endMapIndex);
}
@Test
- @Ignore
- public void testLargeFile() throws InterruptedException, IOException {
- prepare(true);
- CelebornConf conf = new CelebornConf();
- PartitionFilesSorter partitionFilesSorter =
- new PartitionFilesSorter(MemoryManager.instance(), conf, new
WorkerSource(conf));
- FileInfo info =
- partitionFilesSorter.getSortedFileInfo(
- "application-1", originFileName, fileWriter.getFileInfo(), 5, 10);
- Thread.sleep(30000);
- System.out.println(info.toString());
- Assert.assertTrue(info.numChunks() > 0);
- clean();
+ public void testLargeFile() throws IOException {
+ int startMapIndex = random.nextInt(5);
+ int endMapIndex = startMapIndex + random.nextInt(5) + 5;
+ check(15000, startMapIndex, endMapIndex);
}
@Test