onebox-li opened a new pull request, #1978:
URL: https://github.com/apache/incubator-celeborn/pull/1978
### What changes were proposed in this pull request?
In our test jobs, we found that a few map tasks may hang in
`InFlightRequestTracker#limitZeroInFlight` (the hang can occur from both
`prepareForMergeData` and `mapEndInternal`) when a worker shuts down
unexpectedly. We added logs to trace `InFlightRequestTracker#totalInflightReqs`
and found that this adder can become negative in that case.
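To show why a negative adder leaves the task stuck, here is a minimal Java sketch. It assumes, for illustration only, that `limitZeroInFlight` polls `totalInflightReqs` until the sum is exactly zero. The class `ZeroInFlightDemo` and its timeout and poll parameters are hypothetical and are not Celeborn's actual code. The wait is bounded so that the demo terminates.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, simplified stand-in for InFlightRequestTracker (not Celeborn's code).
final class ZeroInFlightDemo {
  static final LongAdder totalInflightReqs = new LongAdder();

  // Assumed behavior: wait until the in-flight count is exactly zero.
  // Returns false if the deadline passes first or the thread is interrupted.
  static boolean limitZeroInFlight(long timeoutMs, long pollMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (totalInflightReqs.sum() != 0) {
      if (System.currentTimeMillis() >= deadline) {
        return false; // timed out: the count never returned to zero
      }
      try {
        Thread.sleep(pollMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    totalInflightReqs.increment(); // one push is sent
    totalInflightReqs.decrement(); // first onFailure for that push
    totalInflightReqs.decrement(); // duplicate onFailure for the same push
    System.out.println("inflight = " + totalInflightReqs.sum()); // prints "inflight = -1"
    System.out.println("drained = " + limitZeroInFlight(200, 20)); // prints "drained = false"
  }
}
```

Once a single request is failed twice, the sum stays at -1, so a wait for exactly zero can only end by timing out.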
When a worker shuts down suddenly, the channel connection raises an exception.
If `NioEventLoop.processSelectedKeys` is reading at that moment,
`exceptionCaught` is called. `TransportResponseHandler#exceptionCaught` then
calls `failOutstandingRequests`, which invokes each outstanding request's
`onFailure` callback.
```
WARN [data-client-5-9] TransportChannelHandler: Exception in connection from /xx
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.celeborn.shaded.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:256)
    at org.apache.celeborn.shaded.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
    at org.apache.celeborn.shaded.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
    at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
    at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:745)
ERROR [data-client-5-9] ShuffleClientImpl: Push data to xx failed for shuffle 0 map 11 attempt 0 partition 178 batch 634, remain revive times 5.
```
Next, `NioEventLoop` starts `runAllTasks` in the `finally` block. If a push
write task is pending, `PushChannelListener.handleFailure` is called because
the channel is closing. Here `callback.onFailure` may race with the
`failOutstandingRequests` path above on `outstandingPushes`.
```
ERROR [data-client-5-9] ShuffleClientImpl: Push data to xx failed for shuffle 0 map 11 attempt 0 partition 178 batch 634, remain revive times 4.
org.apache.celeborn.common.exception.CelebornIOException: Failed to send request PUSH 1264 to /xx: org.apache.celeborn.shaded.io.netty.channel.StacklessClosedChannelException, channel will be closed
    at org.apache.celeborn.common.network.client.TransportClient$PushChannelListener.handleFailure(TransportClient.java:382)
    at org.apache.celeborn.common.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:325)
    at org.apache.celeborn.common.network.client.TransportClient$PushChannelListener.operationComplete(TransportClient.java:373)
    at org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
    at org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
    at org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
    at org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
    at org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:629)
    at org.apache.celeborn.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:118)
    at org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:999)
    at org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:860)
    at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:877)
    at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
    at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
    at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
    at org.apache.celeborn.shaded.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:113)
    at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
    at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
    at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
    at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
    at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
    at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
    at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940)
    at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
    at org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
    at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.celeborn.shaded.io.netty.channel.StacklessClosedChannelException
    at org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)
```
A duplicate `callback.onFailure` for the same request corrupts the
`totalInflightReqs` count, which is how it becomes negative. The race is not
frequent and only occurs on the exception path, so I think synchronizing on a
lock is enough to avoid it.
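A minimal Java sketch of the two failure paths and a lock-based guard follows. It is a simplified model, not the actual diff in this PR: `FailOnceDemo`, `PushCallback`, `push`, and the `guarded` flag are hypothetical names, and the callback's only job here is to decrement `totalInflightReqs`. In guarded mode, both paths remove the request from `outstandingPushes` under the same lock, and only the path that actually removed it calls `onFailure`.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical model of the duplicate-failure race (not Celeborn's actual classes).
final class FailOnceDemo {
  interface PushCallback {
    void onFailure(Throwable cause);
  }

  final LongAdder totalInflightReqs = new LongAdder();
  // Guarded by the lock on 'this'.
  private final Map<Integer, PushCallback> outstandingPushes = new HashMap<>();
  private final boolean guarded;

  FailOnceDemo(boolean guarded) {
    this.guarded = guarded;
  }

  // Registers a push and counts it as in flight; its callback undoes the count.
  PushCallback push(int id) {
    totalInflightReqs.increment();
    PushCallback cb = cause -> totalInflightReqs.decrement();
    synchronized (this) {
      outstandingPushes.put(id, cb);
    }
    return cb;
  }

  // Path 1: exceptionCaught -> failOutstandingRequests fails every pending push.
  void failOutstandingRequests(Throwable cause) {
    List<PushCallback> toFail;
    synchronized (this) {
      toFail = new ArrayList<>(outstandingPushes.values());
      outstandingPushes.clear();
    }
    toFail.forEach(cb -> cb.onFailure(cause));
  }

  // Path 2: a queued write fails on the closed channel -> handleFailure.
  void handleFailure(int id, PushCallback listenerCallback, Throwable cause) {
    boolean wasOutstanding;
    synchronized (this) {
      wasOutstanding = outstandingPushes.remove(id) != null;
    }
    // Unguarded: always fires. Guarded: fires only if this path claimed the request.
    if (!guarded || wasOutstanding) {
      listenerCallback.onFailure(cause);
    }
  }

  // Replays the order seen in the logs: path 1 first, then path 2 for the same push.
  static long run(boolean guarded) {
    FailOnceDemo handler = new FailOnceDemo(guarded);
    PushCallback cb = handler.push(1264);
    IOException reset = new IOException("Connection reset by peer");
    handler.failOutstandingRequests(reset);
    handler.handleFailure(1264, cb, reset);
    return handler.totalInflightReqs.sum();
  }

  public static void main(String[] args) {
    System.out.println("unguarded: " + run(false)); // prints "unguarded: -1"
    System.out.println("guarded: " + run(true)); // prints "guarded: 0"
  }
}
```

The demo calls the two paths one after the other, in the order the logs show. The lock additionally makes the check-and-remove atomic if the paths ever run on different threads, so exactly one of them observes the entry and fires the callback.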
### Why are the changes needed?
To increase robustness: an unexpected worker shutdown should not leave map tasks hanging in `limitZeroInFlight`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
--
This is an automated message from the Apache Git Service.