This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1f2953af [ISSUE-339] Optimize retry logic in send shuffle data (#361)
1f2953af is described below
commit 1f2953af7020e68be7c5f329daaf4d13bf00fbe0
Author: xianjingfeng <[email protected]>
AuthorDate: Sun Nov 27 17:04:50 2022 +0800
[ISSUE-339] Optimize retry logic in send shuffle data (#361)
### What changes were proposed in this pull request?
1. Set the default value of `rss.client.retry.max` to 50
2. Halve the effective retry count (`rss.client.retry.max/2`) when replica
skipping is enabled and replica > replicaWrite
3. Throw an exception if `rss.client.retry.max *
rss.client.retry.interval.max > rss.client.send.check.timeout.ms`
### Why are the changes needed?
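To make the retry settings more reasonable: the total retry time is now bounded by the send check timeout, and the first send round fails fast when a second round is possible. See #339.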
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests (`RssMRUtilsTest`, `RssSparkShuffleUtilsTest`).
---
.../org/apache/hadoop/mapreduce/RssMRUtils.java | 18 +++++++++++++
.../hadoop/mapreduce/v2/app/RssMRAppMaster.java | 2 +-
.../apache/hadoop/mapreduce/RssMRUtilsTest.java | 21 +++++++++++++++
.../apache/spark/shuffle/RssSparkShuffleUtils.java | 13 ++++++++++
.../spark/shuffle/RssSparkShuffleUtilsTest.java | 30 +++++++++++++++++++++-
.../spark/shuffle/writer/RssShuffleWriterTest.java | 3 ++-
.../spark/shuffle/writer/RssShuffleWriterTest.java | 2 +-
.../client/factory/ShuffleClientFactory.java | 5 ++++
.../uniffle/client/util/RssClientConfig.java | 2 +-
.../uniffle/test/SparkIntegrationTestBase.java | 1 +
.../client/impl/grpc/ShuffleServerGrpcClient.java | 3 ++-
11 files changed, 94 insertions(+), 6 deletions(-)
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 347c20b4..83153eb6 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -241,4 +241,22 @@ public class RssMRUtils {
RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE);
return (int) Math.ceil(taskConcurrency * 1.0 / taskConcurrencyPerServer);
}
+
+ public static void validateRssClientConf(JobConf rssJobConf, JobConf
mrJobConf) {
+ int retryMax = getInt(rssJobConf, mrJobConf,
RssMRConfig.RSS_CLIENT_RETRY_MAX,
+ RssMRConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
+ long retryIntervalMax = getLong(rssJobConf, mrJobConf,
RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
+ RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
+ long sendCheckTimeout = getLong(rssJobConf, mrJobConf,
RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
+ RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE);
+ if (retryIntervalMax * retryMax > sendCheckTimeout) {
+ throw new IllegalArgumentException(String.format("%s(%s) * %s(%s) should
not bigger than %s(%s)",
+ RssMRConfig.RSS_CLIENT_RETRY_MAX,
+ retryMax,
+ RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
+ retryIntervalMax,
+ RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
+ sendCheckTimeout));
+ }
+ }
}
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index 02c63527..1fb6a96a 100644
---
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -168,7 +168,7 @@ public class RssMRAppMaster extends MRAppMaster {
// set the remote storage with actual value
extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH,
remoteStorage.getPath());
extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF,
remoteStorage.getConfString());
-
+ RssMRUtils.validateRssClientConf(extraConf, conf);
// When containers have disk with very limited space, reduce is allowed
to spill data to hdfs
if (conf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
diff --git
a/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
b/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
index 385693f3..d55c8edd 100644
--- a/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
+++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
@@ -30,8 +30,11 @@ import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
public class RssMRUtilsTest {
+
+ private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should
be thrown";
@Test
public void baskAttemptIdTest() {
@@ -212,4 +215,22 @@ public class RssMRUtilsTest {
jobConf.setDouble("mapreduce.rss.estimate.task.concurrency.dynamic.factor",
0.5);
assertEquals(2, RssMRUtils.getRequiredShuffleServerNumber(jobConf));
}
+
+ @Test
+ public void testValidateRssClientConf() {
+ JobConf jobConf = new JobConf();
+ JobConf rssJobConf = new JobConf();
+ rssJobConf.setInt("mapreduce.job.maps", 500);
+ rssJobConf.setInt("mapreduce.job.reduces", 20);
+ RssMRUtils.validateRssClientConf(rssJobConf, jobConf);
+ rssJobConf.setInt(RssMRConfig.RSS_CLIENT_RETRY_MAX, 5);
+ rssJobConf.setLong(RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX, 1000L);
+ rssJobConf.setLong(RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS, 4999L);
+ try {
+ RssMRUtils.validateRssClientConf(rssJobConf, jobConf);
+ fail(EXPECTED_EXCEPTION_MESSAGE);
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("should not bigger than"));
+ }
+ }
}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
index 2320f55b..b7409687 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
@@ -112,6 +112,19 @@ public class RssSparkShuffleUtils {
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
+
+ int retryMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX);
+ long retryIntervalMax =
sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
+ long sendCheckTimeout =
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS);
+ if (retryIntervalMax * retryMax > sendCheckTimeout) {
+ throw new IllegalArgumentException(String.format("%s(%s) * %s(%s) should
not bigger than %s(%s)",
+ RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(),
+ retryMax,
+ RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.key(),
+ retryIntervalMax,
+ RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(),
+ sendCheckTimeout));
+ }
}
public static Configuration getRemoteStorageHadoopConf(
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java
index 856fff8c..625db083 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/RssSparkShuffleUtilsTest.java
@@ -33,9 +33,12 @@ import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
public class RssSparkShuffleUtilsTest {
-
+
+ private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should
be thrown";
+
@Test
public void testAssignmentTags() {
SparkConf conf = new SparkConf();
@@ -210,4 +213,29 @@ public class RssSparkShuffleUtilsTest {
sparkConf.set(RssSparkConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER,
100);
assertEquals(3,
RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf));
}
+
+ @Test
+ public void testValidateRssClientConf() {
+ SparkConf sparkConf = new SparkConf();
+ try {
+ RssSparkShuffleUtils.validateRssClientConf(sparkConf);
+ fail(EXPECTED_EXCEPTION_MESSAGE);
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("must be set by the client or fetched
from coordinators"));
+ }
+ sparkConf.set(RssSparkConfig.RSS_STORAGE_TYPE, "MEMORY_LOCALFILE_HDFS");
+ RssSparkShuffleUtils.validateRssClientConf(sparkConf);
+ sparkConf.set(RssSparkConfig.RSS_CLIENT_RETRY_MAX, 5);
+ sparkConf.set(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX, 1000L);
+ sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS, 4999L);
+ try {
+ RssSparkShuffleUtils.validateRssClientConf(sparkConf);
+ fail(EXPECTED_EXCEPTION_MESSAGE);
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("should not bigger than"));
+ }
+ sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS, 5000L);
+ RssSparkShuffleUtils.validateRssClientConf(sparkConf);
+ }
+
}
diff --git
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index f71900ce..0189b79f 100644
---
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -71,6 +71,8 @@ public class RssShuffleWriterTest {
.setMaster("local[2]")
.set(RssSparkConfig.RSS_TEST_FLAG.key(), "true")
.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), "10000")
+ .set(RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(), "10")
+ .set(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.key(), "1000")
.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(), "1000")
.set(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE.name())
.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
"127.0.0.1:12345,127.0.0.1:12346");
@@ -137,7 +139,6 @@ public class RssShuffleWriterTest {
.set(RssSparkConfig.RSS_WRITER_SERIALIZER_BUFFER_SIZE.key(), "32")
.set(RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.key(), "64")
.set(RssSparkConfig.RSS_WRITER_BUFFER_SPILL_SIZE.key(), "128")
- .set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), "10000")
.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(), "1000")
.set(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE.name())
.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
"127.0.0.1:12345,127.0.0.1:12346");
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 98ffc8a6..1e821997 100644
---
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -71,6 +71,7 @@ public class RssShuffleWriterTest {
.setMaster("local[2]")
.set(RssSparkConfig.RSS_TEST_FLAG.key(), "true")
.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), "10000")
+ .set(RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(), "10")
.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(), "1000")
.set(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE.name())
.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
"127.0.0.1:12345,127.0.0.1:12346");
@@ -140,7 +141,6 @@ public class RssShuffleWriterTest {
.set(RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key(), "32")
.set(RssSparkConfig.RSS_TEST_FLAG.key(), "true")
.set(RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.key(), "64")
- .set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), "10000")
.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(), "1000")
.set(RssSparkConfig.RSS_WRITER_BUFFER_SPILL_SIZE.key(), "128")
.set(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE.name())
diff --git
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index 2b8b6264..7cbbb37f 100644
---
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -49,6 +49,11 @@ public class ShuffleClientFactory {
String clientType, int retryMax, long retryIntervalMax, int
heartBeatThreadNum,
int replica, int replicaWrite, int replicaRead, boolean
replicaSkipEnabled, int dataTransferPoolSize,
int dataCommitPoolSize, int unregisterThreadPoolSize, int
unregisterRequestTimeoutSec) {
+ // If replica > replicaWrite, blocks may be sent in two rounds.
+ // Retry fewer times in this case so that the first round fails fast.
+ if (replicaSkipEnabled && replica > replicaWrite) {
+ retryMax = retryMax / 2;
+ }
return new ShuffleWriteClientImpl(
clientType,
retryMax,
diff --git
a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
index caf025ce..fd69221b 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
@@ -22,7 +22,7 @@ public class RssClientConfig {
public static final String RSS_CLIENT_TYPE = "rss.client.type";
public static final String RSS_CLIENT_TYPE_DEFAULT_VALUE = "GRPC";
public static final String RSS_CLIENT_RETRY_MAX = "rss.client.retry.max";
- public static final int RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE = 100;
+ public static final int RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE = 50;
public static final String RSS_CLIENT_RETRY_INTERVAL_MAX =
"rss.client.retry.interval.max";
public static final long RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE =
10000L;
public static final String RSS_COORDINATOR_QUORUM = "rss.coordinator.quorum";
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
index 5632ab5e..3ef2dd6e 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
@@ -101,6 +101,7 @@ public abstract class SparkIntegrationTestBase extends
IntegrationTestBase {
sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.key(), "256k");
sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
COORDINATOR_QUORUM);
sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(),
"30000");
+ sparkConf.set(RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(), "10");
sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(),
"1000");
sparkConf.set(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.key(), "1000");
sparkConf.set(RssSparkConfig.RSS_INDEX_READ_LIMIT.key(), "100");
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 82a590db..543ce1f6 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -302,7 +302,8 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
final int finalBlockNum = blockNum;
try {
RetryUtils.retry(() -> {
- long requireId = requirePreAllocation(allocateSize,
request.getRetryMax(), request.getRetryIntervalMax());
+ long requireId = requirePreAllocation(allocateSize,
request.getRetryMax() / maxRetryAttempts,
+ request.getRetryIntervalMax());
if (requireId == FAILED_REQUIRE_ID) {
throw new RssException(String.format(
"requirePreAllocation failed! size[%s], host[%s], port[%s]",
allocateSize, host, port));