This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 39ab731b8 [CELEBORN-875][FOLLOWUP] Enhance `DataPushQueueSuiteJ` for 
thread safety and prevent `NullPointerException`
39ab731b8 is described below

commit 39ab731b8584c34f8d40e914836be70b0f70addb
Author: Fu Chen <[email protected]>
AuthorDate: Wed Aug 2 21:52:53 2023 +0800

    [CELEBORN-875][FOLLOWUP] Enhance `DataPushQueueSuiteJ` for thread safety 
and prevent `NullPointerException`
    
    ### What changes were proposed in this pull request?
    
    1. Replaced the usage of `HashMap` with `ConcurrentHashMap` for 
`partitionBatchIdMap` to ensure thread safety during parallel data processing.
    2. Put the partition id and batch id into the `partitionBatchIdMap` before 
adding the task, to prevent the possibility of a `NullPointerException`.
    
    ### Why are the changes needed?
    
    To fix the `NullPointerException` observed in CI:
    
    
https://github.com/apache/incubator-celeborn/actions/runs/5734532048/job/15540863715?pr=1785
    
    ```
    Exception in thread "DataPusher-0" java.lang.NullPointerException
            at 
org.apache.celeborn.client.write.DataPushQueueSuiteJ$1.pushData(DataPushQueueSuiteJ.java:121)
            at 
org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:125)
    Error: The operation was canceled.
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Pass GA
    
    Closes #1789 from cfmcgrady/celeborn-875-followup.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/celeborn/client/write/DataPushQueueSuiteJ.java | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git 
a/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
 
b/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
index b2c0d8b44..284ba3148 100644
--- 
a/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
+++ 
b/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
@@ -20,10 +20,10 @@ package org.apache.celeborn.client.write;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.LongAdder;
 
 import org.junit.AfterClass;
@@ -45,7 +45,7 @@ public class DataPushQueueSuiteJ {
   private static File tempDir = null;
 
   private final int shuffleId = 0;
-  private final int numPartitions = 10;
+  private final int numPartitions = 1000000;
 
   @BeforeClass
   public static void beforeAll() {
@@ -63,7 +63,7 @@ public class DataPushQueueSuiteJ {
 
   @Test
   public void testDataPushQueue() throws Exception {
-    final int numWorker = 3;
+    final int numWorker = 30;
     List<List<Integer>> workerData = new ArrayList<>();
     for (int i = 0; i < numWorker; i++) {
       workerData.add(new ArrayList<>());
@@ -76,7 +76,7 @@ public class DataPushQueueSuiteJ {
       tarWorkerData.add(new ArrayList<>());
     }
 
-    Map<Integer, Integer> partitionBatchIdMap = new HashMap<>();
+    Map<Integer, Integer> partitionBatchIdMap = new ConcurrentHashMap<>();
 
     CelebornConf conf = new CelebornConf();
     conf.set(CelebornConf.CLIENT_PUSH_MAX_REQS_IN_FLIGHT_PERWORKER().key(), 
"2");
@@ -85,7 +85,6 @@ public class DataPushQueueSuiteJ {
     int mapId = 0;
     int attemptId = 0;
     int numMappers = 10;
-    int numPartitions = 10;
     final File tempFile = new File(tempDir, UUID.randomUUID().toString());
     DummyShuffleClient client = new DummyShuffleClient(conf, tempFile);
     client.initReducePartitionMap(shuffleId, numPartitions, numWorker);
@@ -125,10 +124,10 @@ public class DataPushQueueSuiteJ {
 
     for (int i = 0; i < numPartitions; i++) {
       byte[] b = intToBytes(workerData.get(i % numWorker).get(i / numWorker));
-      dataPusher.addTask(i, b, b.length);
       int batchId = pushState.nextBatchId();
       pushState.addBatch(batchId, reducePartitionMap.get(i).hostAndPushPort());
       partitionBatchIdMap.put(i, batchId);
+      dataPusher.addTask(i, b, b.length);
     }
 
     dataPusher.waitOnTermination();

Reply via email to