This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 66d3afebc [CELEBORN-494][PERF] RssInputStream fetch side support 
blacklist to avoid client side timeout in same worker multiple times during 
fetch
66d3afebc is described below

commit 66d3afebcd18e8b3659503d58acb37e8bcf13c30
Author: Angerszhuuuu <[email protected]>
AuthorDate: Tue Jun 13 20:06:31 2023 +0800

    [CELEBORN-494][PERF] RssInputStream fetch side support blacklist to avoid 
client side timeout in same worker multiple times during fetch
    
    ### What changes were proposed in this pull request?
    #### Test case
    ```
    executor instance 20
    
    SQL:
    SELECT count(1) from (select /*+ REPARTITION(100) */ * from 
spark_auxiliary.t50g) tmp;
    
    create connection timeout 10s
    
    Fetch chunk timeout 30s
    ```
    In each pair of graphs below, the `before` and `after` runs were subjected to
    the same delay, so they can be compared directly on shuffle read time.
    
    ##### Worker can't connect
    Before
    
![image](https://user-images.githubusercontent.com/46485123/229465520-9d751b40-2b8f-49d2-b350-a2278e3dd89e.png)
    
    After
    
![image](https://user-images.githubusercontent.com/46485123/229465552-88ac1ca4-24ad-4c30-9a46-0cdcae6bbfd5.png)
    
    ##### OpenStream stuck
    Before
    
![image](https://user-images.githubusercontent.com/46485123/229465629-68765a6a-2503-4018-8917-d49e47d5dccc.png)
    
    After
    
![image](https://user-images.githubusercontent.com/46485123/229465683-2f57b374-1c66-4819-93dd-cabee7ccb788.png)
    
    ##### Fetch chunk stuck
    Before
    
![image](https://user-images.githubusercontent.com/46485123/229465735-8d2f694b-1b4a-4984-b069-c4a308f41008.png)
    
    After
    
![image](https://user-images.githubusercontent.com/46485123/229465754-c2237d5a-6fb6-4d5b-819e-b7d86a1e88d7.png)
    
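    ### Why are the changes needed?
    
    Without a fetch-side exclusion list, a reader can hit the client-side timeout
    against the same unreachable or stuck worker multiple times during a fetch.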
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1406 from AngersZhuuuu/CELEBORN-494.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit 357add5b008d6f2d40b5fff8822e9abe1c082310)
    Signed-off-by: Shuang <[email protected]>
---
 .../apache/celeborn/client/ShuffleClientImpl.java  |  7 ++-
 .../celeborn/client/read/RssInputStream.java       | 73 +++++++++++++++++++++-
 .../common/protocol/PartitionLocation.java         |  8 +++
 .../org/apache/celeborn/common/CelebornConf.scala  | 20 ++++++
 docs/configuration/client.md                       |  2 +
 5 files changed, 106 insertions(+), 4 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index b9c7110da..587aa7746 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -97,6 +97,8 @@ public class ShuffleClientImpl extends ShuffleClient {
 
   private final boolean shuffleClientPushBlacklistEnabled;
   private final Set<String> blacklist = ConcurrentHashMap.newKeySet();
+  private final ConcurrentHashMap<String, Long> fetchExcludedWorkers =
+      JavaUtils.newConcurrentHashMap();
 
   private final ExecutorService pushDataRetryPool;
 
@@ -1481,7 +1483,8 @@ public class ShuffleClientImpl extends ShuffleClient {
           fileGroups.mapAttempts,
           attemptNumber,
           startMapIndex,
-          endMapIndex);
+          endMapIndex,
+          fetchExcludedWorkers);
     }
   }
 
@@ -1507,6 +1510,8 @@ public class ShuffleClientImpl extends ShuffleClient {
     if (null != driverRssMetaService) {
       driverRssMetaService = null;
     }
+    blacklist.clear();
+    fetchExcludedWorkers.clear();
     logger.warn("Shuffle client has been shutdown!");
   }
 
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java 
b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
index 874445083..98f82a01d 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
@@ -20,7 +20,9 @@ package org.apache.celeborn.client.read;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.LongAdder;
 
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -51,7 +53,8 @@ public abstract class RssInputStream extends InputStream {
       int[] attempts,
       int attemptNumber,
       int startMapIndex,
-      int endMapIndex)
+      int endMapIndex,
+      ConcurrentHashMap<String, Long> fetchExcludedWorkers)
       throws IOException {
     if (locations == null || locations.length == 0) {
       return emptyInputStream;
@@ -64,7 +67,8 @@ public abstract class RssInputStream extends InputStream {
           attempts,
           attemptNumber,
           startMapIndex,
-          endMapIndex);
+          endMapIndex,
+          fetchExcludedWorkers);
     }
   }
 
@@ -125,6 +129,11 @@ public abstract class RssInputStream extends InputStream {
     private LongAdder skipCount = new LongAdder();
     private final boolean rangeReadFilter;
 
+    private boolean pushReplicateEnabled;
+    private boolean fetchBlacklistEnabled;
+    private long fetchExcludedWorkerExpireTimeout;
+    private final ConcurrentHashMap<String, Long> fetchExcludedWorkers;
+
     RssInputStreamImpl(
         CelebornConf conf,
         TransportClientFactory clientFactory,
@@ -133,7 +142,8 @@ public abstract class RssInputStream extends InputStream {
         int[] attempts,
         int attemptNumber,
         int startMapIndex,
-        int endMapIndex)
+        int endMapIndex,
+        ConcurrentHashMap<String, Long> fetchExcludedWorkers)
         throws IOException {
       this.conf = conf;
       this.clientFactory = clientFactory;
@@ -144,6 +154,10 @@ public abstract class RssInputStream extends InputStream {
       this.startMapIndex = startMapIndex;
       this.endMapIndex = endMapIndex;
       this.rangeReadFilter = conf.shuffleRangeReadFilterEnabled();
+      this.pushReplicateEnabled = conf.clientPushReplicateEnabled();
+      this.fetchBlacklistEnabled = 
conf.clientFetchExcludeWorkerOnFailureEnabled();
+      this.fetchExcludedWorkerExpireTimeout = 
conf.clientFetchExcludedWorkerExpireTimeout();
+      this.fetchExcludedWorkers = fetchExcludedWorkers;
 
       int headerLen = Decompressor.getCompressionHeaderLength(conf);
       int blockSize = conf.clientPushBufferMaxSize() + headerLen;
@@ -226,11 +240,59 @@ public abstract class RssInputStream extends InputStream {
       currentChunk = getNextChunk();
     }
 
+    private void excludeFailedLocation(PartitionLocation location, Exception 
e) {
+      if (pushReplicateEnabled && fetchBlacklistEnabled && isCriticalCause(e)) 
{
+        fetchExcludedWorkers.put(location.hostAndFetchPort(), 
System.currentTimeMillis());
+      }
+    }
+
+    private boolean isExcluded(PartitionLocation location) {
+      Long timestamp = fetchExcludedWorkers.get(location.hostAndFetchPort());
+      if (timestamp == null) {
+        return false;
+      } else if (System.currentTimeMillis() - timestamp > 
fetchExcludedWorkerExpireTimeout) {
+        fetchExcludedWorkers.remove(location.hostAndFetchPort());
+        return false;
+      } else if (location.getPeer() != null) {
+        Long peerTimestamp = 
fetchExcludedWorkers.get(location.getPeer().hostAndFetchPort());
+        // To avoid both replicate locations is excluded, if peer add to 
excluded list earlier,
+        // change to try peer.
+        if (peerTimestamp == null || peerTimestamp < timestamp) {
+          return true;
+        } else {
+          return false;
+        }
+      } else {
+        return true;
+      }
+    }
+
+    private boolean isCriticalCause(Exception e) {
+      boolean isConnectTimeout =
+          e instanceof IOException
+              && e.getMessage() != null
+              && e.getMessage().startsWith("Connecting to")
+              && e.getMessage().contains("timed out");
+      boolean rpcTimeout =
+          e instanceof IOException
+              && e.getCause() != null
+              && e.getCause() instanceof TimeoutException;
+      boolean fetchChunkTimeout =
+          e instanceof CelebornIOException
+              && e.getCause() != null
+              && e.getCause() instanceof IOException;
+      return isConnectTimeout || rpcTimeout || fetchChunkTimeout;
+    }
+
     private PartitionReader createReaderWithRetry(PartitionLocation location) 
throws IOException {
       while (fetchChunkRetryCnt < fetchChunkMaxRetry) {
         try {
+          if (isExcluded(location)) {
+            throw new CelebornIOException("Fetch data from blacklisted 
location! " + location);
+          }
           return createReader(location, fetchChunkRetryCnt, 
fetchChunkMaxRetry);
         } catch (Exception e) {
+          excludeFailedLocation(location, e);
           fetchChunkRetryCnt++;
           if (location.getPeer() != null) {
             // fetchChunkRetryCnt % 2 == 0 means both replicas have been tried,
@@ -262,8 +324,13 @@ public abstract class RssInputStream extends InputStream {
     private ByteBuf getNextChunk() throws IOException {
       while (fetchChunkRetryCnt < fetchChunkMaxRetry) {
         try {
+          if (isExcluded(currentReader.getLocation())) {
+            throw new CelebornIOException(
+                "Fetch data from blacklisted location! " + 
currentReader.getLocation());
+          }
           return currentReader.next();
         } catch (Exception e) {
+          excludeFailedLocation(currentReader.getLocation(), e);
           fetchChunkRetryCnt++;
           currentReader.close();
           if (fetchChunkRetryCnt == fetchChunkMaxRetry) {
diff --git 
a/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
 
b/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
index 932589632..76486d45c 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
@@ -65,6 +65,8 @@ public class PartitionLocation implements Serializable {
   private RoaringBitmap mapIdBitMap;
   private transient String _hostPushPort;
 
+  private transient String _hostFetchPort;
+
   public PartitionLocation(PartitionLocation loc) {
     this.id = loc.id;
     this.epoch = loc.epoch;
@@ -78,6 +80,7 @@ public class PartitionLocation implements Serializable {
     this.storageInfo = loc.storageInfo;
     this.mapIdBitMap = loc.mapIdBitMap;
     this._hostPushPort = host + ":" + pushPort;
+    this._hostFetchPort = host + ":" + fetchPort;
   }
 
   public PartitionLocation(
@@ -151,6 +154,7 @@ public class PartitionLocation implements Serializable {
     this.storageInfo = hint;
     this.mapIdBitMap = mapIdBitMap;
     this._hostPushPort = host + ":" + pushPort;
+    this._hostFetchPort = host + ":" + fetchPort;
   }
 
   public int getId() {
@@ -206,6 +210,10 @@ public class PartitionLocation implements Serializable {
         + replicatePort;
   }
 
+  public String hostAndFetchPort() {
+    return _hostFetchPort;
+  }
+
   public String hostAndPushPort() {
     return _hostPushPort;
   }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index a10bf4a3b..747f9dd38 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -705,6 +705,10 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def clientFetchTimeoutMs: Long = get(CLIENT_FETCH_TIMEOUT)
   def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT)
   def clientFetchMaxRetriesForEachReplica: Int = 
get(CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA)
+  def clientFetchExcludeWorkerOnFailureEnabled: Boolean =
+    get(CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED)
+  def clientFetchExcludedWorkerExpireTimeout: Long =
+    get(CLIENT_FETCH_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
 
   // //////////////////////////////////////////////////////
   //               Shuffle Client Push                   //
@@ -2731,6 +2735,22 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(3)
 
+  val CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.fetch.excludeWorkerOnFailure.enabled")
+      .categories("client")
+      .doc("Whether to enable shuffle client-side fetch exclude workers on 
failure.")
+      .version("0.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val CLIENT_FETCH_EXCLUDED_WORKER_EXPIRE_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.client.fetch.excludedWorker.expireTimeout")
+      .categories("client")
+      .doc("ShuffleClient is a static object, it will be used in the whole 
lifecycle of Executor," +
+        "We give a expire time for blacklisted worker to avoid a transient 
worker issues.")
+      .version("0.3.0")
+      .fallbackConf(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
+
   val TEST_CLIENT_FETCH_FAILURE: ConfigEntry[Boolean] =
     buildConf("celeborn.test.client.fetchFailure")
       .withAlternative("celeborn.test.fetchFailure")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 13f6b633a..641585a54 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -23,6 +23,8 @@ license: |
 | celeborn.client.blacklistSlave.enabled | true | When true, Celeborn will add 
partition's peer worker into blacklist when push data to slave failed. | 0.3.0 
| 
 | celeborn.client.closeIdleConnections | true | Whether client will close idle 
connections. | 0.3.0 | 
 | celeborn.client.excludedWorker.expireTimeout | 600s | Timeout time for 
LifecycleManager to clear reserved excluded worker. | 0.3.0 | 
+| celeborn.client.fetch.excludeWorkerOnFailure.enabled | false | Whether to 
enable shuffle client-side fetch exclude workers on failure. | 0.3.0 | 
+| celeborn.client.fetch.excludedWorker.expireTimeout | &lt;value of 
celeborn.client.excludedWorker.expireTimeout&gt; | ShuffleClient is a static 
object, it will be used in the whole lifecycle of Executor,We give a expire 
time for blacklisted worker to avoid a transient worker issues. | 0.3.0 | 
 | celeborn.client.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch 
request. | 0.3.0 | 
 | celeborn.client.fetch.maxRetriesForEachReplica | 3 | Max retry times of 
fetch chunk on each replica | 0.3.0 | 
 | celeborn.client.fetch.timeout | 30s | Timeout for a task to fetch chunk. | 
0.3.0 | 

Reply via email to