This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b86b58f9 [CELEBORN-1162][BUG] Fix refCnt 0 Exception in FetchHandler#handleChunkFetchRequest
6b86b58f9 is described below

commit 6b86b58f9e216fbf4bb53150bc76effc985c4a1d
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Tue Dec 12 16:40:08 2023 +0800

    [CELEBORN-1162][BUG] Fix refCnt 0 Exception in FetchHandler#handleChunkFetchRequest
    
    ### What changes were proposed in this pull request?
    While testing the main branch, I encountered the exception below:
    ```
    23/12/12 16:03:03,262 WARN [fetch-server-11-52] DefaultPromise: An exception was thrown by org.apache.celeborn.service.deploy.worker.FetchHandler$$anon$2.operationComplete()
    io.netty.util.IllegalReferenceCountException: refCnt: 0
            at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
            at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1383)
            at io.netty.buffer.AbstractByteBuf.getInt(AbstractByteBuf.java:433)
            at io.netty.buffer.ByteBufUtil.hashCode(ByteBufUtil.java:208)
            at io.netty.buffer.AbstractByteBuf.hashCode(AbstractByteBuf.java:1342)
            at java.util.WeakHashMap.hash(WeakHashMap.java:298)
            at java.util.WeakHashMap.getEntry(WeakHashMap.java:426)
            at java.util.WeakHashMap.containsKey(WeakHashMap.java:417)
            at org.apache.commons.lang3.builder.ToStringStyle.isRegistered(ToStringStyle.java:207)
            at org.apache.commons.lang3.builder.ToStringStyle.appendInternal(ToStringStyle.java:492)
            at org.apache.commons.lang3.builder.ToStringStyle.append(ToStringStyle.java:466)
            at org.apache.commons.lang3.builder.ToStringBuilder.append(ToStringBuilder.java:845)
            at org.apache.celeborn.common.network.buffer.NettyManagedBuffer.toString(NettyManagedBuffer.java:82)
            at java.lang.String.valueOf(String.java:2994)
            at java.lang.StringBuffer.append(StringBuffer.java:269)
            at org.apache.commons.lang3.builder.ToStringStyle.appendDetail(ToStringStyle.java:614)
            at org.apache.commons.lang3.builder.ToStringStyle.appendInternal(ToStringStyle.java:579)
            at org.apache.commons.lang3.builder.ToStringStyle.append(ToStringStyle.java:466)
            at org.apache.commons.lang3.builder.ToStringBuilder.append(ToStringBuilder.java:845)
            at org.apache.celeborn.common.network.protocol.RpcRequest.toString(RpcRequest.java:96)
            at org.apache.celeborn.service.deploy.worker.FetchHandler$$anon$2.operationComplete(FetchHandler.scala:403)
            at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
            at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
            at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
            at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
            at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
            at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
            at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
            at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
            at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:728)
            at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:283)
            at io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:242)
            at io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:212)
            at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:407)
            at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
            at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:361)
            at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:782)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
            at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:750)
    ```
    
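    This is because, since https://github.com/apache/incubator-celeborn/pull/2123, `release` is called on the request in `TransportRequestHandler#processRpcRequest`. However, `FetchHandler#handleChunkFetchRequest` still references `req` inside its write-completion callback (`operationComplete`), which runs later, after the request has been released. Calling `req.toString` there touches the released buffer and throws `IllegalReferenceCountException`. This patch computes `req.toString` once, up front, and reuses the cached string in the callback and in the error path.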
    
    ### Why are the changes needed?
    To prevent the `IllegalReferenceCountException` described above.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manual testing.
    
    Closes #2148 from waitinfuture/1162.
    
    Authored-by: zky.zhoukeyong <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../org/apache/celeborn/service/deploy/worker/FetchHandler.scala   | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index fa163d4f6..e4ea8fd43 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -373,7 +373,8 @@ class FetchHandler(
       }
     }
 
-    workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
+    val reqStr = req.toString
+    workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr)
     val fetchTimeMetric = 
chunkStreamManager.getFetchTimeMetric(streamChunkSlice.streamId)
     val fetchBeginTime = System.nanoTime()
     try {
@@ -400,7 +401,7 @@ class FetchHandler(
             if (fetchTimeMetric != null) {
               fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)
             }
-            workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
+            workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr)
           }
         })
     } catch {
@@ -412,7 +413,7 @@ class FetchHandler(
         client.getChannel.writeAndFlush(new ChunkFetchFailure(
           streamChunkSlice,
           Throwables.getStackTraceAsString(e)))
-        workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString)
+        workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr)
     }
   }
 

Reply via email to