This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 27669080f [CELEBORN-925][TEST] Refine PartitionFilesSorterSuiteJ
27669080f is described below

commit 27669080fbfb009cbe9b0f3bce555561acfadb59
Author: Fu Chen <[email protected]>
AuthorDate: Tue Aug 29 18:16:02 2023 +0800

    [CELEBORN-925][TEST] Refine PartitionFilesSorterSuiteJ
    
    ### What changes were proposed in this pull request?
    
    As the title says: refine `PartitionFilesSorterSuiteJ`.
    
    ### Why are the changes needed?
    
    This PR:
    
    1. Strengthens the assertion conditions.
    2. Enables the previously ignored `testLargeFile` scenario.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Pass GA
    
    Closes #1848 from cfmcgrady/refine-partition-files-sorter-suite.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/celeborn/common/meta/FileInfo.java  |  2 +-
 .../worker/storage/PartitionFilesSorterSuiteJ.java | 98 ++++++++++++----------
 2 files changed, 55 insertions(+), 45 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java 
b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
index 426f92850..3b9a540d2 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@@ -114,7 +114,7 @@ public class FileInfo {
     return bytesFlushed;
   }
 
-  public long updateBytesFlushed(int numBytes) {
+  public long updateBytesFlushed(long numBytes) {
     bytesFlushed += numBytes;
     return bytesFlushed;
   }
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
index ab480065f..0c121eedc 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorterSuiteJ.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.Random;
 
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
@@ -40,6 +39,7 @@ import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.meta.FileInfo;
 import org.apache.celeborn.common.unsafe.Platform;
 import org.apache.celeborn.common.util.CelebornExitKind;
+import org.apache.celeborn.common.util.JavaUtils;
 import org.apache.celeborn.common.util.Utils;
 import org.apache.celeborn.service.deploy.worker.WorkerSource;
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
@@ -48,18 +48,19 @@ public class PartitionFilesSorterSuiteJ {
 
   private static Logger logger = 
LoggerFactory.getLogger(PartitionFilesSorterSuiteJ.class);
 
+  private Random random = new Random();
   private File shuffleFile;
   private FileInfo fileInfo;
-  public final int CHUNK_SIZE = 8 * 1024 * 1024;
   private String originFileName;
   private long originFileLen;
   private FileWriter fileWriter;
-  private long sortTimeout = 16 * 1000;
   private UserIdentifier userIdentifier = new UserIdentifier("mock-tenantId", 
"mock-name");
 
-  public void prepare(boolean largefile) throws IOException {
+  private static final int MAX_MAP_ID = 50;
+
+  public long[] prepare(int mapCount) throws IOException {
+    long[] partitionSize = new long[MAX_MAP_ID];
     byte[] batchHeader = new byte[16];
-    Random random = new Random();
     shuffleFile = File.createTempFile("Celeborn", "sort-suite");
 
     originFileName = shuffleFile.getAbsolutePath();
@@ -68,13 +69,8 @@ public class PartitionFilesSorterSuiteJ {
     FileChannel channel = fileOutputStream.getChannel();
     Map<Integer, Integer> batchIds = new HashMap<>();
 
-    int maxMapId = 50;
-    int mapCount = 1000;
-    if (largefile) {
-      mapCount = 15000;
-    }
     for (int i = 0; i < mapCount; i++) {
-      int mapId = random.nextInt(maxMapId);
+      int mapId = random.nextInt(MAX_MAP_ID);
       int currentAttemptId = 0;
       int batchId =
           batchIds.compute(
@@ -87,6 +83,7 @@ public class PartitionFilesSorterSuiteJ {
                 }
                 return v;
               });
+      // [63.9k, 192k + 63.9k]
       int dataSize = random.nextInt(192 * 1024) + 65525;
       byte[] mockedData = new byte[dataSize];
       Platform.putInt(batchHeader, Platform.BYTE_ARRAY_OFFSET, mapId);
@@ -102,15 +99,12 @@ public class PartitionFilesSorterSuiteJ {
       while (buf2.hasRemaining()) {
         channel.write(buf2);
       }
+      partitionSize[mapId] = partitionSize[mapId] + batchHeader.length + 
mockedData.length;
     }
     originFileLen = channel.size();
     fileInfo.getChunkOffsets().add(originFileLen);
-    fileInfo.updateBytesFlushed((int) originFileLen);
-    System.out.println(
-        shuffleFile.getAbsolutePath()
-            + " filelen "
-            + (double) originFileLen / 1024 / 1024.0
-            + "MB");
+    fileInfo.updateBytesFlushed(originFileLen);
+    logger.info(shuffleFile.getAbsolutePath() + " filelen: " + 
Utils.bytesToString(originFileLen));
 
     CelebornConf conf = new CelebornConf();
     conf.set(CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(), 
"0.8");
@@ -126,41 +120,57 @@ public class PartitionFilesSorterSuiteJ {
     fileWriter = Mockito.mock(FileWriter.class);
     when(fileWriter.getFile()).thenAnswer(i -> shuffleFile);
     when(fileWriter.getFileInfo()).thenAnswer(i -> fileInfo);
+    return partitionSize;
+  }
+
+  public void clean() throws IOException {
+    // origin file
+    JavaUtils.deleteRecursively(shuffleFile);
+    // sorted file
+    JavaUtils.deleteRecursively(new File(shuffleFile.getPath() + ".sorted"));
+    // index file
+    JavaUtils.deleteRecursively(new File(shuffleFile.getPath() + ".index"));
   }
 
-  public void clean() {
-    shuffleFile.delete();
+  private void check(int mapCount, int startMapIndex, int endMapIndex) throws 
IOException {
+    try {
+      long[] partitionSize = prepare(mapCount);
+      CelebornConf conf = new CelebornConf();
+      conf.set(CelebornConf.SHUFFLE_CHUNK_SIZE().key(), "8m");
+      PartitionFilesSorter partitionFilesSorter =
+          new PartitionFilesSorter(MemoryManager.instance(), conf, new 
WorkerSource(conf));
+      FileInfo info =
+          partitionFilesSorter.getSortedFileInfo(
+              "application-1",
+              originFileName,
+              fileWriter.getFileInfo(),
+              startMapIndex,
+              endMapIndex);
+      long totalSizeToFetch = 0;
+      for (int i = startMapIndex; i < endMapIndex; i++) {
+        totalSizeToFetch += partitionSize[i];
+      }
+      long numChunks = totalSizeToFetch / conf.shuffleChunkSize() + 1;
+      Assert.assertTrue(0 < info.numChunks() && info.numChunks() <= numChunks);
+      long actualTotalChunkSize = info.getLastChunkOffset() - 
info.getChunkOffsets().get(0);
+      Assert.assertTrue(totalSizeToFetch == actualTotalChunkSize);
+    } finally {
+      clean();
+    }
   }
 
   @Test
-  public void testSmallFile() throws InterruptedException, IOException {
-    prepare(false);
-    CelebornConf conf = new CelebornConf();
-    PartitionFilesSorter partitionFilesSorter =
-        new PartitionFilesSorter(MemoryManager.instance(), conf, new 
WorkerSource(conf));
-    FileInfo info =
-        partitionFilesSorter.getSortedFileInfo(
-            "application-1", originFileName, fileWriter.getFileInfo(), 5, 10);
-    Thread.sleep(1000);
-    System.out.println(info.toString());
-    Assert.assertTrue(info.numChunks() > 0);
-    clean();
+  public void testSmallFile() throws IOException {
+    int startMapIndex = random.nextInt(5);
+    int endMapIndex = startMapIndex + random.nextInt(5) + 5;
+    check(1000, startMapIndex, endMapIndex);
   }
 
   @Test
-  @Ignore
-  public void testLargeFile() throws InterruptedException, IOException {
-    prepare(true);
-    CelebornConf conf = new CelebornConf();
-    PartitionFilesSorter partitionFilesSorter =
-        new PartitionFilesSorter(MemoryManager.instance(), conf, new 
WorkerSource(conf));
-    FileInfo info =
-        partitionFilesSorter.getSortedFileInfo(
-            "application-1", originFileName, fileWriter.getFileInfo(), 5, 10);
-    Thread.sleep(30000);
-    System.out.println(info.toString());
-    Assert.assertTrue(info.numChunks() > 0);
-    clean();
+  public void testLargeFile() throws IOException {
+    int startMapIndex = random.nextInt(5);
+    int endMapIndex = startMapIndex + random.nextInt(5) + 5;
+    check(15000, startMapIndex, endMapIndex);
   }
 
   @Test

Reply via email to