This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f869ab25b [CELEBORN-857][TEST] Refine DataPushQueueSuiteJ
f869ab25b is described below
commit f869ab25b65d9fe10be1a43e70e6c59d7412a492
Author: Fu Chen <[email protected]>
AuthorDate: Mon Jul 31 15:43:43 2023 +0800
[CELEBORN-857][TEST] Refine DataPushQueueSuiteJ
### What changes were proposed in this pull request?
1. This PR proposes renaming the class `DataPushQueueSuitJ` to
`DataPushQueueSuiteJ` in order to enable its integration with the test suite.
This change is required to comply with our maven-surefire-plugin rule:
https://github.com/apache/incubator-celeborn/blob/5f0295e9f3f1f5781af124ae319e202a2594a103/pom.xml#L543-L551
2. This PR also fixes a potential logic bug in the test: tasks within `DataPushQueue`
may inadvertently be consumed by the `DataPusher`'s built-in thread
`DataPusher-${taskId}`, leading to test suite failures.


### Why are the changes needed?
To fix the DataPushQueueSuiteJ bug.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes #1774 from cfmcgrady/refine-data-push-queue-suite.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../apache/celeborn/client/write/DataPusher.java | 2 +-
...ushQueueSuitJ.java => DataPushQueueSuiteJ.java} | 53 ++++++++--------------
2 files changed, 20 insertions(+), 35 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
index 5444e31f4..a4932674e 100644
--- a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
+++ b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
@@ -193,7 +193,7 @@ public class DataPusher {
}
}
- private void pushData(PushTask task) throws IOException {
+ protected void pushData(PushTask task) throws IOException {
int bytesWritten =
client.pushData(
shuffleId,
diff --git
a/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuitJ.java
b/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
similarity index 80%
rename from
client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuitJ.java
rename to
client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
index bbd432bc8..b2c0d8b44 100644
---
a/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuitJ.java
+++
b/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import org.junit.AfterClass;
@@ -41,8 +40,8 @@ import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.common.write.PushState;
-public class DataPushQueueSuitJ {
- private static final Logger LOG =
LoggerFactory.getLogger(DataPushQueueSuitJ.class);
+public class DataPushQueueSuiteJ {
+ private static final Logger LOG =
LoggerFactory.getLogger(DataPushQueueSuiteJ.class);
private static File tempDir = null;
private final int shuffleId = 0;
@@ -95,6 +94,11 @@ public class DataPushQueueSuitJ {
for (int i = 0; i < numPartitions; i++) {
mapStatusLengths[i] = new LongAdder();
}
+ final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
+ PushState pushState = client.getPushState(mapKey);
+ Map<Integer, PartitionLocation> reducePartitionMap =
+ client.getReducePartitionMap().get(shuffleId);
+
DataPusher dataPusher =
new DataPusher(
shuffleId,
@@ -107,13 +111,17 @@ public class DataPushQueueSuitJ {
client,
null,
integer -> {},
- mapStatusLengths);
-
- final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
- DataPushQueue dataPushQueue = dataPusher.getDataPushQueue();
- PushState pushState = client.getPushState(mapKey);
- Map<Integer, PartitionLocation> reducePartitionMap =
- client.getReducePartitionMap().get(shuffleId);
+ mapStatusLengths) {
+ @Override
+ protected void pushData(PushTask task) throws IOException {
+ byte[] buffer = task.getBuffer();
+ int partitionId = task.getPartitionId();
+ tarWorkerData.get(partitionId % numWorker).add(bytesToInt(buffer));
+ pushState.removeBatch(
+ partitionBatchIdMap.get(partitionId),
+ reducePartitionMap.get(partitionId).hostAndPushPort());
+ }
+ };
for (int i = 0; i < numPartitions; i++) {
byte[] b = intToBytes(workerData.get(i % numWorker).get(i / numWorker));
@@ -123,30 +131,7 @@ public class DataPushQueueSuitJ {
partitionBatchIdMap.put(i, batchId);
}
- AtomicBoolean running = new AtomicBoolean(true);
- new Thread(
- () -> {
- while (running.get()) {
- try {
- ArrayList<PushTask> tasks = dataPushQueue.takePushTasks();
- for (int i = 0; i < tasks.size(); i++) {
- PushTask task = tasks.get(i);
- byte[] buffer = task.getBuffer();
- int partitionId = task.getPartitionId();
- tarWorkerData.get(partitionId %
numWorker).add(bytesToInt(buffer));
- pushState.removeBatch(
- partitionBatchIdMap.get(partitionId),
- reducePartitionMap.get(partitionId).hostAndPushPort());
- }
- } catch (IOException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- })
- .start();
-
- Thread.sleep(15 * 1000);
- running.set(false);
+ dataPusher.waitOnTermination();
for (int i = 0; i < numWorker; i++) {
Assert.assertArrayEquals(workerData.get(i).toArray(),
tarWorkerData.get(i).toArray());