This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 66d3afebc [CELEBORN-494][PERF] RssInputStream fetch side supports a
blacklist to avoid repeated client-side timeouts on the same worker during
fetch
66d3afebc is described below
commit 66d3afebcd18e8b3659503d58acb37e8bcf13c30
Author: Angerszhuuuu <[email protected]>
AuthorDate: Tue Jun 13 20:06:31 2023 +0800
[CELEBORN-494][PERF] RssInputStream fetch side supports a blacklist to avoid
repeated client-side timeouts on the same worker during fetch
### What changes were proposed in this pull request?
#### Test case
```
executor instance 20
SQL:
SELECT count(1) from (select /*+ REPARTITION(100) */ * from
spark_auxiliary.t50g) tmp;
create connection timeout 10s
Fetch chunk timeout 30s
```
In each graph below, the `before` and `after` runs are subjected to the same
injected shuffle-read delay.
##### Worker can't connect
Before

After

##### OpenStream stuck
Before

After

##### Fetch chunk stuck
Before

After

### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1406 from AngersZhuuuu/CELEBORN-494.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 357add5b008d6f2d40b5fff8822e9abe1c082310)
Signed-off-by: Shuang <[email protected]>
---
.../apache/celeborn/client/ShuffleClientImpl.java | 7 ++-
.../celeborn/client/read/RssInputStream.java | 73 +++++++++++++++++++++-
.../common/protocol/PartitionLocation.java | 8 +++
.../org/apache/celeborn/common/CelebornConf.scala | 20 ++++++
docs/configuration/client.md | 2 +
5 files changed, 106 insertions(+), 4 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index b9c7110da..587aa7746 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -97,6 +97,8 @@ public class ShuffleClientImpl extends ShuffleClient {
private final boolean shuffleClientPushBlacklistEnabled;
private final Set<String> blacklist = ConcurrentHashMap.newKeySet();
+ private final ConcurrentHashMap<String, Long> fetchExcludedWorkers =
+ JavaUtils.newConcurrentHashMap();
private final ExecutorService pushDataRetryPool;
@@ -1481,7 +1483,8 @@ public class ShuffleClientImpl extends ShuffleClient {
fileGroups.mapAttempts,
attemptNumber,
startMapIndex,
- endMapIndex);
+ endMapIndex,
+ fetchExcludedWorkers);
}
}
@@ -1507,6 +1510,8 @@ public class ShuffleClientImpl extends ShuffleClient {
if (null != driverRssMetaService) {
driverRssMetaService = null;
}
+ blacklist.clear();
+ fetchExcludedWorkers.clear();
logger.warn("Shuffle client has been shutdown!");
}
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
index 874445083..98f82a01d 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
@@ -20,7 +20,9 @@ package org.apache.celeborn.client.read;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -51,7 +53,8 @@ public abstract class RssInputStream extends InputStream {
int[] attempts,
int attemptNumber,
int startMapIndex,
- int endMapIndex)
+ int endMapIndex,
+ ConcurrentHashMap<String, Long> fetchExcludedWorkers)
throws IOException {
if (locations == null || locations.length == 0) {
return emptyInputStream;
@@ -64,7 +67,8 @@ public abstract class RssInputStream extends InputStream {
attempts,
attemptNumber,
startMapIndex,
- endMapIndex);
+ endMapIndex,
+ fetchExcludedWorkers);
}
}
@@ -125,6 +129,11 @@ public abstract class RssInputStream extends InputStream {
private LongAdder skipCount = new LongAdder();
private final boolean rangeReadFilter;
+ private boolean pushReplicateEnabled;
+ private boolean fetchBlacklistEnabled;
+ private long fetchExcludedWorkerExpireTimeout;
+ private final ConcurrentHashMap<String, Long> fetchExcludedWorkers;
+
RssInputStreamImpl(
CelebornConf conf,
TransportClientFactory clientFactory,
@@ -133,7 +142,8 @@ public abstract class RssInputStream extends InputStream {
int[] attempts,
int attemptNumber,
int startMapIndex,
- int endMapIndex)
+ int endMapIndex,
+ ConcurrentHashMap<String, Long> fetchExcludedWorkers)
throws IOException {
this.conf = conf;
this.clientFactory = clientFactory;
@@ -144,6 +154,10 @@ public abstract class RssInputStream extends InputStream {
this.startMapIndex = startMapIndex;
this.endMapIndex = endMapIndex;
this.rangeReadFilter = conf.shuffleRangeReadFilterEnabled();
+ this.pushReplicateEnabled = conf.clientPushReplicateEnabled();
+ this.fetchBlacklistEnabled =
conf.clientFetchExcludeWorkerOnFailureEnabled();
+ this.fetchExcludedWorkerExpireTimeout =
conf.clientFetchExcludedWorkerExpireTimeout();
+ this.fetchExcludedWorkers = fetchExcludedWorkers;
int headerLen = Decompressor.getCompressionHeaderLength(conf);
int blockSize = conf.clientPushBufferMaxSize() + headerLen;
@@ -226,11 +240,59 @@ public abstract class RssInputStream extends InputStream {
currentChunk = getNextChunk();
}
+ private void excludeFailedLocation(PartitionLocation location, Exception
e) {
+ if (pushReplicateEnabled && fetchBlacklistEnabled && isCriticalCause(e))
{
+ fetchExcludedWorkers.put(location.hostAndFetchPort(),
System.currentTimeMillis());
+ }
+ }
+
+ private boolean isExcluded(PartitionLocation location) {
+ Long timestamp = fetchExcludedWorkers.get(location.hostAndFetchPort());
+ if (timestamp == null) {
+ return false;
+ } else if (System.currentTimeMillis() - timestamp >
fetchExcludedWorkerExpireTimeout) {
+ fetchExcludedWorkers.remove(location.hostAndFetchPort());
+ return false;
+ } else if (location.getPeer() != null) {
+ Long peerTimestamp =
fetchExcludedWorkers.get(location.getPeer().hostAndFetchPort());
+ // To avoid both replicate locations is excluded, if peer add to
excluded list earlier,
+ // change to try peer.
+ if (peerTimestamp == null || peerTimestamp < timestamp) {
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ return true;
+ }
+ }
+
+ private boolean isCriticalCause(Exception e) {
+ boolean isConnectTimeout =
+ e instanceof IOException
+ && e.getMessage() != null
+ && e.getMessage().startsWith("Connecting to")
+ && e.getMessage().contains("timed out");
+ boolean rpcTimeout =
+ e instanceof IOException
+ && e.getCause() != null
+ && e.getCause() instanceof TimeoutException;
+ boolean fetchChunkTimeout =
+ e instanceof CelebornIOException
+ && e.getCause() != null
+ && e.getCause() instanceof IOException;
+ return isConnectTimeout || rpcTimeout || fetchChunkTimeout;
+ }
+
private PartitionReader createReaderWithRetry(PartitionLocation location)
throws IOException {
while (fetchChunkRetryCnt < fetchChunkMaxRetry) {
try {
+ if (isExcluded(location)) {
+ throw new CelebornIOException("Fetch data from blacklisted
location! " + location);
+ }
return createReader(location, fetchChunkRetryCnt,
fetchChunkMaxRetry);
} catch (Exception e) {
+ excludeFailedLocation(location, e);
fetchChunkRetryCnt++;
if (location.getPeer() != null) {
// fetchChunkRetryCnt % 2 == 0 means both replicas have been tried,
@@ -262,8 +324,13 @@ public abstract class RssInputStream extends InputStream {
private ByteBuf getNextChunk() throws IOException {
while (fetchChunkRetryCnt < fetchChunkMaxRetry) {
try {
+ if (isExcluded(currentReader.getLocation())) {
+ throw new CelebornIOException(
+ "Fetch data from blacklisted location! " +
currentReader.getLocation());
+ }
return currentReader.next();
} catch (Exception e) {
+ excludeFailedLocation(currentReader.getLocation(), e);
fetchChunkRetryCnt++;
currentReader.close();
if (fetchChunkRetryCnt == fetchChunkMaxRetry) {
diff --git
a/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
b/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
index 932589632..76486d45c 100644
---
a/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
+++
b/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
@@ -65,6 +65,8 @@ public class PartitionLocation implements Serializable {
private RoaringBitmap mapIdBitMap;
private transient String _hostPushPort;
+ private transient String _hostFetchPort;
+
public PartitionLocation(PartitionLocation loc) {
this.id = loc.id;
this.epoch = loc.epoch;
@@ -78,6 +80,7 @@ public class PartitionLocation implements Serializable {
this.storageInfo = loc.storageInfo;
this.mapIdBitMap = loc.mapIdBitMap;
this._hostPushPort = host + ":" + pushPort;
+ this._hostFetchPort = host + ":" + fetchPort;
}
public PartitionLocation(
@@ -151,6 +154,7 @@ public class PartitionLocation implements Serializable {
this.storageInfo = hint;
this.mapIdBitMap = mapIdBitMap;
this._hostPushPort = host + ":" + pushPort;
+ this._hostFetchPort = host + ":" + fetchPort;
}
public int getId() {
@@ -206,6 +210,10 @@ public class PartitionLocation implements Serializable {
+ replicatePort;
}
+ public String hostAndFetchPort() {
+ return _hostFetchPort;
+ }
+
public String hostAndPushPort() {
return _hostPushPort;
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index a10bf4a3b..747f9dd38 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -705,6 +705,10 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def clientFetchTimeoutMs: Long = get(CLIENT_FETCH_TIMEOUT)
def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT)
def clientFetchMaxRetriesForEachReplica: Int =
get(CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA)
+ def clientFetchExcludeWorkerOnFailureEnabled: Boolean =
+ get(CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED)
+ def clientFetchExcludedWorkerExpireTimeout: Long =
+ get(CLIENT_FETCH_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
// //////////////////////////////////////////////////////
// Shuffle Client Push //
@@ -2731,6 +2735,22 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(3)
+ val CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.fetch.excludeWorkerOnFailure.enabled")
+ .categories("client")
+ .doc("Whether to enable shuffle client-side fetch exclude workers on
failure.")
+ .version("0.3.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ val CLIENT_FETCH_EXCLUDED_WORKER_EXPIRE_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.client.fetch.excludedWorker.expireTimeout")
+ .categories("client")
+ .doc("ShuffleClient is a static object, it will be used in the whole
lifecycle of Executor," +
+ "We give a expire time for blacklisted worker to avoid a transient
worker issues.")
+ .version("0.3.0")
+ .fallbackConf(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
+
val TEST_CLIENT_FETCH_FAILURE: ConfigEntry[Boolean] =
buildConf("celeborn.test.client.fetchFailure")
.withAlternative("celeborn.test.fetchFailure")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 13f6b633a..641585a54 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -23,6 +23,8 @@ license: |
| celeborn.client.blacklistSlave.enabled | true | When true, Celeborn will add
partition's peer worker into blacklist when push data to slave failed. | 0.3.0
|
| celeborn.client.closeIdleConnections | true | Whether client will close idle
connections. | 0.3.0 |
| celeborn.client.excludedWorker.expireTimeout | 600s | Timeout time for
LifecycleManager to clear reserved excluded worker. | 0.3.0 |
+| celeborn.client.fetch.excludeWorkerOnFailure.enabled | false | Whether to
enable shuffle client-side fetch exclude workers on failure. | 0.3.0 |
+| celeborn.client.fetch.excludedWorker.expireTimeout | <value of
celeborn.client.excludedWorker.expireTimeout> | ShuffleClient is a static
object, it will be used in the whole lifecycle of Executor,We give a expire
time for blacklisted worker to avoid a transient worker issues. | 0.3.0 |
| celeborn.client.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch
request. | 0.3.0 |
| celeborn.client.fetch.maxRetriesForEachReplica | 3 | Max retry times of
fetch chunk on each replica | 0.3.0 |
| celeborn.client.fetch.timeout | 30s | Timeout for a task to fetch chunk. |
0.3.0 |