This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 3a95424ed0d9138d2b4c318c4ce2999a70891f1a
Author: Yan Zhao <[email protected]>
AuthorDate: Tue Oct 18 14:21:54 2022 +0800

    Fix memory leak when reading entry but the connection disconnected. (#3528)
    
    * Fix direct memory leak problem
    
    (cherry picked from commit 30bdedc25a59aa7d4df3f5c0962095a574f0d653)
---
 .../apache/bookkeeper/proto/BookieProtocol.java    | 43 +++++++++++++++++-----
 .../bookkeeper/proto/PacketProcessorBase.java      | 27 +++++++++-----
 .../bookkeeper/proto/WriteEntryProcessorTest.java  | 20 +++++++---
 3 files changed, 65 insertions(+), 25 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index cc290b0673..965f3fd1bb 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -25,7 +25,7 @@ import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
-
+import io.netty.util.ReferenceCounted;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
 import org.apache.bookkeeper.util.ByteBufList;
 
@@ -433,10 +433,8 @@ public interface BookieProtocol {
                                  opCode, ledgerId, entryId, errorCode);
         }
 
-        void retain() {
-        }
-
-        void release() {
+        boolean release() {
+            return true;
         }
 
         void recycle() {
@@ -446,7 +444,7 @@ public interface BookieProtocol {
     /**
      * A request that reads data.
      */
-    class ReadResponse extends Response {
+    class ReadResponse extends Response implements ReferenceCounted {
         final ByteBuf data;
 
         ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long 
entryId) {
@@ -467,13 +465,38 @@ public interface BookieProtocol {
         }
 
         @Override
-        public void retain() {
-            data.retain();
+        public int refCnt() {
+            return data.refCnt();
         }
 
         @Override
-        public void release() {
-            data.release();
+        public ReferenceCounted retain() {
+            return data.retain();
+        }
+
+        @Override
+        public ReferenceCounted retain(int increment) {
+            return data.retain(increment);
+        }
+
+        @Override
+        public ReferenceCounted touch() {
+            return data.touch();
+        }
+
+        @Override
+        public ReferenceCounted touch(Object hint) {
+            return data.touch(hint);
+        }
+
+        @Override
+        public boolean release() {
+            return data.release();
+        }
+
+        @Override
+        public boolean release(int decrement) {
+            return data.release(decrement);
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index c1bf977956..0fb2d3f8f0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -19,6 +19,8 @@ package org.apache.bookkeeper.proto;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.proto.BookieProtocol.Request;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -104,13 +106,13 @@ abstract class PacketProcessorBase<T extends Request> 
extends SafeRunnable {
             }
 
             if (!channel.isWritable()) {
-                LOGGER.warn("cannot write response to non-writable channel {} 
for request {}", channel,
+                logger.warn("cannot write response to non-writable channel {} 
for request {}", channel,
                     StringUtils.requestToString(request));
                 requestProcessor.getRequestStats().getChannelWriteStats()
                     .registerFailedEvent(MathUtils.elapsedNanos(writeNanos), 
TimeUnit.NANOSECONDS);
                 
statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), 
TimeUnit.NANOSECONDS);
-                if (response instanceof BookieProtocol.ReadResponse) {
-                    ((BookieProtocol.ReadResponse) response).release();
+                if (response instanceof BookieProtocol.Response) {
+                    ((BookieProtocol.Response) response).release();
                 }
                 return;
             } else {
@@ -119,12 +121,17 @@ abstract class PacketProcessorBase<T extends Request> 
extends SafeRunnable {
         }
 
         if (channel.isActive()) {
-            channel.writeAndFlush(response, channel.voidPromise());
+            ChannelPromise promise = channel.newPromise().addListener(future 
-> {
+                if (!future.isSuccess()) {
+                    logger.debug("Netty channel write exception. ", 
future.cause());
+                }
+            });
+            channel.writeAndFlush(response, promise);
         } else {
-            if (response instanceof BookieProtocol.ReadResponse) {
-                ((BookieProtocol.ReadResponse) response).release();
+            if (response instanceof BookieProtocol.Response) {
+                ((BookieProtocol.Response) response).release();
             }
-            LOGGER.debug("Netty channel {} is inactive, "
+            logger.debug("Netty channel {} is inactive, "
                     + "hence bypassing netty channel writeAndFlush during 
sendResponse", channel);
         }
         if (BookieProtocol.EOK == rc) {
@@ -145,12 +152,12 @@ abstract class PacketProcessorBase<T extends Request> 
extends SafeRunnable {
         try {
             ChannelFuture future = channel.writeAndFlush(response);
             if (!channel.eventLoop().inEventLoop()) {
-                future.await();
+                future.get();
             }
-        } catch (InterruptedException e) {
+        } catch (ExecutionException | InterruptedException e) {
+            logger.debug("Netty channel write exception. ", e);
             return;
         }
-
         if (BookieProtocol.EOK == rc) {
             
statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), 
TimeUnit.NANOSECONDS);
         } else {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index 150e0c3089..3ed939b796 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -101,7 +101,9 @@ public class WriteEntryProcessorTest {
     @Test
     public void testNoneHighPriorityWritesOnReadOnlyBookie() throws Exception {
         when(bookie.isReadOnly()).thenReturn(true);
-        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        ChannelPromise mockPromise = mock(ChannelPromise.class);
+        when(channel.newPromise()).thenReturn(mockPromise);
+        when(mockPromise.addListener(any())).thenReturn(mockPromise);
 
         AtomicReference<Object> writtenObject = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
@@ -131,7 +133,9 @@ public class WriteEntryProcessorTest {
 
         when(bookie.isReadOnly()).thenReturn(true);
         when(bookie.isAvailableForHighPriorityWrites()).thenReturn(false);
-        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        ChannelPromise mockPromise = mock(ChannelPromise.class);
+        when(channel.newPromise()).thenReturn(mockPromise);
+        when(mockPromise.addListener(any())).thenReturn(mockPromise);
 
         AtomicReference<Object> writtenObject = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
@@ -161,7 +165,9 @@ public class WriteEntryProcessorTest {
 
         when(bookie.isReadOnly()).thenReturn(true);
         when(bookie.isAvailableForHighPriorityWrites()).thenReturn(true);
-        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        ChannelPromise mockPromise = mock(ChannelPromise.class);
+        when(channel.newPromise()).thenReturn(mockPromise);
+        when(mockPromise.addListener(any())).thenReturn(mockPromise);
         doAnswer(invocationOnMock -> {
             processor.writeComplete(0, request.ledgerId, request.entryId, 
null, null);
             return null;
@@ -194,7 +200,9 @@ public class WriteEntryProcessorTest {
     @Test
     public void testNormalWritesOnWritableBookie() throws Exception {
         when(bookie.isReadOnly()).thenReturn(false);
-        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        ChannelPromise mockPromise = mock(ChannelPromise.class);
+        when(channel.newPromise()).thenReturn(mockPromise);
+        when(mockPromise.addListener(any())).thenReturn(mockPromise);
         doAnswer(invocationOnMock -> {
             processor.writeComplete(0, request.ledgerId, request.entryId, 
null, null);
             return null;
@@ -227,7 +235,9 @@ public class WriteEntryProcessorTest {
     @Test
     public void testWritesCacheFlushTimeout() throws Exception {
         when(bookie.isReadOnly()).thenReturn(false);
-        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        ChannelPromise mockPromise = mock(ChannelPromise.class);
+        when(channel.newPromise()).thenReturn(mockPromise);
+        when(mockPromise.addListener(any())).thenReturn(mockPromise);
         
when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
         doAnswer(invocationOnMock -> {
             throw new BookieException.OperationRejectedException();

Reply via email to