This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 38dcff19a [#1608] feat(spark3): Ensure the compatibility of reassign
and stageRetry (#1783)
38dcff19a is described below
commit 38dcff19a8e9cec615bc8074ff1ebabecd57cdf9
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jun 19 13:29:46 2024 +0800
[#1608] feat(spark3): Ensure the compatibility of reassign and stageRetry
(#1783)
### What changes were proposed in this pull request?
Ensure the compatibility of reassign and stageRetry.
### Why are the changes needed?
To improve job stability when both reassign and stage retry are enabled.
For #1608
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests
---
.../shuffle/manager/RssShuffleManagerBase.java | 34 ++++++++---
.../apache/spark/shuffle/RssShuffleManager.java | 11 ++--
.../apache/spark/shuffle/RssShuffleManager.java | 17 +++---
.../uniffle/test/ReassignAndStageRetryTest.java | 71 ++++++++++++++++++++++
.../server/MockedShuffleServerGrpcService.java | 15 +++++
5 files changed, 123 insertions(+), 25 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 197035e3a..25acf1f0f 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -117,7 +117,7 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
protected boolean blockIdSelfManagedEnabled;
- protected boolean taskBlockSendFailureRetryEnabled;
+ protected boolean partitionReassignEnabled;
protected boolean shuffleManagerRpcServiceEnabled;
@@ -638,7 +638,7 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
@Override
public int getMaxFetchFailures() {
final String TASK_MAX_FAILURE = "spark.task.maxFailures";
- return Math.max(1, sparkConf.getInt(TASK_MAX_FAILURE, 4) - 1);
+ return Math.max(0, sparkConf.getInt(TASK_MAX_FAILURE, 4) - 1);
}
/**
@@ -701,6 +701,10 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
(StageAttemptShuffleHandleInfo)
shuffleHandleInfoManager.get(shuffleId);
stageAttemptShuffleHandleInfo.replaceCurrentShuffleHandleInfo(shuffleHandleInfo);
rssStageResubmitManager.recordAndGetServerAssignedInfo(shuffleId,
stageIdAndAttempt, true);
+ LOG.info(
+ "The stage retry has been triggered successfully for the stageId:
{}, attemptNumber: {}",
+ stageId,
+ stageAttemptNumber);
return true;
} else {
LOG.info(
@@ -720,12 +724,22 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
int shuffleId,
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers) {
long startTime = System.currentTimeMillis();
- MutableShuffleHandleInfo handleInfo =
- (MutableShuffleHandleInfo) shuffleHandleInfoManager.get(shuffleId);
- synchronized (handleInfo) {
+ ShuffleHandleInfo handleInfo = shuffleHandleInfoManager.get(shuffleId);
+ MutableShuffleHandleInfo internalHandle = null;
+ if (handleInfo instanceof MutableShuffleHandleInfo) {
+ internalHandle = (MutableShuffleHandleInfo) handleInfo;
+ } else if (handleInfo instanceof StageAttemptShuffleHandleInfo) {
+ internalHandle =
+ (MutableShuffleHandleInfo) ((StageAttemptShuffleHandleInfo)
handleInfo).getCurrent();
+ }
+ if (internalHandle == null) {
+ throw new RssException(
+ "An unexpected error occurred: internalHandle is null, which should
not happen");
+ }
+ synchronized (internalHandle) {
// If the reassignment servers for one partition exceeds the max
reassign server num,
// it should fast fail.
- handleInfo.checkPartitionReassignServerNum(
+ internalHandle.checkPartitionReassignServerNum(
partitionToFailureServers.keySet(), partitionReassignMaxServerNum);
Map<ShuffleServerInfo, List<PartitionRange>> newServerToPartitions = new
HashMap<>();
@@ -740,10 +754,10 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
String serverId = receivingFailureServer.getServerId();
boolean serverHasReplaced = false;
- Set<ShuffleServerInfo> replacements =
handleInfo.getReplacements(serverId);
+ Set<ShuffleServerInfo> replacements =
internalHandle.getReplacements(serverId);
if (CollectionUtils.isEmpty(replacements)) {
final int requiredServerNum = 1;
- Set<String> excludedServers = new
HashSet<>(handleInfo.listExcludedServers());
+ Set<String> excludedServers = new
HashSet<>(internalHandle.listExcludedServers());
excludedServers.add(serverId);
replacements =
reassignServerForTask(
@@ -759,7 +773,7 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
}
Set<ShuffleServerInfo> updatedReassignServers =
- handleInfo.updateAssignment(partitionId, serverId, replacements);
+ internalHandle.updateAssignment(partitionId, serverId,
replacements);
reassignResult
.computeIfAbsent(serverId, x -> new HashMap<>())
@@ -788,7 +802,7 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
System.currentTimeMillis() - startTime,
reassignResult);
- return handleInfo;
+ return internalHandle;
}
}
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index eb48b2f38..974b99862 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -168,11 +168,10 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
this.rssResubmitStage =
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
- this.taskBlockSendFailureRetryEnabled =
- rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
+ this.partitionReassignEnabled =
rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
this.blockIdSelfManagedEnabled =
rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
this.shuffleManagerRpcServiceEnabled =
- taskBlockSendFailureRetryEnabled || rssResubmitStage ||
blockIdSelfManagedEnabled;
+ partitionReassignEnabled || rssResubmitStage ||
blockIdSelfManagedEnabled;
if (!sparkConf.getBoolean(RssSparkConfig.RSS_TEST_FLAG.key(), false)) {
if (isDriver) {
heartBeatScheduledExecutorService =
@@ -341,7 +340,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
StageAttemptShuffleHandleInfo stageAttemptShuffleHandleInfo =
new StageAttemptShuffleHandleInfo(shuffleId, remoteStorage,
handleInfo);
shuffleHandleInfoManager.register(shuffleId,
stageAttemptShuffleHandleInfo);
- } else if (shuffleManagerRpcServiceEnabled &&
taskBlockSendFailureRetryEnabled) {
+ } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
ShuffleHandleInfo shuffleHandleInfo =
new MutableShuffleHandleInfo(shuffleId, partitionToServers,
remoteStorage);
shuffleHandleInfoManager.register(shuffleId, shuffleHandleInfo);
@@ -410,7 +409,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver
based on the shuffleId
shuffleHandleInfo =
getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
- } else if (shuffleManagerRpcServiceEnabled &&
taskBlockSendFailureRetryEnabled) {
+ } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
// In Block Retry mode, Get the ShuffleServer list from the Driver
based on the shuffleId
shuffleHandleInfo =
getRemoteShuffleHandleInfoWithBlockRetry(shuffleId);
} else {
@@ -483,7 +482,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver
based on the shuffleId.
shuffleHandleInfo =
getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
- } else if (shuffleManagerRpcServiceEnabled &&
taskBlockSendFailureRetryEnabled) {
+ } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
// In Block Retry mode, Get the ShuffleServer list from the Driver
based on the shuffleId
shuffleHandleInfo =
getRemoteShuffleHandleInfoWithBlockRetry(shuffleId);
} else {
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 6354e17a2..9e6f2a26f 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -182,20 +182,19 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
this.rssResubmitStage =
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
- this.taskBlockSendFailureRetryEnabled =
- rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
+ this.partitionReassignEnabled =
rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
// The feature of partition reassign is exclusive with multiple replicas
and stage retry.
- if (taskBlockSendFailureRetryEnabled) {
- if (rssResubmitStage || dataReplica > 1) {
+ if (partitionReassignEnabled) {
+ if (dataReplica > 1) {
throw new RssException(
- "The feature of partition reassign is incompatible with multiple
replicas and stage retry.");
+ "The feature of task partition reassign is incompatible with
multiple replicas mechanism.");
}
}
this.blockIdSelfManagedEnabled =
rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
this.shuffleManagerRpcServiceEnabled =
- taskBlockSendFailureRetryEnabled || rssResubmitStage ||
blockIdSelfManagedEnabled;
+ partitionReassignEnabled || rssResubmitStage ||
blockIdSelfManagedEnabled;
if (isDriver) {
heartBeatScheduledExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
@@ -455,7 +454,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
StageAttemptShuffleHandleInfo handleInfo =
new StageAttemptShuffleHandleInfo(shuffleId, remoteStorage,
shuffleHandleInfo);
shuffleHandleInfoManager.register(shuffleId, handleInfo);
- } else if (shuffleManagerRpcServiceEnabled &&
taskBlockSendFailureRetryEnabled) {
+ } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
ShuffleHandleInfo shuffleHandleInfo =
new MutableShuffleHandleInfo(shuffleId, partitionToServers,
remoteStorage);
shuffleHandleInfoManager.register(shuffleId, shuffleHandleInfo);
@@ -496,7 +495,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver based
on the shuffleId.
shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
- } else if (shuffleManagerRpcServiceEnabled &&
taskBlockSendFailureRetryEnabled) {
+ } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver based
on the shuffleId.
shuffleHandleInfo = getRemoteShuffleHandleInfoWithBlockRetry(shuffleId);
} else {
@@ -640,7 +639,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver based
on the shuffleId.
shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
- } else if (shuffleManagerRpcServiceEnabled &&
taskBlockSendFailureRetryEnabled) {
+ } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
// In Block Retry mode, Get the ShuffleServer list from the Driver based
on the shuffleId.
shuffleHandleInfo = getRemoteShuffleHandleInfoWithBlockRetry(shuffleId);
} else {
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ReassignAndStageRetryTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ReassignAndStageRetryTest.java
new file mode 100644
index 000000000..663a176c8
--- /dev/null
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ReassignAndStageRetryTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.apache.spark.SparkConf;
+
+import org.apache.uniffle.server.MockedGrpcServer;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+
+import static
org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES;
+import static
org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
+import static
org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX;
+import static
org.apache.uniffle.client.util.RssClientConfig.RSS_RESUBMIT_STAGE;
+import static
org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED;
+
+/**
+ * This class is to test the compatibility of reassign and stage retry
mechanism that were enabled
+ * at the same time.
+ */
+public class ReassignAndStageRetryTest extends
PartitionBlockDataReassignMultiTimesTest {
+
+ @Override
+ public void updateSparkConfCustomer(SparkConf sparkConf) {
+ sparkConf.set("spark.task.maxFailures", String.valueOf(1));
+ sparkConf.set("spark." + RSS_RESUBMIT_STAGE, "true");
+
+ sparkConf.set("spark.sql.shuffle.partitions", "4");
+ sparkConf.set("spark." + RSS_CLIENT_RETRY_MAX, "2");
+ sparkConf.set("spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, "1");
+ sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true");
+ sparkConf.set("spark." +
RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES.key(), "1");
+
+ // simulate the grpc servers has different free memory
+ // and make the assign priority seq: g1 -> g2 -> g3
+ ShuffleServer g1 = grpcShuffleServers.get(0);
+ ShuffleBufferManager bufferManager = g1.getShuffleBufferManager();
+ bufferManager.setUsedMemory(bufferManager.getCapacity() - 3000000);
+ g1.sendHeartbeat();
+
+ ShuffleServer g2 = grpcShuffleServers.get(1);
+ bufferManager = g2.getShuffleBufferManager();
+ bufferManager.setUsedMemory(bufferManager.getCapacity() - 2000000);
+ g2.sendHeartbeat();
+
+ ShuffleServer g3 = grpcShuffleServers.get(2);
+ bufferManager = g3.getShuffleBufferManager();
+ bufferManager.setUsedMemory(bufferManager.getCapacity() - 1000000);
+ g3.sendHeartbeat();
+
+ // This will make the partition of g1 reassign to g2 servers.
+ ((MockedGrpcServer)
g1.getServer()).getService().setMockSendDataFailedStageNumber(0);
+ // And then reassign to g3. But reassign max times reaches due to max
reassign times.
+ ((MockedGrpcServer)
g2.getServer()).getService().setMockSendDataFailedStageNumber(0);
+ }
+}
diff --git
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
index d0d77f60d..a25e1fd45 100644
---
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
+++
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
@@ -44,6 +44,7 @@ public class MockedShuffleServerGrpcService extends
ShuffleServerGrpcService {
private long mockedTimeout = -1L;
private boolean mockSendDataFailed = false;
+ private int mockSendDataFailedStageNumber = -1;
private boolean mockRequireBufferFailedWithNoBuffer = false;
private boolean isMockRequireBufferFailedWithNoBufferForHugePartition =
false;
@@ -130,6 +131,12 @@ public class MockedShuffleServerGrpcService extends
ShuffleServerGrpcService {
LOG.info("Add a mocked sendData failed on sendShuffleData");
throw new RuntimeException("This write request is failed as mocked
failure！");
}
+ if (mockSendDataFailedStageNumber == request.getStageAttemptNumber()) {
+ LOG.info(
+ "Add a mocked sendData failed on sendShuffleData with the stage
number={}",
+ mockSendDataFailedStageNumber);
+ throw new RuntimeException("This write request is failed as mocked
failure！");
+ }
if (mockedTimeout > 0) {
LOG.info("Add a mocked timeout on sendShuffleData");
Uninterruptibles.sleepUninterruptibly(mockedTimeout,
TimeUnit.MILLISECONDS);
@@ -286,4 +293,12 @@ public class MockedShuffleServerGrpcService extends
ShuffleServerGrpcService {
}
super.getLocalShuffleIndex(request, responseObserver);
}
+
+ public int getMockSendDataFailedStageNumber() {
+ return mockSendDataFailedStageNumber;
+ }
+
+ public void setMockSendDataFailedStageNumber(int
mockSendDataFailedStageNumber) {
+ this.mockSendDataFailedStageNumber = mockSendDataFailedStageNumber;
+ }
}