This is an automated email from the ASF dual-hosted git repository.
zhengchenyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 7e84a1a6d [#1606] feat(tez): Add retry mechanism for NO_BUFFER when
reading data(memory/local/index) (#2110)
7e84a1a6d is described below
commit 7e84a1a6d826a3c705b7896fde18e340022b4bff
Author: zhengchenyu <[email protected]>
AuthorDate: Mon Sep 23 18:17:00 2024 +0800
[#1606] feat(tez): Add retry mechanism for NO_BUFFER when reading
data(memory/local/index) (#2110)
### What changes were proposed in this pull request?
Add the retry mechanism to the Tez client. #1606 added a retry mechanism for
NO_BUFFER responses when reading data, but did not apply it to Tez.
### Why are the changes needed?
Fix: #1606
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
---
.../common/shuffle/impl/RssTezFetcherTask.java | 16 ++++++++--
.../orderedgrouped/RssShuffleScheduler.java | 36 +++++++++++++---------
2 files changed, 35 insertions(+), 17 deletions(-)
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
index 3052c5dbc..b0dd8df27 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
@@ -43,8 +43,10 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
+import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.UnitConverter;
@@ -186,7 +188,15 @@ public class RssTezFetcherTask extends
CallableWithNdc<FetchResult> {
Configuration hadoopConf = getRemoteConf();
LOG.info("RssTezFetcherTask storageType:{}", storageType);
boolean expectedTaskIdsBitmapFilterEnable = serverInfoSet.size() > 1;
-
+ RssConf rssConf = RssTezConfig.toRssConf(this.conf);
+ int retryMax =
+ rssConf.getInteger(
+ RssClientConfig.RSS_CLIENT_RETRY_MAX,
+ RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
+ long retryIntervalMax =
+ rssConf.getLong(
+ RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
+ RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
ShuffleReadClient shuffleReadClient =
ShuffleClientFactory.getInstance()
.createShuffleReadClient(
@@ -203,7 +213,9 @@ public class RssTezFetcherTask extends
CallableWithNdc<FetchResult> {
.hadoopConf(hadoopConf)
.idHelper(new TezIdHelper())
.expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
- .rssConf(RssTezConfig.toRssConf(this.conf)));
+ .retryMax(retryMax)
+ .retryIntervalMax(retryIntervalMax)
+ .rssConf(rssConf));
RssTezFetcher fetcher =
new RssTezFetcher(
fetcherCallback,
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
index 3856d0f1f..5f5c2c7af 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
@@ -96,8 +96,10 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
+import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.UnitConverter;
@@ -283,7 +285,6 @@ class RssShuffleScheduler extends ShuffleScheduler {
private final int partitionNumPerRange;
private String basePath;
private RemoteStorageInfo remoteStorageInfo;
- private int indexReadLimit;
private final int maxAttemptNo;
@@ -505,10 +506,8 @@ class RssShuffleScheduler extends ShuffleScheduler {
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT);
- /**
- * Setting to very high val can lead to Http 400 error. Cap it to 75;
every attempt id would be
- * approximately 48 bytes; 48 * 75 = 3600 which should give some room for
other info in URL.
- */
+ // Setting to very high val can lead to Http 400 error. Cap it to 75;
every attempt id would be
+ // approximately 48 bytes; 48 * 75 = 3600 which should give some room for
other info in URL.
this.maxTaskOutputAtOnce =
Math.max(
1,
@@ -685,10 +684,8 @@ class RssShuffleScheduler extends ShuffleScheduler {
lastEventReceived.setValue(relativeTime);
}
- /**
- * Placeholder for tracking shuffle events in case we get multiple spills
info for the same
- * attempt.
- */
+ // Placeholder for tracking shuffle events in case we get multiple spills
info for the same
+ // attempt.
static class ShuffleEventInfo {
BitSet eventsProcessed;
int finalEventId = -1; // 0 indexed
@@ -786,11 +783,9 @@ class RssShuffleScheduler extends ShuffleScheduler {
skippedInputCounter.increment(1);
}
- /**
- * In case of pipelined shuffle, it is quite possible that fetchers
pulled the FINAL_UPDATE
- * spill in advance due to smaller output size. In such scenarios, we
need to wait until we
- * retrieve all spill details to claim success.
- */
+ // In case of pipelined shuffle, it is quite possible that fetchers
pulled the FINAL_UPDATE
+ // spill in advance due to smaller output size. In such scenarios, we
need to wait until we
+ // retrieve all spill details to claim success.
if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
remainingMaps.decrementAndGet();
setInputFinished(srcAttemptIdentifier.getInputIdentifier());
@@ -1869,6 +1864,15 @@ class RssShuffleScheduler extends ShuffleScheduler {
int partitionNum = partitionToServers.size();
boolean expectedTaskIdsBitmapFilterEnable = shuffleServerInfoSet.size()
> 1;
+ RssConf rssConf = RssTezConfig.toRssConf(this.conf);
+ int retryMax =
+ rssConf.getInteger(
+ RssClientConfig.RSS_CLIENT_RETRY_MAX,
+ RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
+ long retryIntervalMax =
+ rssConf.getLong(
+ RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
+ RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
ShuffleReadClient shuffleReadClient =
ShuffleClientFactory.getInstance()
.createShuffleReadClient(
@@ -1885,7 +1889,9 @@ class RssShuffleScheduler extends ShuffleScheduler {
.hadoopConf(hadoopConf)
.idHelper(new TezIdHelper())
.expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
- .rssConf(RssTezConfig.toRssConf(conf)));
+ .retryMax(retryMax)
+ .retryIntervalMax(retryIntervalMax)
+ .rssConf(rssConf));
RssTezShuffleDataFetcher fetcher =
new RssTezShuffleDataFetcher(
partitionIdToSuccessMapTaskAttempts.get(mapHost.getPartitionId()).iterator().next(),