This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 6b86b58f9 [CELEBORN-1162][BUG] Fix refCnt 0 Exception in
FetchHandler#handleChunkFetchRequest
6b86b58f9 is described below
commit 6b86b58f9e216fbf4bb53150bc76effc985c4a1d
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Tue Dec 12 16:40:08 2023 +0800
[CELEBORN-1162][BUG] Fix refCnt 0 Exception in
FetchHandler#handleChunkFetchRequest
### What changes were proposed in this pull request?
While testing the main branch, I encountered the exception below:
```
23/12/12 16:03:03,262 WARN [fetch-server-11-52] DefaultPromise: An
exception was thrown by
org.apache.celeborn.service.deploy.worker.FetchHandler$$anon$2.operationComplete()
io.netty.util.IllegalReferenceCountException: refCnt: 0
at
io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
at
io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1383)
at io.netty.buffer.AbstractByteBuf.getInt(AbstractByteBuf.java:433)
at io.netty.buffer.ByteBufUtil.hashCode(ByteBufUtil.java:208)
at
io.netty.buffer.AbstractByteBuf.hashCode(AbstractByteBuf.java:1342)
at java.util.WeakHashMap.hash(WeakHashMap.java:298)
at java.util.WeakHashMap.getEntry(WeakHashMap.java:426)
at java.util.WeakHashMap.containsKey(WeakHashMap.java:417)
at
org.apache.commons.lang3.builder.ToStringStyle.isRegistered(ToStringStyle.java:207)
at
org.apache.commons.lang3.builder.ToStringStyle.appendInternal(ToStringStyle.java:492)
at
org.apache.commons.lang3.builder.ToStringStyle.append(ToStringStyle.java:466)
at
org.apache.commons.lang3.builder.ToStringBuilder.append(ToStringBuilder.java:845)
at
org.apache.celeborn.common.network.buffer.NettyManagedBuffer.toString(NettyManagedBuffer.java:82)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuffer.append(StringBuffer.java:269)
at
org.apache.commons.lang3.builder.ToStringStyle.appendDetail(ToStringStyle.java:614)
at
org.apache.commons.lang3.builder.ToStringStyle.appendInternal(ToStringStyle.java:579)
at
org.apache.commons.lang3.builder.ToStringStyle.append(ToStringStyle.java:466)
at
org.apache.commons.lang3.builder.ToStringBuilder.append(ToStringBuilder.java:845)
at
org.apache.celeborn.common.network.protocol.RpcRequest.toString(RpcRequest.java:96)
at
org.apache.celeborn.service.deploy.worker.FetchHandler$$anon$2.operationComplete(FetchHandler.scala:403)
at
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at
io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
at
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
at
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at
io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
at
io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
at
io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
at
io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
at
io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:728)
at
io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:283)
at
io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:242)
at
io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:212)
at
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:407)
at
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:361)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:782)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
```
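This is because, since https://github.com/apache/incubator-celeborn/pull/2123,
`release` is called in `TransportRequestHandler#processRpcRequest`, but
`FetchHandler#handleChunkFetchRequest` still references `req` in the
write-completion callback. That callback runs later, after the request has
already been released, so `req.toString` touches a buffer whose refCnt is 0.
This change computes `req.toString` once, before the callback is registered,
and reuses the cached string in both the callback and the failure path.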
### Why are the changes needed?
To avoid the `IllegalReferenceCountException` described above.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually tested.
Closes #2148 from waitinfuture/1162.
Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../org/apache/celeborn/service/deploy/worker/FetchHandler.scala | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index fa163d4f6..e4ea8fd43 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -373,7 +373,8 @@ class FetchHandler(
}
}
- workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
+ val reqStr = req.toString
+ workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr)
val fetchTimeMetric =
chunkStreamManager.getFetchTimeMetric(streamChunkSlice.streamId)
val fetchBeginTime = System.nanoTime()
try {
@@ -400,7 +401,7 @@ class FetchHandler(
if (fetchTimeMetric != null) {
fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)
}
- workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
+ workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr)
}
})
} catch {
@@ -412,7 +413,7 @@ class FetchHandler(
client.getChannel.writeAndFlush(new ChunkFetchFailure(
streamChunkSlice,
Throwables.getStackTraceAsString(e)))
- workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
+ workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr)
}
}