This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f869ab25b [CELEBORN-857][TEST] Refine DataPushQueueSuiteJ
f869ab25b is described below

commit f869ab25b65d9fe10be1a43e70e6c59d7412a492
Author: Fu Chen <[email protected]>
AuthorDate: Mon Jul 31 15:43:43 2023 +0800

    [CELEBORN-857][TEST] Refine DataPushQueueSuiteJ
    
    ### What changes were proposed in this pull request?
    
    1. This PR proposes renaming the class `DataPushQueueSuitJ` to `DataPushQueueSuiteJ` so that it is actually picked up and run as part of the test suite. The rename is required by the test-class naming rule configured for the maven-surefire-plugin:
    
    
https://github.com/apache/incubator-celeborn/blob/5f0295e9f3f1f5781af124ae319e202a2594a103/pom.xml#L543-L551
    
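    2. Fix a potential logic bug in the test: tasks in `DataPushQueue` could be consumed by the `DataPusher`'s own built-in thread `DataPusher-${taskId}` instead of by the test's separate consumer thread. Those tasks were then never recorded by the test, which caused the suite to fail. The test now overrides `DataPusher.pushData` (made `protected` for this purpose) to record every pushed task. It also waits with `dataPusher.waitOnTermination()` instead of sleeping for a fixed 15 seconds.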
    
    ![Screenshot 2023-07-31 12:08:06 PM](https://github.com/apache/incubator-celeborn/assets/8537877/b7a294a5-a12b-474a-b43d-233998bc7f31)
    
    ![Screenshot 2023-07-31 12:07:49 PM](https://github.com/apache/incubator-celeborn/assets/8537877/c585ed00-0111-4aab-863a-e7984ed8a298)
    
    ### Why are the changes needed?
    
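    To fix the `DataPushQueueSuiteJ` problems described above: the suite was not being run because its class name did not match the surefire naming rule, and it could fail when tasks were consumed by the `DataPusher`'s built-in thread.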
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Passed GA (GitHub Actions CI).
    
    Closes #1774 from cfmcgrady/refine-data-push-queue-suite.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../apache/celeborn/client/write/DataPusher.java   |  2 +-
 ...ushQueueSuitJ.java => DataPushQueueSuiteJ.java} | 53 ++++++++--------------
 2 files changed, 20 insertions(+), 35 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
index 5444e31f4..a4932674e 100644
--- a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
+++ b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
@@ -193,7 +193,7 @@ public class DataPusher {
     }
   }
 
-  private void pushData(PushTask task) throws IOException {
+  protected void pushData(PushTask task) throws IOException {
     int bytesWritten =
         client.pushData(
             shuffleId,
diff --git a/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuitJ.java b/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
similarity index 80%
rename from client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuitJ.java
rename to client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
index bbd432bc8..b2c0d8b44 100644
--- a/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuitJ.java
+++ b/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
 
 import org.junit.AfterClass;
@@ -41,8 +40,8 @@ import org.apache.celeborn.common.util.JavaUtils;
 import org.apache.celeborn.common.util.Utils;
 import org.apache.celeborn.common.write.PushState;
 
-public class DataPushQueueSuitJ {
-  private static final Logger LOG = 
LoggerFactory.getLogger(DataPushQueueSuitJ.class);
+public class DataPushQueueSuiteJ {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataPushQueueSuiteJ.class);
   private static File tempDir = null;
 
   private final int shuffleId = 0;
@@ -95,6 +94,11 @@ public class DataPushQueueSuitJ {
     for (int i = 0; i < numPartitions; i++) {
       mapStatusLengths[i] = new LongAdder();
     }
+    final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
+    PushState pushState = client.getPushState(mapKey);
+    Map<Integer, PartitionLocation> reducePartitionMap =
+        client.getReducePartitionMap().get(shuffleId);
+
     DataPusher dataPusher =
         new DataPusher(
             shuffleId,
@@ -107,13 +111,17 @@ public class DataPushQueueSuitJ {
             client,
             null,
             integer -> {},
-            mapStatusLengths);
-
-    final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
-    DataPushQueue dataPushQueue = dataPusher.getDataPushQueue();
-    PushState pushState = client.getPushState(mapKey);
-    Map<Integer, PartitionLocation> reducePartitionMap =
-        client.getReducePartitionMap().get(shuffleId);
+            mapStatusLengths) {
+          @Override
+          protected void pushData(PushTask task) throws IOException {
+            byte[] buffer = task.getBuffer();
+            int partitionId = task.getPartitionId();
+            tarWorkerData.get(partitionId % numWorker).add(bytesToInt(buffer));
+            pushState.removeBatch(
+                partitionBatchIdMap.get(partitionId),
+                reducePartitionMap.get(partitionId).hostAndPushPort());
+          }
+        };
 
     for (int i = 0; i < numPartitions; i++) {
       byte[] b = intToBytes(workerData.get(i % numWorker).get(i / numWorker));
@@ -123,30 +131,7 @@ public class DataPushQueueSuitJ {
       partitionBatchIdMap.put(i, batchId);
     }
 
-    AtomicBoolean running = new AtomicBoolean(true);
-    new Thread(
-            () -> {
-              while (running.get()) {
-                try {
-                  ArrayList<PushTask> tasks = dataPushQueue.takePushTasks();
-                  for (int i = 0; i < tasks.size(); i++) {
-                    PushTask task = tasks.get(i);
-                    byte[] buffer = task.getBuffer();
-                    int partitionId = task.getPartitionId();
-                    tarWorkerData.get(partitionId % 
numWorker).add(bytesToInt(buffer));
-                    pushState.removeBatch(
-                        partitionBatchIdMap.get(partitionId),
-                        reducePartitionMap.get(partitionId).hostAndPushPort());
-                  }
-                } catch (IOException | InterruptedException e) {
-                  throw new RuntimeException(e);
-                }
-              }
-            })
-        .start();
-
-    Thread.sleep(15 * 1000);
-    running.set(false);
+    dataPusher.waitOnTermination();
 
     for (int i = 0; i < numWorker; i++) {
       Assert.assertArrayEquals(workerData.get(i).toArray(), 
tarWorkerData.get(i).toArray());

Reply via email to