This is an automated email from the ASF dual-hosted git repository.

zhengchenyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e84a1a6d [#1606] feat(tez): Add retry mechanism for NO_BUFFER when 
reading data(memory/local/index) (#2110)
7e84a1a6d is described below

commit 7e84a1a6d826a3c705b7896fde18e340022b4bff
Author: zhengchenyu <[email protected]>
AuthorDate: Mon Sep 23 18:17:00 2024 +0800

    [#1606] feat(tez): Add retry mechanism for NO_BUFFER when reading 
data(memory/local/index) (#2110)
    
    ### What changes were proposed in this pull request?
    
    Add the retry mechanism for tez. #1606 added a retry mechanism for NO_BUFFER reads but forgot to add it for tez.
    
    ### Why are the changes needed?
    
    Fix: #1606
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    unit test
---
 .../common/shuffle/impl/RssTezFetcherTask.java     | 16 ++++++++--
 .../orderedgrouped/RssShuffleScheduler.java        | 36 +++++++++++++---------
 2 files changed, 35 insertions(+), 17 deletions(-)

diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
index 3052c5dbc..b0dd8df27 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
@@ -43,8 +43,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
+import org.apache.uniffle.client.util.RssClientConfig;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.UnitConverter;
 
@@ -186,7 +188,15 @@ public class RssTezFetcherTask extends 
CallableWithNdc<FetchResult> {
       Configuration hadoopConf = getRemoteConf();
       LOG.info("RssTezFetcherTask storageType:{}", storageType);
       boolean expectedTaskIdsBitmapFilterEnable = serverInfoSet.size() > 1;
-
+      RssConf rssConf = RssTezConfig.toRssConf(this.conf);
+      int retryMax =
+          rssConf.getInteger(
+              RssClientConfig.RSS_CLIENT_RETRY_MAX,
+              RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
+      long retryIntervalMax =
+          rssConf.getLong(
+              RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
+              RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
       ShuffleReadClient shuffleReadClient =
           ShuffleClientFactory.getInstance()
               .createShuffleReadClient(
@@ -203,7 +213,9 @@ public class RssTezFetcherTask extends 
CallableWithNdc<FetchResult> {
                       .hadoopConf(hadoopConf)
                       .idHelper(new TezIdHelper())
                       
.expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
-                      .rssConf(RssTezConfig.toRssConf(this.conf)));
+                      .retryMax(retryMax)
+                      .retryIntervalMax(retryIntervalMax)
+                      .rssConf(rssConf));
       RssTezFetcher fetcher =
           new RssTezFetcher(
               fetcherCallback,
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
index 3856d0f1f..5f5c2c7af 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
@@ -96,8 +96,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
+import org.apache.uniffle.client.util.RssClientConfig;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.UnitConverter;
@@ -283,7 +285,6 @@ class RssShuffleScheduler extends ShuffleScheduler {
   private final int partitionNumPerRange;
   private String basePath;
   private RemoteStorageInfo remoteStorageInfo;
-  private int indexReadLimit;
 
   private final int maxAttemptNo;
 
@@ -505,10 +506,8 @@ class RssShuffleScheduler extends ShuffleScheduler {
             
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
             
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT);
 
-    /**
-     * Setting to very high val can lead to Http 400 error. Cap it to 75; 
every attempt id would be
-     * approximately 48 bytes; 48 * 75 = 3600 which should give some room for 
other info in URL.
-     */
+    // Setting to very high val can lead to Http 400 error. Cap it to 75; 
every attempt id would be
+    // approximately 48 bytes; 48 * 75 = 3600 which should give some room for 
other info in URL.
     this.maxTaskOutputAtOnce =
         Math.max(
             1,
@@ -685,10 +684,8 @@ class RssShuffleScheduler extends ShuffleScheduler {
     lastEventReceived.setValue(relativeTime);
   }
 
-  /**
-   * Placeholder for tracking shuffle events in case we get multiple spills 
info for the same
-   * attempt.
-   */
+  // Placeholder for tracking shuffle events in case we get multiple spills 
info for the same
+  // attempt.
   static class ShuffleEventInfo {
     BitSet eventsProcessed;
     int finalEventId = -1; // 0 indexed
@@ -786,11 +783,9 @@ class RssShuffleScheduler extends ShuffleScheduler {
         skippedInputCounter.increment(1);
       }
 
-      /**
-       * In case of pipelined shuffle, it is quite possible that fetchers 
pulled the FINAL_UPDATE
-       * spill in advance due to smaller output size. In such scenarios, we 
need to wait until we
-       * retrieve all spill details to claim success.
-       */
+      // In case of pipelined shuffle, it is quite possible that fetchers 
pulled the FINAL_UPDATE
+      // spill in advance due to smaller output size. In such scenarios, we 
need to wait until we
+      // retrieve all spill details to claim success.
       if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
         remainingMaps.decrementAndGet();
         setInputFinished(srcAttemptIdentifier.getInputIdentifier());
@@ -1869,6 +1864,15 @@ class RssShuffleScheduler extends ShuffleScheduler {
       int partitionNum = partitionToServers.size();
       boolean expectedTaskIdsBitmapFilterEnable = shuffleServerInfoSet.size() 
> 1;
 
+      RssConf rssConf = RssTezConfig.toRssConf(this.conf);
+      int retryMax =
+          rssConf.getInteger(
+              RssClientConfig.RSS_CLIENT_RETRY_MAX,
+              RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE);
+      long retryIntervalMax =
+          rssConf.getLong(
+              RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
+              RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE);
       ShuffleReadClient shuffleReadClient =
           ShuffleClientFactory.getInstance()
               .createShuffleReadClient(
@@ -1885,7 +1889,9 @@ class RssShuffleScheduler extends ShuffleScheduler {
                       .hadoopConf(hadoopConf)
                       .idHelper(new TezIdHelper())
                       
.expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
-                      .rssConf(RssTezConfig.toRssConf(conf)));
+                      .retryMax(retryMax)
+                      .retryIntervalMax(retryIntervalMax)
+                      .rssConf(rssConf));
       RssTezShuffleDataFetcher fetcher =
           new RssTezShuffleDataFetcher(
               
partitionIdToSuccessMapTaskAttempts.get(mapHost.getPartitionId()).iterator().next(),

Reply via email to