This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 9a90ac8b6 [CELEBORN-1036] Map task hangs at limitZeroInFlight due to
duplicate onFailure called
9a90ac8b6 is described below
commit 9a90ac8b6d9d86daaacc5474ce1a6b48262f21c2
Author: onebox-li <[email protected]>
AuthorDate: Fri Oct 13 20:13:32 2023 +0800
[CELEBORN-1036] Map task hangs at limitZeroInFlight due to duplicate
onFailure called
### What changes were proposed in this pull request?
In our test jobs, we found that a few map tasks may hang at
InFlightRequestTracker#limitZeroInFlight (this can occur in both
prepareForMergeData and mapEndInternal) when a worker shuts down
unexpectedly. We added logs to trace InFlightRequestTracker#totalInflightReqs
and found that this adder may become negative in the above case.
When a worker suddenly shuts down, the channel connection raises an exception.
If NioEventLoop.processSelectedKeys is performing a read, exceptionCaught will
be called. TransportResponseHandler#exceptionCaught then calls
failOutstandingRequests, which invokes each request's onFailure callback.
```
WARN [data-client-5-9] TransportChannelHandler: Exception in connection
from /xx
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at
org.apache.celeborn.shaded.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:256)
at
org.apache.celeborn.shaded.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
at
org.apache.celeborn.shaded.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
at
org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
at
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at
org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
ERROR [data-client-5-9] ShuffleClientImpl: Push data to xx failed for
shuffle 0 map 11 attempt 0 partition 178 batch 634, remain revive times 5.
```
Next, NioEventLoop starts to `runAllTasks` in the finally block. If there is
a push write task, PushChannelListener.handleFailure will be called because
the channel is closing. Here callback.onFailure may have a data race on
`outstandingPushes`.
```
ERROR [data-client-5-9] ShuffleClientImpl: Push data to xx failed for
shuffle 0 map 11 attempt 0 partition 178 batch 634, remain revive times 4.
org.apache.celeborn.common.exception.CelebornIOException: Failed to send
request PUSH 1264 to /xx:
org.apache.celeborn.shaded.io.netty.channel.StacklessClosedChannelException,
channel will be closed
at
org.apache.celeborn.common.network.client.TransportClient$PushChannelListener.handleFailure(TransportClient.java:382)
at
org.apache.celeborn.common.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:325)
at
org.apache.celeborn.common.network.client.TransportClient$PushChannelListener.operationComplete(TransportClient.java:373)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:629)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:118)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:999)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:860)
at
org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:877)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
at
org.apache.celeborn.shaded.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:113)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
at
org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at
org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
Caused by:
org.apache.celeborn.shaded.io.netty.channel.StacklessClosedChannelException
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object,
ChannelPromise)(Unknown Source)
```
Duplicate callback.onFailure calls cause totalInflightReqs to be miscounted.
The race is not too severe and only occurs in exceptional situations,
so I think synchronizing on a lock is enough to avoid the race.
### Why are the changes needed?
Increase robustness.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes #1978 from onebox-li/fix-handle-channel-failure.
Authored-by: onebox-li <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../common/network/client/TransportClient.java | 29 ++----
.../network/client/TransportResponseHandler.java | 113 ++++++++++++++++-----
2 files changed, 96 insertions(+), 46 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
index edf5a45ac..151895607 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
@@ -35,7 +35,6 @@ import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.network.buffer.NioManagedBuffer;
import org.apache.celeborn.common.network.protocol.*;
import org.apache.celeborn.common.network.util.NettyUtils;
@@ -132,8 +131,7 @@ public class TransportClient implements Closeable {
new StdChannelListener(streamChunkSlice) {
@Override
protected void handleFailure(String errorMsg, Throwable cause) {
- handler.removeFetchRequest(streamChunkSlice);
- callback.onFailure(chunkIndex, new IOException(errorMsg, cause));
+ handler.handleFetchFailure(streamChunkSlice, errorMsg, cause);
}
};
@@ -162,7 +160,7 @@ public class TransportClient implements Closeable {
long requestId = requestId();
handler.addRpcRequest(requestId, callback);
- RpcChannelListener listener = new RpcChannelListener(requestId, callback);
+ RpcChannelListener listener = new RpcChannelListener(requestId);
channel
.writeAndFlush(new RpcRequest(requestId, new
NioManagedBuffer(message)))
.addListener(listener);
@@ -204,7 +202,7 @@ public class TransportClient implements Closeable {
PushRequestInfo info = new PushRequestInfo(dueTime, callback);
handler.addPushRequest(requestId, info);
pushData.requestId = requestId;
- PushChannelListener listener = new PushChannelListener(requestId,
callback, rpcSendoutCallback);
+ PushChannelListener listener = new PushChannelListener(requestId,
rpcSendoutCallback);
ChannelFuture channelFuture =
channel.writeAndFlush(pushData).addListener(listener);
info.setChannelFuture(channelFuture);
return channelFuture;
@@ -222,7 +220,7 @@ public class TransportClient implements Closeable {
handler.addPushRequest(requestId, info);
pushMergedData.requestId = requestId;
- PushChannelListener listener = new PushChannelListener(requestId,
callback);
+ PushChannelListener listener = new PushChannelListener(requestId);
ChannelFuture channelFuture =
channel.writeAndFlush(pushMergedData).addListener(listener);
info.setChannelFuture(channelFuture);
return channelFuture;
@@ -351,35 +349,29 @@ public class TransportClient implements Closeable {
private class RpcChannelListener extends StdChannelListener {
final long rpcRequestId;
- final RpcResponseCallback callback;
- RpcChannelListener(long rpcRequestId, RpcResponseCallback callback) {
+ RpcChannelListener(long rpcRequestId) {
super("RPC " + rpcRequestId);
this.rpcRequestId = rpcRequestId;
- this.callback = callback;
}
@Override
protected void handleFailure(String errorMsg, Throwable cause) {
- handler.removeRpcRequest(rpcRequestId);
- callback.onFailure(new IOException(errorMsg, cause));
+ handler.handleRpcFailure(rpcRequestId, errorMsg, cause);
}
}
private class PushChannelListener extends StdChannelListener {
final long pushRequestId;
- final RpcResponseCallback callback;
Runnable rpcSendOutCallback;
- PushChannelListener(long pushRequestId, RpcResponseCallback callback) {
- this(pushRequestId, callback, null);
+ PushChannelListener(long pushRequestId) {
+ this(pushRequestId, null);
}
- PushChannelListener(
- long pushRequestId, RpcResponseCallback callback, Runnable
rpcSendOutCallback) {
+ PushChannelListener(long pushRequestId, Runnable rpcSendOutCallback) {
super("PUSH " + pushRequestId);
this.pushRequestId = pushRequestId;
- this.callback = callback;
this.rpcSendOutCallback = rpcSendOutCallback;
}
@@ -393,8 +385,7 @@ public class TransportClient implements Closeable {
@Override
protected void handleFailure(String errorMsg, Throwable cause) {
- handler.removePushRequest(pushRequestId);
- callback.onFailure(new CelebornIOException(errorMsg, cause));
+ handler.handlePushFailure(pushRequestId, errorMsg, cause);
}
}
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
index 09f6fe893..ddace8d87 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
@@ -54,9 +54,9 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
private final TransportConf conf;
private final Channel channel;
- private final Map<StreamChunkSlice, FetchRequestInfo> outstandingFetches;
+ private final ConcurrentHashMap<StreamChunkSlice, FetchRequestInfo>
outstandingFetches;
- private final Map<Long, RpcResponseCallback> outstandingRpcs;
+ private final ConcurrentHashMap<Long, RpcResponseCallback> outstandingRpcs;
private final ConcurrentHashMap<Long, PushRequestInfo> outstandingPushes;
/** Records the time (in system nanoseconds) that the last fetch or RPC
request was sent. */
@@ -132,7 +132,7 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
while (iter.hasNext()) {
Map.Entry<Long, PushRequestInfo> entry = iter.next();
if (entry.getValue().dueTime <= currentTime) {
- PushRequestInfo info = outstandingPushes.remove(entry.getKey());
+ PushRequestInfo info = removePushRequest(entry.getKey());
if (info != null) {
if (info.channelFuture != null) {
info.channelFuture.cancel(true);
@@ -158,7 +158,7 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
while (iter.hasNext()) {
Map.Entry<StreamChunkSlice, FetchRequestInfo> entry = iter.next();
if (entry.getValue().dueTime <= currentTime) {
- FetchRequestInfo info = outstandingFetches.remove(entry.getKey());
+ FetchRequestInfo info = removeFetchRequest(entry.getKey());
if (info != null) {
if (info.channelFuture != null) {
info.channelFuture.cancel(true);
@@ -186,8 +186,8 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
outstandingFetches.put(streamChunkSlice, info);
}
- public void removeFetchRequest(StreamChunkSlice streamChunkSlice) {
- outstandingFetches.remove(streamChunkSlice);
+ public FetchRequestInfo removeFetchRequest(StreamChunkSlice
streamChunkSlice) {
+ return outstandingFetches.remove(streamChunkSlice);
}
public void addRpcRequest(long requestId, RpcResponseCallback callback) {
@@ -198,8 +198,8 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
outstandingRpcs.put(requestId, callback);
}
- public void removeRpcRequest(long requestId) {
- outstandingRpcs.remove(requestId);
+ public RpcResponseCallback removeRpcRequest(long requestId) {
+ return outstandingRpcs.remove(requestId);
}
public void addPushRequest(long requestId, PushRequestInfo info) {
@@ -210,8 +210,8 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
outstandingPushes.put(requestId, info);
}
- public void removePushRequest(long requestId) {
- outstandingPushes.remove(requestId);
+ public PushRequestInfo removePushRequest(long requestId) {
+ return outstandingPushes.remove(requestId);
}
/**
@@ -219,32 +219,44 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
* exception or pre-mature connection termination.
*/
private void failOutstandingRequests(Throwable cause) {
- for (Map.Entry<StreamChunkSlice, FetchRequestInfo> entry :
outstandingFetches.entrySet()) {
+ Iterator<StreamChunkSlice> fetchRequestIter =
outstandingFetches.keySet().iterator();
+ while (fetchRequestIter.hasNext()) {
try {
- entry.getValue().callback.onFailure(entry.getKey().chunkIndex, cause);
+ StreamChunkSlice slice = fetchRequestIter.next();
+ FetchRequestInfo info = removeFetchRequest(slice);
+ if (info != null) {
+ info.callback.onFailure(slice.chunkIndex, cause);
+ }
} catch (Exception e) {
logger.warn("ChunkReceivedCallback.onFailure throws exception", e);
}
}
- for (Map.Entry<Long, RpcResponseCallback> entry :
outstandingRpcs.entrySet()) {
+
+ Iterator<Long> rpcRequestIter = outstandingRpcs.keySet().iterator();
+ while (rpcRequestIter.hasNext()) {
try {
- entry.getValue().onFailure(cause);
+ long requestId = rpcRequestIter.next();
+ RpcResponseCallback callback = removeRpcRequest(requestId);
+ if (callback != null) {
+ callback.onFailure(cause);
+ }
} catch (Exception e) {
logger.warn("RpcResponseCallback.onFailure throws exception", e);
}
}
- for (Map.Entry<Long, PushRequestInfo> entry :
outstandingPushes.entrySet()) {
+
+ Iterator<Long> pushRequestIter = outstandingPushes.keySet().iterator();
+ while (pushRequestIter.hasNext()) {
try {
- entry.getValue().callback.onFailure(cause);
+ long requestId = pushRequestIter.next();
+ PushRequestInfo info = removePushRequest(requestId);
+ if (info != null) {
+ info.callback.onFailure(cause);
+ }
} catch (Exception e) {
logger.warn("RpcResponseCallback.onFailure throws exception", e);
}
}
-
- // It's OK if new fetches appear, as they will fail immediately.
- outstandingFetches.clear();
- outstandingRpcs.clear();
- outstandingPushes.clear();
}
@Override
@@ -302,7 +314,7 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
if (message instanceof ChunkFetchSuccess) {
ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
logger.debug("Chunk {} fetch succeeded", resp.streamChunkSlice);
- FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice);
+ FetchRequestInfo info = removeFetchRequest(resp.streamChunkSlice);
if (info == null) {
logger.warn(
"Ignoring response for block {} from {} since it is not
outstanding",
@@ -320,7 +332,7 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
ChunkFetchFailure resp = (ChunkFetchFailure) message;
logger.error(
"chunk {} fetch failed, errorMessage {}", resp.streamChunkSlice,
resp.errorString);
- FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice);
+ FetchRequestInfo info = removeFetchRequest(resp.streamChunkSlice);
if (info == null) {
logger.warn(
"Ignoring response for block {} from {} ({}) since it is not
outstanding",
@@ -336,9 +348,9 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
}
} else if (message instanceof RpcResponse) {
RpcResponse resp = (RpcResponse) message;
- PushRequestInfo info = outstandingPushes.remove(resp.requestId);
+ PushRequestInfo info = removePushRequest(resp.requestId);
if (info == null) {
- RpcResponseCallback listener = outstandingRpcs.remove(resp.requestId);
+ RpcResponseCallback listener = removeRpcRequest(resp.requestId);
if (listener == null) {
logger.warn(
"Ignoring response for RPC {} from {} ({} bytes) since it is not
outstanding",
@@ -362,9 +374,9 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
}
} else if (message instanceof RpcFailure) {
RpcFailure resp = (RpcFailure) message;
- PushRequestInfo info = outstandingPushes.remove(resp.requestId);
+ PushRequestInfo info = removePushRequest(resp.requestId);
if (info == null) {
- RpcResponseCallback listener = outstandingRpcs.remove(resp.requestId);
+ RpcResponseCallback listener = removeRpcRequest(resp.requestId);
if (listener == null) {
logger.warn(
"Ignoring response for RPC {} from {} ({}) since it is not
outstanding",
@@ -396,4 +408,51 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
public void updateTimeOfLastRequest() {
timeOfLastRequestNs.set(System.nanoTime());
}
+
+ public void handleRpcFailure(long rpcRequestId, String errorMsg, Throwable
cause) {
+ RpcResponseCallback callback = removeRpcRequest(rpcRequestId);
+ if (callback != null) {
+ callback.onFailure(new CelebornIOException(errorMsg, cause));
+ } else {
+ logger.warn(
+ "RpcResponseCallback {} not found/already addressed when listener
handles rpc request failure",
+ rpcRequestId);
+ }
+ }
+
+ public void handlePushFailure(long pushRequestId, String errorMsg, Throwable
cause) {
+ PushRequestInfo info = removePushRequest(pushRequestId);
+ if (info != null) {
+ RpcResponseCallback callback = info.callback;
+ if (callback != null) {
+ callback.onFailure(new CelebornIOException(errorMsg, cause));
+ } else {
+ logger.warn(
+ "PushRequestInfo {} callback is null when handle push request
failure", pushRequestId);
+ }
+ } else {
+ logger.warn(
+ "PushRequestInfo {} not found/already addressed when listener
handles push request failure",
+ pushRequestId);
+ }
+ }
+
+ public void handleFetchFailure(
+ StreamChunkSlice streamChunkSlice, String errorMsg, Throwable cause) {
+ FetchRequestInfo info = removeFetchRequest(streamChunkSlice);
+ if (info != null) {
+ ChunkReceivedCallback callback = info.callback;
+ if (callback != null) {
+ callback.onFailure(streamChunkSlice.chunkIndex, new
IOException(errorMsg, cause));
+ } else {
+ logger.warn(
+ "FetchRequestInfo ({}) callback is null when listener handles
fetch request failure",
+ streamChunkSlice);
+ }
+ } else {
+ logger.warn(
+ "FetchRequestInfo ({}) not found/already addressed when listener
handles fetch request failure",
+ streamChunkSlice);
+ }
+ }
}