This is an automated email from the ASF dual-hosted git repository.
angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 0aa13832b [CELEBORN-676] Celeborn fetch chunk also should support
check timeout
0aa13832b is described below
commit 0aa13832b582da3ac484bcd5e00fb2ab83714dfe
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Jun 15 13:54:09 2023 +0800
[CELEBORN-676] Celeborn fetch chunk also should support check timeout
### What changes were proposed in this pull request?
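Chunk fetch requests should be checked for timeout as well, the same way push requests already are. This patch adds a periodic client-side checker that fails any outstanding `ChunkFetchRequest` with `FETCH_DATA_TIMEOUT` once it is older than `celeborn.client.fetch.timeout`.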
#### Test case
```
executor instance 20
SQL:
SELECT count(1) from (select /*+ REPARTITION(100) */ * from spark_auxiliary.t50g) tmp;
--conf spark.celeborn.client.spark.shuffle.writer=sort \
--conf spark.celeborn.client.fetch.excludeWorkerOnFailure.enabled=true \
--conf spark.celeborn.client.push.timeout=10s \
--conf spark.celeborn.client.push.replicate.enabled=true \
--conf spark.celeborn.client.push.revive.maxRetries=10 \
--conf spark.celeborn.client.reserveSlots.maxRetries=10 \
--conf spark.celeborn.client.registerShuffle.maxRetries=3 \
--conf spark.celeborn.client.push.blacklist.enabled=true \
--conf spark.celeborn.client.blacklistSlave.enabled=true \
--conf spark.celeborn.client.fetch.timeout=30s \
--conf spark.celeborn.client.push.data.timeout=30s \
--conf spark.celeborn.client.push.limit.inFlight.timeout=600s \
--conf spark.celeborn.client.push.maxReqsInFlight=32 \
--conf spark.celeborn.client.shuffle.compression.codec=ZSTD \
--conf spark.celeborn.rpc.askTimeout=30s \
--conf spark.celeborn.client.rpc.reserveSlots.askTimeout=30s \
--conf spark.celeborn.client.shuffle.batchHandleChangePartition.enabled=true \
--conf spark.celeborn.client.shuffle.batchHandleCommitPartition.enabled=true \
--conf spark.celeborn.client.shuffle.batchHandleReleasePartition.enabled=true
```
Tested with 3 workers, with a 100-second `Thread.sleep` added before the worker handles `ChunkFetchRequest`.
Before patch
<img width="1783" alt="Screenshot 2023-06-14 11.20.55 AM"
src="https://github.com/apache/incubator-celeborn/assets/46485123/182dff7d-a057-4077-8368-d1552104d206">
After patch
<img width="1792" alt="image"
src="https://github.com/apache/incubator-celeborn/assets/46485123/3c8b7933-8ace-426d-8e9f-04e0aabfac8e">
The log shows that the fetch timeout checker works:
```
23/06/14 11:14:54 ERROR WorkerPartitionReader: Fetch chunk 0 failed.
org.apache.celeborn.common.exception.CelebornIOException: FETCH_DATA_TIMEOUT
    at org.apache.celeborn.common.network.client.TransportResponseHandler.failExpiredFetchRequest(TransportResponseHandler.java:147)
    at org.apache.celeborn.common.network.client.TransportResponseHandler.lambda$new$1(TransportResponseHandler.java:103)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
23/06/14 11:14:54 WARN RssInputStream: Fetch chunk failed 1/6 times for location PartitionLocation[
  id-epoch:35-0
  host-rpcPort-pushPort-fetchPort-replicatePort:10.169.48.203-9092-9094-9093-9095
  mode:MASTER
  peer:(host-rpcPort-pushPort-fetchPort-replicatePort:10.169.48.202-9092-9094-9093-9095)
  storage hint:StorageInfo{type=HDD, mountPoint='/mnt/ssd/0', finalResult=true, filePath=}
  mapIdBitMap:null], change to peer
org.apache.celeborn.common.exception.CelebornIOException: Fetch chunk 0 failed.
    at org.apache.celeborn.client.read.WorkerPartitionReader$1.onFailure(WorkerPartitionReader.java:98)
    at org.apache.celeborn.common.network.client.TransportResponseHandler.failExpiredFetchRequest(TransportResponseHandler.java:146)
    at org.apache.celeborn.common.network.client.TransportResponseHandler.lambda$new$1(TransportResponseHandler.java:103)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.celeborn.common.exception.CelebornIOException: FETCH_DATA_TIMEOUT
    at org.apache.celeborn.common.network.client.TransportResponseHandler.failExpiredFetchRequest(TransportResponseHandler.java:147)
    ... 8 more
23/06/14 11:14:54 INFO SortBasedShuffleWriter: Memory used 72.0 MB
```
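For readers who want the idea without reading the whole diff, here is a minimal, self-contained sketch of the deadline-scan pattern behind the `FETCH_DATA_TIMEOUT` entries in the log. It is not the patch's code: the class `FetchTimeoutSketch`, its nested `Pending` type, the integer chunk key, and the string failure reason are all hypothetical stand-ins, and the current time is passed in as a parameter so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for the outstanding-fetch bookkeeping; not Celeborn's API. */
class FetchTimeoutSketch {
  /** One in-flight request: an absolute deadline plus a failure callback. */
  static final class Pending {
    final long dueTimeMs;
    final BiConsumer<Integer, String> onFailure;

    Pending(long dueTimeMs, BiConsumer<Integer, String> onFailure) {
      this.dueTimeMs = dueTimeMs;
      this.onFailure = onFailure;
    }
  }

  private final Map<Integer, Pending> outstanding = new ConcurrentHashMap<>();

  /** Registers a request; the deadline is fixed at send time. */
  void add(int chunkIndex, long nowMs, long timeoutMs, BiConsumer<Integer, String> onFailure) {
    outstanding.put(chunkIndex, new Pending(nowMs + timeoutMs, onFailure));
  }

  /** Called when a response arrives; returns false if the request already timed out. */
  boolean complete(int chunkIndex) {
    return outstanding.remove(chunkIndex) != null;
  }

  /** One checker pass: fails every request whose deadline has passed and returns the count. */
  int failExpired(long nowMs) {
    int failed = 0;
    Iterator<Map.Entry<Integer, Pending>> iter = outstanding.entrySet().iterator();
    while (iter.hasNext()) {
      Map.Entry<Integer, Pending> entry = iter.next();
      if (entry.getValue().dueTimeMs <= nowMs) {
        // remove() returns null if a response handler won the race,
        // so each callback fires at most once.
        Pending pending = outstanding.remove(entry.getKey());
        if (pending != null) {
          pending.onFailure.accept(entry.getKey(), "FETCH_DATA_TIMEOUT");
          failed++;
        }
      }
    }
    return failed;
  }

  int size() {
    return outstanding.size();
  }

  public static void main(String[] args) {
    FetchTimeoutSketch tracker = new FetchTimeoutSketch();
    List<String> failures = new ArrayList<>();
    tracker.add(0, 0L, 30_000L, (idx, reason) -> failures.add(idx + ":" + reason));
    tracker.add(1, 0L, 30_000L, (idx, reason) -> failures.add(idx + ":" + reason));
    tracker.complete(1);          // chunk 1 is answered in time
    tracker.failExpired(5_000L);  // nothing has expired yet
    tracker.failExpired(30_000L); // chunk 0 reaches its deadline
    System.out.println(failures);
  }
}
```

In a real client, a pass like `failExpired` would typically be driven by `ScheduledExecutorService.scheduleAtFixedRate`, so a request may be failed up to one check interval after its deadline.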
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1587 from AngersZhuuuu/CELEBORN-676.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Angerszhuuuu <[email protected]>
---
.../client/read/WorkerPartitionReader.java | 7 +-
.../common/network/client/TransportClient.java | 22 ++++--
.../network/client/TransportResponseHandler.java | 78 +++++++++++++++++-----
.../common/network/util/TransportConf.java | 8 +++
.../common/protocol/message/StatusCode.java | 4 +-
.../celeborn/common/read/FetchRequestInfo.java | 37 ++++++++++
.../org/apache/celeborn/common/CelebornConf.scala | 34 +++++++++-
.../org/apache/celeborn/common/util/Utils.scala | 2 +
.../network/TransportResponseHandlerSuiteJ.java | 14 ++--
docs/configuration/client.md | 6 +-
docs/configuration/network.md | 2 +
.../network/RequestTimeoutIntegrationSuiteJ.java | 4 +-
.../storage/ChunkFetchIntegrationSuiteJ.java | 2 +-
.../deploy/worker/storage/FileWriterSuiteJ.java | 2 +-
14 files changed, 182 insertions(+), 40 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
index 1da3fd23e..e323fe6e1 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
@@ -55,6 +55,7 @@ public class WorkerPartitionReader implements PartitionReader
{
private final AtomicReference<IOException> exception = new
AtomicReference<>();
private final int fetchMaxReqsInFlight;
+ private final long fetchTimeoutMs;
private boolean closed = false;
// for test
@@ -74,6 +75,7 @@ public class WorkerPartitionReader implements PartitionReader
{
throws IOException, InterruptedException {
fetchMaxReqsInFlight = conf.clientFetchMaxReqsInFlight();
results = new LinkedBlockingQueue<>();
+ fetchTimeoutMs = conf.clientFetchTimeoutMs();
// only add the buffer to results queue if this reader is not closed.
callback =
new ChunkReceivedCallback() {
@@ -105,8 +107,7 @@ public class WorkerPartitionReader implements
PartitionReader {
}
OpenStream openBlocks =
new OpenStream(shuffleKey, location.getFileName(), startMapIndex,
endMapIndex);
- long timeoutMs = conf.clientFetchTimeoutMs();
- ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(),
timeoutMs);
+ ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(),
fetchTimeoutMs);
streamHandle = (StreamHandle) Message.decode(response);
this.location = location;
@@ -166,7 +167,7 @@ public class WorkerPartitionReader implements
PartitionReader {
try {
TransportClient client =
clientFactory.createClient(location.getHost(),
location.getFetchPort());
- client.fetchChunk(streamHandle.streamId, chunkIndex, callback);
+ client.fetchChunk(streamHandle.streamId, chunkIndex,
fetchTimeoutMs, callback);
chunkIndex++;
} catch (IOException e) {
logger.error(
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
index 4bf6b93d7..699f1ac78 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
@@ -39,6 +39,7 @@ import
org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.network.buffer.NioManagedBuffer;
import org.apache.celeborn.common.network.protocol.*;
import org.apache.celeborn.common.network.util.NettyUtils;
+import org.apache.celeborn.common.read.FetchRequestInfo;
import org.apache.celeborn.common.write.PushRequestInfo;
/**
@@ -90,8 +91,9 @@ public class TransportClient implements Closeable {
return channel.remoteAddress();
}
- public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback
callback) {
- fetchChunk(streamId, chunkIndex, 0, Integer.MAX_VALUE, callback);
+ public void fetchChunk(
+ long streamId, int chunkIndex, long fetchDataTimeout,
ChunkReceivedCallback callback) {
+ fetchChunk(streamId, chunkIndex, 0, Integer.MAX_VALUE, fetchDataTimeout,
callback);
}
/**
@@ -112,7 +114,12 @@ public class TransportClient implements Closeable {
* @param callback Callback invoked upon successful receipt of chunk, or
upon any failure.
*/
public void fetchChunk(
- long streamId, int chunkIndex, int offset, int len,
ChunkReceivedCallback callback) {
+ long streamId,
+ int chunkIndex,
+ int offset,
+ int len,
+ long fetchDataTimeout,
+ ChunkReceivedCallback callback) {
if (logger.isDebugEnabled()) {
logger.debug(
"Sending fetch chunk request {} to {}.",
@@ -129,9 +136,14 @@ public class TransportClient implements Closeable {
callback.onFailure(chunkIndex, new IOException(errorMsg, cause));
}
};
- handler.addFetchRequest(streamChunkSlice, callback);
- channel.writeAndFlush(new
ChunkFetchRequest(streamChunkSlice)).addListener(listener);
+ long dueTime = System.currentTimeMillis() + fetchDataTimeout;
+ FetchRequestInfo info = new FetchRequestInfo(dueTime, callback);
+ handler.addFetchRequest(streamChunkSlice, info);
+
+ ChannelFuture channelFuture =
+ channel.writeAndFlush(new
ChunkFetchRequest(streamChunkSlice)).addListener(listener);
+ info.setChannelFuture(channelFuture);
}
/**
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
index 46f487a2c..7a2f8e61e 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
@@ -37,6 +37,7 @@ import org.apache.celeborn.common.network.util.NettyUtils;
import org.apache.celeborn.common.network.util.TransportConf;
import org.apache.celeborn.common.protocol.TransportModuleConstants;
import org.apache.celeborn.common.protocol.message.StatusCode;
+import org.apache.celeborn.common.read.FetchRequestInfo;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.common.write.PushRequestInfo;
@@ -53,7 +54,7 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
private final TransportConf conf;
private final Channel channel;
- private final Map<StreamChunkSlice, ChunkReceivedCallback>
outstandingFetches;
+ private final Map<StreamChunkSlice, FetchRequestInfo> outstandingFetches;
private final Map<Long, RpcResponseCallback> outstandingRpcs;
private final ConcurrentHashMap<Long, PushRequestInfo> outstandingPushes;
@@ -63,7 +64,11 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
private final long pushTimeoutCheckerInterval;
private static ScheduledExecutorService pushTimeoutChecker = null;
- private ScheduledFuture scheduleFuture;
+ private ScheduledFuture pushCheckerScheduleFuture;
+
+ private final long fetchTimeoutCheckerInterval;
+ private static ScheduledExecutorService fetchTimeoutChecker = null;
+ private ScheduledFuture fetchCheckerScheduleFuture;
public TransportResponseHandler(TransportConf conf, Channel channel) {
this.conf = conf;
@@ -72,20 +77,33 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
this.outstandingRpcs = JavaUtils.newConcurrentHashMap();
this.outstandingPushes = JavaUtils.newConcurrentHashMap();
this.timeOfLastRequestNs = new AtomicLong(0);
- pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
+ this.pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
+ this.fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs();
synchronized (TransportResponseHandler.class) {
if (pushTimeoutChecker == null) {
pushTimeoutChecker =
ThreadUtils.newDaemonThreadPoolScheduledExecutor(
"push-timeout-checker", conf.pushDataTimeoutCheckerThreads());
}
+ if (fetchTimeoutChecker == null) {
+ fetchTimeoutChecker =
+ ThreadUtils.newDaemonThreadPoolScheduledExecutor(
+ "fetch-timeout-checker",
conf.fetchDataTimeoutCheckerThreads());
+ }
}
- scheduleFuture =
+ pushCheckerScheduleFuture =
pushTimeoutChecker.scheduleAtFixedRate(
() -> failExpiredPushRequest(),
pushTimeoutCheckerInterval,
pushTimeoutCheckerInterval,
TimeUnit.MILLISECONDS);
+
+ fetchCheckerScheduleFuture =
+ fetchTimeoutChecker.scheduleAtFixedRate(
+ () -> failExpiredFetchRequest(),
+ fetchTimeoutCheckerInterval,
+ fetchTimeoutCheckerInterval,
+ TimeUnit.MILLISECONDS);
}
public void failExpiredPushRequest() {
@@ -113,12 +131,33 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
}
}
- public void addFetchRequest(StreamChunkSlice streamChunkSlice,
ChunkReceivedCallback callback) {
+ public void failExpiredFetchRequest() {
+ long currentTime = System.currentTimeMillis();
+ Iterator<Map.Entry<StreamChunkSlice, FetchRequestInfo>> iter =
+ outstandingFetches.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<StreamChunkSlice, FetchRequestInfo> entry = iter.next();
+ if (entry.getValue().dueTime <= currentTime) {
+ FetchRequestInfo info = outstandingFetches.remove(entry.getKey());
+ if (info != null) {
+ if (info.channelFuture != null) {
+ info.channelFuture.cancel(true);
+ }
+ info.callback.onFailure(
+ entry.getKey().chunkIndex, new
CelebornIOException(StatusCode.FETCH_DATA_TIMEOUT));
+ info.channelFuture = null;
+ info.callback = null;
+ }
+ }
+ }
+ }
+
+ public void addFetchRequest(StreamChunkSlice streamChunkSlice,
FetchRequestInfo info) {
updateTimeOfLastRequest();
if (outstandingFetches.containsKey(streamChunkSlice)) {
logger.warn("[addFetchRequest] streamChunkSlice {} already exists!",
streamChunkSlice);
}
- outstandingFetches.put(streamChunkSlice, callback);
+ outstandingFetches.put(streamChunkSlice, info);
}
public void removeFetchRequest(StreamChunkSlice streamChunkSlice) {
@@ -154,9 +193,9 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
* exception or pre-mature connection termination.
*/
private void failOutstandingRequests(Throwable cause) {
- for (Map.Entry<StreamChunkSlice, ChunkReceivedCallback> entry :
outstandingFetches.entrySet()) {
+ for (Map.Entry<StreamChunkSlice, FetchRequestInfo> entry :
outstandingFetches.entrySet()) {
try {
- entry.getValue().onFailure(entry.getKey().chunkIndex, cause);
+ entry.getValue().callback.onFailure(entry.getKey().chunkIndex, cause);
} catch (Exception e) {
logger.warn("ChunkReceivedCallback.onFailure throws exception", e);
}
@@ -195,7 +234,8 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
remoteAddress);
failOutstandingRequests(new IOException("Connection from " +
remoteAddress + " closed"));
}
- scheduleFuture.cancel(false);
+ pushCheckerScheduleFuture.cancel(false);
+ fetchCheckerScheduleFuture.cancel(false);
}
@Override
@@ -208,28 +248,32 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
remoteAddress);
failOutstandingRequests(cause);
}
- scheduleFuture.cancel(false);
+ pushCheckerScheduleFuture.cancel(false);
+ fetchCheckerScheduleFuture.cancel(false);
}
@Override
public void handle(ResponseMessage message) throws Exception {
if (message instanceof ChunkFetchSuccess) {
ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
- ChunkReceivedCallback listener =
outstandingFetches.remove(resp.streamChunkSlice);
- if (listener == null) {
+ FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice);
+ if (info == null) {
logger.warn(
"Ignoring response for block {} from {} since it is not
outstanding",
resp.streamChunkSlice,
NettyUtils.getRemoteAddress(channel));
resp.body().release();
} else {
- listener.onSuccess(resp.streamChunkSlice.chunkIndex, resp.body());
- resp.body().release();
+ try {
+ info.callback.onSuccess(resp.streamChunkSlice.chunkIndex,
resp.body());
+ } finally {
+ resp.body().release();
+ }
}
} else if (message instanceof ChunkFetchFailure) {
ChunkFetchFailure resp = (ChunkFetchFailure) message;
- ChunkReceivedCallback listener =
outstandingFetches.remove(resp.streamChunkSlice);
- if (listener == null) {
+ FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice);
+ if (info == null) {
logger.warn(
"Ignoring response for block {} from {} ({}) since it is not
outstanding",
resp.streamChunkSlice,
@@ -237,7 +281,7 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
resp.errorString);
} else {
logger.warn("Receive ChunkFetchFailure, errorMsg {}",
resp.errorString);
- listener.onFailure(
+ info.callback.onFailure(
resp.streamChunkSlice.chunkIndex,
new ChunkFetchFailureException(
"Failure while fetching " + resp.streamChunkSlice + ": " +
resp.errorString));
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
index 29d7e5dc6..aff2dfa71 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
@@ -142,6 +142,14 @@ public class TransportConf {
return celebornConf.pushDataTimeoutCheckInterval(module);
}
+ public int fetchDataTimeoutCheckerThreads() {
+ return celebornConf.fetchDataTimeoutCheckerThreads(module);
+ }
+
+ public long fetchDataTimeoutCheckIntervalMs() {
+ return celebornConf.fetchDataTimeoutCheckInterval(module);
+ }
+
public long clientHearbeatInterval() {
return celebornConf.clientHeartbeatInterval(module);
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
index 670d1efea..688f0def8 100644
---
a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
+++
b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java
@@ -75,7 +75,9 @@ public enum StatusCode {
PUSH_DATA_TIMEOUT_MASTER(42),
PUSH_DATA_TIMEOUT_SLAVE(43),
PUSH_DATA_MASTER_BLACKLISTED(44),
- PUSH_DATA_SLAVE_BLACKLISTED(45);
+ PUSH_DATA_SLAVE_BLACKLISTED(45),
+
+ FETCH_DATA_TIMEOUT(46);
private final byte value;
diff --git
a/common/src/main/java/org/apache/celeborn/common/read/FetchRequestInfo.java
b/common/src/main/java/org/apache/celeborn/common/read/FetchRequestInfo.java
new file mode 100644
index 000000000..4e066ec0b
--- /dev/null
+++ b/common/src/main/java/org/apache/celeborn/common/read/FetchRequestInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.read;
+
+import io.netty.channel.ChannelFuture;
+
+import org.apache.celeborn.common.network.client.ChunkReceivedCallback;
+
+public class FetchRequestInfo {
+ public ChannelFuture channelFuture;
+ public long dueTime;
+ public ChunkReceivedCallback callback;
+
+ public FetchRequestInfo(long dueTime, ChunkReceivedCallback callback) {
+ this.dueTime = dueTime;
+ this.callback = callback;
+ }
+
+ public void setChannelFuture(ChannelFuture future) {
+ this.channelFuture = future;
+ }
+}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 68ef91df1..6714b29f2 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -482,6 +482,16 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
getTimeAsMs(key, PUSH_TIMEOUT_CHECK_INTERVAL.defaultValueString)
}
+ def fetchDataTimeoutCheckerThreads(module: String): Int = {
+ val key = FETCH_TIMEOUT_CHECK_THREADS.key.replace("<module>", module)
+ getInt(key, FETCH_TIMEOUT_CHECK_THREADS.defaultValue.get)
+ }
+
+ def fetchDataTimeoutCheckInterval(module: String): Long = {
+ val key = FETCH_TIMEOUT_CHECK_INTERVAL.key.replace("<module>", module)
+ getTimeAsMs(key, FETCH_TIMEOUT_CHECK_INTERVAL.defaultValueString)
+ }
+
// //////////////////////////////////////////////////////
// Master //
// //////////////////////////////////////////////////////
@@ -1385,6 +1395,26 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(16)
+ val FETCH_TIMEOUT_CHECK_INTERVAL: ConfigEntry[Long] =
+ buildConf("celeborn.<module>.fetch.timeoutCheck.interval")
+ .categories("network")
+ .doc("Interval for checking fetch data timeout. " +
+ s"It only support setting <module> to
`${TransportModuleConstants.DATA_MODULE}` " +
+ s"since it works for shuffle client fetch data and should be
configured on client side.")
+ .version("0.3.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("5s")
+
+ val FETCH_TIMEOUT_CHECK_THREADS: ConfigEntry[Int] =
+ buildConf("celeborn.<module>.fetch.timeoutCheck.threads")
+ .categories("network")
+ .doc("Threads num for checking fetch data timeout. " +
+ s"It only support setting <module> to
`${TransportModuleConstants.DATA_MODULE}` " +
+ s"since it works for shuffle client fetch data and should be
configured on client side.")
+ .version("0.3.0")
+ .intConf
+ .createWithDefault(16)
+
val CHANNEL_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.<module>.heartbeat.interval")
.withAlternative("celeborn.client.heartbeat.interval")
@@ -2608,7 +2638,7 @@ object CelebornConf extends Logging {
.createWithDefaultString("2s")
val CLIENT_PUSH_DATA_TIMEOUT: ConfigEntry[Long] =
- buildConf("celeborn.client.push.data.timeout")
+ buildConf("celeborn.client.push.timeout")
.withAlternative("celeborn.push.data.timeout")
.categories("client")
.version("0.3.0")
@@ -2715,7 +2745,7 @@ object CelebornConf extends Logging {
.withAlternative("celeborn.fetch.timeout")
.categories("client")
.version("0.3.0")
- .doc("Timeout for a task to fetch chunk.")
+ .doc("Timeout for a task to open stream and fetch chunk.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 158d4287c..dd835c35b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -903,6 +903,8 @@ object Utils extends Logging {
StatusCode.PUSH_DATA_MASTER_BLACKLISTED
case 45 =>
StatusCode.PUSH_DATA_SLAVE_BLACKLISTED
+ case 46 =>
+ StatusCode.FETCH_DATA_TIMEOUT
case _ =>
null
}
diff --git
a/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
b/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
index c71411c36..3dd32334f 100644
---
a/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
+++
b/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
@@ -33,6 +33,7 @@ import
org.apache.celeborn.common.network.client.RpcResponseCallback;
import org.apache.celeborn.common.network.client.TransportResponseHandler;
import org.apache.celeborn.common.network.protocol.*;
import org.apache.celeborn.common.protocol.TransportModuleConstants;
+import org.apache.celeborn.common.read.FetchRequestInfo;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.common.write.PushRequestInfo;
@@ -46,7 +47,8 @@ public class TransportResponseHandlerSuiteJ {
Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.FETCH_MODULE, 8),
new LocalChannel());
ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
- handler.addFetchRequest(streamChunkSlice, callback);
+ FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() +
30000, callback);
+ handler.addFetchRequest(streamChunkSlice, info);
assertEquals(1, handler.numOutstandingRequests());
handler.handle(new ChunkFetchSuccess(streamChunkSlice, new
TestManagedBuffer(123)));
@@ -62,7 +64,8 @@ public class TransportResponseHandlerSuiteJ {
Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.FETCH_MODULE, 8),
new LocalChannel());
ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
- handler.addFetchRequest(streamChunkSlice, callback);
+ FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() +
30000, callback);
+ handler.addFetchRequest(streamChunkSlice, info);
assertEquals(1, handler.numOutstandingRequests());
handler.handle(new ChunkFetchFailure(streamChunkSlice, "some error msg"));
@@ -77,9 +80,10 @@ public class TransportResponseHandlerSuiteJ {
Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.DATA_MODULE, 8),
new LocalChannel());
ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
- handler.addFetchRequest(new StreamChunkSlice(1, 0), callback);
- handler.addFetchRequest(new StreamChunkSlice(1, 1), callback);
- handler.addFetchRequest(new StreamChunkSlice(1, 2), callback);
+ FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() +
30000, callback);
+ handler.addFetchRequest(new StreamChunkSlice(1, 0), info);
+ handler.addFetchRequest(new StreamChunkSlice(1, 1), info);
+ handler.addFetchRequest(new StreamChunkSlice(1, 2), info);
assertEquals(3, handler.numOutstandingRequests());
handler.handle(new ChunkFetchSuccess(new StreamChunkSlice(1, 0), new
TestManagedBuffer(12)));
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 6ddae63ff..cbd1bb7a5 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -28,7 +28,7 @@ license: |
| celeborn.client.fetch.excludedWorker.expireTimeout | <value of
celeborn.client.excludedWorker.expireTimeout> | ShuffleClient is a static
object, it will be used in the whole lifecycle of Executor,We give a expire
time for blacklisted worker to avoid a transient worker issues. | 0.3.0 |
| celeborn.client.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch
request. | 0.3.0 |
| celeborn.client.fetch.maxRetriesForEachReplica | 3 | Max retry times of
fetch chunk on each replica | 0.3.0 |
-| celeborn.client.fetch.timeout | 30s | Timeout for a task to fetch chunk. |
0.3.0 |
+| celeborn.client.fetch.timeout | 30s | Timeout for a task to open stream and
fetch chunk. | 0.3.0 |
| celeborn.client.flink.compression.enabled | true | Whether to compress data
in Flink plugin. | 0.3.0 |
| celeborn.client.flink.inputGate.concurrentReadings | 2147483647 | Max
concurrent reading channels for a input gate. | 0.3.0 |
| celeborn.client.flink.inputGate.memory | 32m | Memory reserved for a input
gate. | 0.3.0 |
@@ -40,9 +40,8 @@ license: |
| celeborn.client.push.blacklist.enabled | false | Whether to enable shuffle
client-side push blacklist of workers. | 0.3.0 |
| celeborn.client.push.buffer.initial.size | 8k | | 0.3.0 |
| celeborn.client.push.buffer.max.size | 64k | Max size of reducer partition
buffer memory for shuffle hash writer. The pushed data will be buffered in
memory before sending to Celeborn worker. For performance consideration keep
this buffer size higher than 32K. Example: If reducer amount is 2000, buffer
size is 64K, then each task will consume up to `64KiB * 2000 = 125MiB` heap
memory. | 0.3.0 |
-| celeborn.client.push.data.timeout | 120s | Timeout for a task to push data
rpc message. This value should better be more than twice of
`celeborn.<module>.push.timeoutCheck.interval` | 0.3.0 |
| celeborn.client.push.limit.inFlight.sleepInterval | 50ms | Sleep interval
when check netty in-flight requests to be done. | 0.3.0 |
-| celeborn.client.push.limit.inFlight.timeout | <undefined> | Timeout
for netty in-flight requests to be done.Default value should be
`celeborn.client.push.data.timeout * 2`. | 0.3.0 |
+| celeborn.client.push.limit.inFlight.timeout | <undefined> | Timeout
for netty in-flight requests to be done.Default value should be
`celeborn.client.push.timeout * 2`. | 0.3.0 |
| celeborn.client.push.limit.strategy | SIMPLE | The strategy used to control
the push speed. Valid strategies are SIMPLE and SLOWSTART. the SLOWSTART
strategy is usually cooperate with congest control mechanism in the worker
side. | 0.3.0 |
| celeborn.client.push.maxReqsInFlight | 4 | Amount of Netty in-flight
requests per worker. The maximum memory is `celeborn.push.maxReqsInFlight` *
`celeborn.push.buffer.max.size` * compression ratio(1 in worst case), default:
64Kib * 32 = 2Mib | 0.3.0 |
| celeborn.client.push.queue.capacity | 512 | Push buffer queue size for a
task. The maximum memory is `celeborn.push.buffer.max.size` *
`celeborn.push.queue.capacity`, default: 64KiB * 512 = 32MiB | 0.3.0 |
@@ -56,6 +55,7 @@ license: |
| celeborn.client.push.stageEnd.timeout | <value of
celeborn.<module>.io.connectionTimeout> | Timeout for waiting
StageEnd. During this process, there are
`celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities
for committing filesand 1 times for releasing slots request. User can customize
this value according to your setting. By default, the value is the max timeout
value `celeborn.<module>.io.connectionTimeout`. | 0.3.0 |
| celeborn.client.push.takeTaskMaxWaitAttempts | 1 | Max wait times if no task
available to push to worker. | 0.3.0 |
| celeborn.client.push.takeTaskWaitInterval | 50ms | Wait interval if no task
available to push to worker. | 0.3.0 |
+| celeborn.client.push.timeout | 120s | Timeout for a task to push data rpc
message. This value should better be more than twice of
`celeborn.<module>.push.timeoutCheck.interval` | 0.3.0 |
| celeborn.client.registerShuffle.maxRetries | 3 | Max retry times for client
to register shuffle. | 0.3.0 |
| celeborn.client.registerShuffle.retryWait | 3s | Wait time before next retry
if register shuffle failed. | 0.3.0 |
| celeborn.client.requestCommitFiles.maxRetries | 2 | Max retry times for
requestCommitFiles RPC. | 0.3.0 |
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 1718dd512..28dc7fd01 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -19,6 +19,8 @@ license: |
<!--begin-include-->
| Key | Default | Description | Since |
| --- | ------- | ----------- | ----- |
+| celeborn.<module>.fetch.timeoutCheck.interval | 5s | Interval for
checking fetch data timeout. It only support setting <module> to `data` since
it works for shuffle client fetch data and should be configured on client side.
| 0.3.0 |
+| celeborn.<module>.fetch.timeoutCheck.threads | 16 | Threads num for
checking fetch data timeout. It only support setting <module> to `data` since
it works for shuffle client fetch data and should be configured on client side.
| 0.3.0 |
| celeborn.<module>.heartbeat.interval | 60s | The heartbeat interval
between worker and client. If setting <module> to `data`, it works for shuffle
client push and fetch data and should be configured on client side. If setting
<module> to `replicate`, it works for worker replicate data to peer worker and
should be configured on worker side. | 0.3.0 |
| celeborn.<module>.io.backLog | 0 | Requested maximum length of the
queue of incoming connections. Default 0 for no backlog. | |
| celeborn.<module>.io.clientThreads | 0 | Number of threads used in the
client thread pool. Default to 0, which is 2x#cores. | |
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/network/RequestTimeoutIntegrationSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/network/RequestTimeoutIntegrationSuiteJ.java
index 56f7e49e5..11223a0d3 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/network/RequestTimeoutIntegrationSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/network/RequestTimeoutIntegrationSuiteJ.java
@@ -215,12 +215,12 @@ public class RequestTimeoutIntegrationSuiteJ {
// Send one request, which will eventually fail.
TestCallback callback0 = new TestCallback();
- client.fetchChunk(0, 0, callback0);
+ client.fetchChunk(0, 0, 10000, callback0);
Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS);
// Send a second request before the first has failed.
TestCallback callback1 = new TestCallback();
- client.fetchChunk(0, 1, callback1);
+ client.fetchChunk(0, 1, 10000, callback1);
Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS);
// not complete yet, but should complete soon
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/ChunkFetchIntegrationSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/ChunkFetchIntegrationSuiteJ.java
index a6dd0a162..47c0077c0 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/ChunkFetchIntegrationSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/ChunkFetchIntegrationSuiteJ.java
@@ -170,7 +170,7 @@ public class ChunkFetchIntegrationSuiteJ {
};
for (int chunkIndex : chunkIndices) {
- client.fetchChunk(STREAM_ID, chunkIndex, callback);
+ client.fetchChunk(STREAM_ID, chunkIndex, 10000, callback);
}
if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
fail("Timeout getting response from the server");
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
index e1b8cacf7..6a4ec4774 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
@@ -232,7 +232,7 @@ public class FileWriterSuiteJ {
};
for (int chunkIndex : chunkIndices) {
- client.fetchChunk(streamId, chunkIndex, callback);
+ client.fetchChunk(streamId, chunkIndex, 10000, callback);
}
if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
fail("Timeout getting response from the server");