This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ec7f85c3 [#825][FOLLOWUP] fix(spark): Apply a thread safety way to
track the blocks sending result (#1260)
ec7f85c3 is described below
commit ec7f85c3d0d56286b46bcfcb50f1de75ba1d736d
Author: summaryzb <[email protected]>
AuthorDate: Wed Oct 25 11:03:26 2023 +0800
[#825][FOLLOWUP] fix(spark): Apply a thread safety way to track the blocks
sending result (#1260)
### What changes were proposed in this pull request?
As described in the title.
### Why are the changes needed?
```
[INFO] Running org.apache.uniffle.test.ContinuousSelectPartitionStrategyTest
Error: Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed:
59.195 s <<< FAILURE! - in
org.apache.uniffle.test.ContinuousSelectPartitionStrategyTest
Error: resultCompareTest Time elapsed: 55.751 s <<< ERROR!
org.apache.spark.SparkException:
Job aborted due to stage failure: Task 6 in stage 1.0 failed 1 times, most
recent failure: Lost task 6.0 in stage 1.0 (TID 16)
(fv-az391-410.nf14wd45lyte3l5gjbhk121dmd.jx.internal.cloudapp.net executor
driver): org.apache.uniffle.common.exception.RssException: Timeout: Task[16_0]
failed because 9 blocks can't be sent to shuffle server in 30000 ms.
at
org.apache.spark.shuffle.writer.RssShuffleWriter.checkBlockSendResult(RssShuffleWriter.java:350)
at
org.apache.spark.shuffle.writer.RssShuffleWriter.writeImpl(RssShuffleWriter.java:246)
at
org.apache.spark.shuffle.writer.RssShuffleWriter.write(RssShuffleWriter.java:209)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
```
[ActionLink](https://github.com/apache/incubator-uniffle/actions/runs/6611324517/job/17954967498?pr=1257)
I debugged the local test and found that all blocks were successfully sent, but
some blocks were missing from the block tracker.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Integration test
In particular, the test below was run in a loop many times without a single failure:
```
mvn -B -fae test
-Dtest=org.apache.uniffle.test.ContinuousSelectPartitionStrategyTest -Pspark3
```
---
.../apache/spark/shuffle/writer/DataPusher.java | 10 ++--
.../spark/shuffle/writer/DataPusherTest.java | 3 +-
.../apache/spark/shuffle/RssShuffleManager.java | 12 +++--
.../spark/shuffle/writer/RssShuffleWriterTest.java | 3 +-
.../apache/spark/shuffle/RssShuffleManager.java | 15 +++---
.../java/org/apache/spark/shuffle/TestUtils.java | 4 +-
.../spark/shuffle/writer/RssShuffleWriterTest.java | 7 ++-
.../client/impl/ShuffleWriteClientImpl.java | 56 ++++++++++++++--------
.../client/response/SendShuffleDataResult.java | 8 ++--
9 files changed, 71 insertions(+), 47 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
index b578ac01..68ec8fb3 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -56,7 +57,8 @@ public class DataPusher implements Closeable {
private final Map<String, Set<Long>> taskToSuccessBlockIds;
// Must be thread safe
private final Map<String, Set<Long>> taskToFailedBlockIds;
- private final Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer;
+ private final Map<String, Map<Long, BlockingQueue<ShuffleServerInfo>>>
+ taskToFailedBlockIdsAndServer;
private String rssAppId;
// Must be thread safe
private final Set<String> failedTaskIds;
@@ -65,7 +67,7 @@ public class DataPusher implements Closeable {
ShuffleWriteClient shuffleWriteClient,
Map<String, Set<Long>> taskToSuccessBlockIds,
Map<String, Set<Long>> taskToFailedBlockIds,
- Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer,
+ Map<String, Map<Long, BlockingQueue<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer,
Set<String> failedTaskIds,
int threadPoolSize,
int threadKeepAliveTime) {
@@ -126,9 +128,9 @@ public class DataPusher implements Closeable {
}
private synchronized void putSendFailedBlockIdAndShuffleServer(
- Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer,
+ Map<String, Map<Long, BlockingQueue<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer,
String taskAttemptId,
- Map<Long, List<ShuffleServerInfo>> blockIdsAndServer) {
+ Map<Long, BlockingQueue<ShuffleServerInfo>> blockIdsAndServer) {
if (blockIdsAndServer == null || blockIdsAndServer.isEmpty()) {
return;
}
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
index 21a8499d..a3cdbb6e 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
@@ -81,7 +82,7 @@ public class DataPusherTest {
Map<String, Set<Long>> taskToSuccessBlockIds = Maps.newConcurrentMap();
Map<String, Set<Long>> taskToFailedBlockIds = Maps.newConcurrentMap();
- Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer =
+ Map<String, Map<Long, BlockingQueue<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer =
JavaUtils.newConcurrentMap();
Set<String> failedTaskIds = new HashSet<>();
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index d1f1d9a6..4efde2ec 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -87,8 +88,8 @@ public class RssShuffleManager extends RssShuffleManagerBase {
private Map<String, Set<Long>> taskToSuccessBlockIds =
JavaUtils.newConcurrentMap();
private Map<String, Set<Long>> taskToFailedBlockIds =
JavaUtils.newConcurrentMap();
// Record both the block that failed to be sent and the ShuffleServer
- private final Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer =
- JavaUtils.newConcurrentMap();
+ private final Map<String, Map<Long, BlockingQueue<ShuffleServerInfo>>>
+ taskToFailedBlockIdsAndServer = JavaUtils.newConcurrentMap();
private final int dataReplica;
private final int dataReplicaWrite;
private final int dataReplicaRead;
@@ -703,10 +704,11 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
* @param taskId Shuffle taskId
* @return List of failed ShuffleServer blocks
*/
- public Map<Long, List<ShuffleServerInfo>>
getFailedBlockIdsWithShuffleServer(String taskId) {
- Map<Long, List<ShuffleServerInfo>> result =
taskToFailedBlockIdsAndServer.get(taskId);
+ public Map<Long, BlockingQueue<ShuffleServerInfo>>
getFailedBlockIdsWithShuffleServer(
+ String taskId) {
+ Map<Long, BlockingQueue<ShuffleServerInfo>> result =
taskToFailedBlockIdsAndServer.get(taskId);
if (result == null) {
- result = JavaUtils.newConcurrentMap();
+ result = Collections.emptyMap();
}
return result;
}
diff --git
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 86115c69..e60930d8 100644
---
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -162,7 +163,7 @@ public class RssShuffleWriterTest {
ShuffleWriteClient shuffleWriteClient,
Map<String, Set<Long>> taskToSuccessBlockIds,
Map<String, Set<Long>> taskToFailedBlockIds,
- Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer,
+ Map<String, Map<Long, BlockingQueue<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer,
Set<String> failedTaskIds,
int threadPoolSize,
int threadKeepAliveTime,
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index a917b456..9ec5e90a 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -98,7 +99,8 @@ public class RssShuffleManager extends RssShuffleManagerBase {
private final Map<String, Set<Long>> taskToSuccessBlockIds;
private final Map<String, Set<Long>> taskToFailedBlockIds;
// Record both the block that failed to be sent and the ShuffleServer
- private final Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer;
+ private final Map<String, Map<Long, BlockingQueue<ShuffleServerInfo>>>
+ taskToFailedBlockIdsAndServer;
private ScheduledExecutorService heartBeatScheduledExecutorService;
private boolean heartbeatStarted = false;
private boolean dynamicConfEnabled = false;
@@ -270,7 +272,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
DataPusher dataPusher,
Map<String, Set<Long>> taskToSuccessBlockIds,
Map<String, Set<Long>> taskToFailedBlockIds,
- Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer) {
+ Map<String, Map<Long, BlockingQueue<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer) {
this.sparkConf = conf;
this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
this.dataDistributionType =
@@ -999,12 +1001,13 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
* The ShuffleServer list of block sending failures is returned using the
shuffle task ID
*
* @param taskId Shuffle taskId
- * @return List of failed ShuffleServer blocks
+ * @return failed ShuffleServer blocks
*/
- public Map<Long, List<ShuffleServerInfo>>
getFailedBlockIdsWithShuffleServer(String taskId) {
- Map<Long, List<ShuffleServerInfo>> result =
taskToFailedBlockIdsAndServer.get(taskId);
+ public Map<Long, BlockingQueue<ShuffleServerInfo>>
getFailedBlockIdsWithShuffleServer(
+ String taskId) {
+ Map<Long, BlockingQueue<ShuffleServerInfo>> result =
taskToFailedBlockIdsAndServer.get(taskId);
if (result == null) {
- result = JavaUtils.newConcurrentMap();
+ result = Collections.emptyMap();
}
return result;
}
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
index d0424f4f..23124247 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/TestUtils.java
@@ -17,9 +17,9 @@
package org.apache.spark.shuffle;
-import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import org.apache.commons.lang3.SystemUtils;
import org.apache.spark.SparkConf;
@@ -37,7 +37,7 @@ public class TestUtils {
DataPusher dataPusher,
Map<String, Set<Long>> successBlockIds,
Map<String, Set<Long>> failBlockIds,
- Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer) {
+ Map<String, Map<Long, BlockingQueue<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer) {
return new RssShuffleManager(
conf, isDriver, dataPusher, successBlockIds, failBlockIds,
taskToFailedBlockIdsAndServer);
}
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 09eeff7c..ca993628 100644
---
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -82,12 +83,10 @@ public class RssShuffleWriterTest {
.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
"127.0.0.1:12345,127.0.0.1:12346");
Map<String, Set<Long>> failBlocks = JavaUtils.newConcurrentMap();
Map<String, Set<Long>> successBlocks = JavaUtils.newConcurrentMap();
- Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer =
- JavaUtils.newConcurrentMap();
Serializer kryoSerializer = new KryoSerializer(conf);
RssShuffleManager manager =
TestUtils.createShuffleManager(
- conf, false, null, successBlocks, failBlocks,
taskToFailedBlockIdsAndServer);
+ conf, false, null, successBlocks, failBlocks,
JavaUtils.newConcurrentMap());
ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
Partitioner mockPartitioner = mock(Partitioner.class);
@@ -164,7 +163,7 @@ public class RssShuffleWriterTest {
ShuffleWriteClient shuffleWriteClient,
Map<String, Set<Long>> taskToSuccessBlockIds,
Map<String, Set<Long>> taskToFailedBlockIds,
- Map<String, Map<Long, List<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer,
+ Map<String, Map<Long, BlockingQueue<ShuffleServerInfo>>>
taskToFailedBlockIdsAndServer,
Set<String> failedTaskIds,
int threadPoolSize,
int threadKeepAliveTime,
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index f777f6ff..ff7aad71 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -23,12 +23,14 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@@ -151,8 +153,8 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
String appId,
Map<ShuffleServerInfo, Map<Integer, Map<Integer,
List<ShuffleBlockInfo>>>> serverToBlocks,
Map<ShuffleServerInfo, List<Long>> serverToBlockIds,
- Map<Long, List<ShuffleServerInfo>> blockIdsSendSuccessTracker,
- Map<Long, List<ShuffleServerInfo>> blockIdsSendFailTracker,
+ Map<Long, AtomicInteger> blockIdsSendSuccessTracker,
+ Map<Long, BlockingQueue<ShuffleServerInfo>> blockIdsSendFailTracker,
boolean allowFastFail,
Supplier<Boolean> needCancelRequest) {
@@ -195,10 +197,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
serverToBlockIds
.get(ssi)
.forEach(
- blockId ->
- blockIdsSendSuccessTracker
- .computeIfAbsent(blockId, id ->
Lists.newArrayList())
- .add(ssi));
+ blockId ->
blockIdsSendSuccessTracker.get(blockId).incrementAndGet());
if (defectiveServers != null) {
defectiveServers.remove(ssi);
}
@@ -211,7 +210,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
.forEach(
blockId ->
blockIdsSendFailTracker
- .computeIfAbsent(blockId, id ->
Lists.newArrayList())
+ .computeIfAbsent(blockId, id -> new
LinkedBlockingQueue<>())
.add(ssi));
if (defectiveServers != null) {
defectiveServers.add(ssi);
@@ -225,7 +224,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
.forEach(
blockId ->
blockIdsSendFailTracker
- .computeIfAbsent(blockId, id ->
Lists.newArrayList())
+ .computeIfAbsent(blockId, id -> new
LinkedBlockingQueue<>())
.add(ssi));
if (defectiveServers != null) {
defectiveServers.add(ssi);
@@ -355,8 +354,28 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
}
}
/** Records the ShuffleServer that successfully or failed to send blocks */
- Map<Long, List<ShuffleServerInfo>> blockIdSendSuccessTracker =
JavaUtils.newConcurrentMap();
- Map<Long, List<ShuffleServerInfo>> blockIdsSendFailTracker =
JavaUtils.newConcurrentMap();
+ // we assume that most of the blocks can be sent successfully
+ // so initialize the map at first without concurrency insurance
+ // AtomicInteger is enough to reflect value changes in other threads
+ Map<Long, AtomicInteger> blockIdsSendSuccessTracker = Maps.newHashMap();
+ primaryServerToBlockIds
+ .values()
+ .forEach(
+ blockList ->
+ blockList.forEach(
+ block ->
+ blockIdsSendSuccessTracker.computeIfAbsent(
+ block, id -> new AtomicInteger(0))));
+ secondaryServerToBlockIds
+ .values()
+ .forEach(
+ blockList ->
+ blockList.forEach(
+ block ->
+ blockIdsSendSuccessTracker.computeIfAbsent(
+ block, id -> new AtomicInteger(0))));
+ Map<Long, BlockingQueue<ShuffleServerInfo>> blockIdsSendFailTracker =
+ JavaUtils.newConcurrentMap();
// sent the primary round of blocks.
boolean isAllSuccess =
@@ -364,7 +383,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
appId,
primaryServerToBlocks,
primaryServerToBlockIds,
- blockIdSendSuccessTracker,
+ blockIdsSendSuccessTracker,
blockIdsSendFailTracker,
secondaryServerToBlocks.isEmpty(),
needCancelRequest);
@@ -380,20 +399,19 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
appId,
secondaryServerToBlocks,
secondaryServerToBlockIds,
- blockIdSendSuccessTracker,
+ blockIdsSendSuccessTracker,
blockIdsSendFailTracker,
true,
needCancelRequest);
}
- blockIdSendSuccessTracker
+ Set<Long> blockIdsSendSuccessSet = Sets.newHashSet();
+ blockIdsSendSuccessTracker
.entrySet()
.forEach(
successBlockId -> {
- if (successBlockId.getValue().size() < replicaWrite) {
- // Removes blocks that do not reach replicaWrite from the
success queue
- blockIdSendSuccessTracker.remove(successBlockId.getKey());
- } else {
+ if (successBlockId.getValue().get() >= replicaWrite) {
+ blockIdsSendSuccessSet.add(successBlockId.getKey());
// If the replicaWrite to be sent is reached,
// no matter whether the block fails to be sent or not,
// the block is considered to have been sent successfully and
is removed from the
@@ -402,9 +420,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
}
});
return new SendShuffleDataResult(
- blockIdSendSuccessTracker.keySet(),
- blockIdsSendFailTracker.keySet(),
- blockIdsSendFailTracker);
+ blockIdsSendSuccessSet, blockIdsSendFailTracker.keySet(),
blockIdsSendFailTracker);
}
/**
diff --git
a/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
b/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
index d33a2986..f2d820e6 100644
---
a/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
+++
b/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
@@ -17,9 +17,9 @@
package org.apache.uniffle.client.response;
-import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.JavaUtils;
@@ -28,7 +28,7 @@ public class SendShuffleDataResult {
private Set<Long> successBlockIds;
private Set<Long> failedBlockIds;
- private Map<Long, List<ShuffleServerInfo>> sendFailedBlockIds;
+ private Map<Long, BlockingQueue<ShuffleServerInfo>> sendFailedBlockIds;
public SendShuffleDataResult(Set<Long> successBlockIds, Set<Long>
failedBlockIds) {
this.successBlockIds = successBlockIds;
@@ -39,7 +39,7 @@ public class SendShuffleDataResult {
public SendShuffleDataResult(
Set<Long> successBlockIds,
Set<Long> failedBlockIds,
- Map<Long, List<ShuffleServerInfo>> sendFailedBlockIds) {
+ Map<Long, BlockingQueue<ShuffleServerInfo>> sendFailedBlockIds) {
this.successBlockIds = successBlockIds;
this.failedBlockIds = failedBlockIds;
this.sendFailedBlockIds = sendFailedBlockIds;
@@ -53,7 +53,7 @@ public class SendShuffleDataResult {
return failedBlockIds;
}
- public Map<Long, List<ShuffleServerInfo>> getSendFailedBlockIds() {
+ public Map<Long, BlockingQueue<ShuffleServerInfo>> getSendFailedBlockIds() {
return sendFailedBlockIds;
}
}