This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 39ab731b8 [CELEBORN-875][FOLLOWUP] Enhance `DataPushQueueSuiteJ` for
thread safety and prevent `NullPointerException`
39ab731b8 is described below
commit 39ab731b8584c34f8d40e914836be70b0f70addb
Author: Fu Chen <[email protected]>
AuthorDate: Wed Aug 2 21:52:53 2023 +0800
[CELEBORN-875][FOLLOWUP] Enhance `DataPushQueueSuiteJ` for thread safety
and prevent `NullPointerException`
### What changes were proposed in this pull request?
1. Replaced the usage of `HashMap` with `ConcurrentHashMap` for
`partitionBatchIdMap` to ensure thread safety during parallel data processing.
2. Put the partition id and batch id into the `partitionBatchIdMap` before
adding the task, to prevent the possibility of an NPE.
### Why are the changes needed?
To fix the NPE observed in CI:
https://github.com/apache/incubator-celeborn/actions/runs/5734532048/job/15540863715?pr=1785
```
Exception in thread "DataPusher-0" java.lang.NullPointerException
at
org.apache.celeborn.client.write.DataPushQueueSuiteJ$1.pushData(DataPushQueueSuiteJ.java:121)
at
org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:125)
Error: The operation was canceled.
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes #1789 from cfmcgrady/celeborn-875-followup.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/client/write/DataPushQueueSuiteJ.java | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
diff --git
a/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
b/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
index b2c0d8b44..284ba3148 100644
---
a/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
+++
b/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
@@ -20,10 +20,10 @@ package org.apache.celeborn.client.write;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import org.junit.AfterClass;
@@ -45,7 +45,7 @@ public class DataPushQueueSuiteJ {
private static File tempDir = null;
private final int shuffleId = 0;
- private final int numPartitions = 10;
+ private final int numPartitions = 1000000;
@BeforeClass
public static void beforeAll() {
@@ -63,7 +63,7 @@ public class DataPushQueueSuiteJ {
@Test
public void testDataPushQueue() throws Exception {
- final int numWorker = 3;
+ final int numWorker = 30;
List<List<Integer>> workerData = new ArrayList<>();
for (int i = 0; i < numWorker; i++) {
workerData.add(new ArrayList<>());
@@ -76,7 +76,7 @@ public class DataPushQueueSuiteJ {
tarWorkerData.add(new ArrayList<>());
}
- Map<Integer, Integer> partitionBatchIdMap = new HashMap<>();
+ Map<Integer, Integer> partitionBatchIdMap = new ConcurrentHashMap<>();
CelebornConf conf = new CelebornConf();
conf.set(CelebornConf.CLIENT_PUSH_MAX_REQS_IN_FLIGHT_PERWORKER().key(),
"2");
@@ -85,7 +85,6 @@ public class DataPushQueueSuiteJ {
int mapId = 0;
int attemptId = 0;
int numMappers = 10;
- int numPartitions = 10;
final File tempFile = new File(tempDir, UUID.randomUUID().toString());
DummyShuffleClient client = new DummyShuffleClient(conf, tempFile);
client.initReducePartitionMap(shuffleId, numPartitions, numWorker);
@@ -125,10 +124,10 @@ public class DataPushQueueSuiteJ {
for (int i = 0; i < numPartitions; i++) {
byte[] b = intToBytes(workerData.get(i % numWorker).get(i / numWorker));
- dataPusher.addTask(i, b, b.length);
int batchId = pushState.nextBatchId();
pushState.addBatch(batchId, reducePartitionMap.get(i).hostAndPushPort());
partitionBatchIdMap.put(i, batchId);
+ dataPusher.addTask(i, b, b.length);
}
dataPusher.waitOnTermination();