This is an automated email from the ASF dual-hosted git repository.

shoothzj pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.16 by this push:
     new 29a74ac02a fix: reference counting (retain/release) in 
PerChannelBookieClient (#4293) (#4396)
29a74ac02a is described below

commit 29a74ac02a5a53a9b23e0bcee97b9a710f65b79b
Author: Lari Hotari <[email protected]>
AuthorDate: Mon May 27 14:40:02 2024 +0300

    fix: reference counting (retain/release) in PerChannelBookieClient (#4293) 
(#4396)
    
    ### Motivation
    
    This addresses the remaining gaps of #4289 in handling ByteBuf 
retain/release.
    This PR will also address the concern about NioBuffer lifecycle brought up 
in the review of the original PR review: 
https://github.com/apache/bookkeeper/issues/791#issuecomment-383237059 .
    
    This PR fixes several problems:
    * ByteString buffer lifecycle in client, follows ByteBufList lifecycle
    * ByteBufList lifecycle, moved to write promise
    * Calling of write promises in AuthHandler which buffers messages while 
authentication is in progress. It was ignoring the promises.
    
    ### Changes
    
    - add 2 callback parameters to writeAndFlush: 
cleanupActionFailedBeforeWrite and cleanupActionAfterWrite
      - use these callback actions for proper cleanup
    - extract a utility class ByteStringUtil for wrapping ByteBufList or 
ByteBuf as concatenated zero copy ByteString
    - properly handle releasing of ByteBufList in the write promise
    - properly handle calling promises that are buffered while authentication 
is in progress
    
    (cherry picked from commit 0ef2f997ffda66dbc98fedaa21f99f0c4f5cb1be)
    
    # Conflicts:
    #       
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
---
 .../org/apache/bookkeeper/proto/AuthHandler.java   | 33 ++++++--
 .../apache/bookkeeper/proto/BookieClientImpl.java  | 38 +++++----
 .../apache/bookkeeper/proto/ByteStringUtil.java    | 80 ++++++++++++++++++
 .../bookkeeper/proto/PerChannelBookieClient.java   | 97 +++++++++++-----------
 .../org/apache/bookkeeper/util/ByteBufList.java    | 34 +++++---
 .../apache/bookkeeper/util/ByteBufListTest.java    | 30 +++++--
 6 files changed, 222 insertions(+), 90 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index 720e7279f5..a11fad102d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -355,7 +355,7 @@ class AuthHandler {
                         super.write(ctx, msg, promise);
                         super.flush(ctx);
                     } else {
-                        waitingForAuth.add(msg);
+                        addMsgAndPromiseToQueue(msg, promise);
                     }
                 } else if (msg instanceof BookieProtocol.Request) {
                     // let auth messages through, queue the rest
@@ -364,16 +364,26 @@ class AuthHandler {
                         super.write(ctx, msg, promise);
                         super.flush(ctx);
                     } else {
-                        waitingForAuth.add(msg);
+                        addMsgAndPromiseToQueue(msg, promise);
                     }
                 } else if (msg instanceof ByteBuf || msg instanceof 
ByteBufList) {
-                    waitingForAuth.add(msg);
+                    addMsgAndPromiseToQueue(msg, promise);
                 } else {
                     LOG.info("[{}] dropping write of message {}", 
ctx.channel(), msg);
                 }
             }
         }
 
+        // Add the message and the associated promise to the queue.
+        // The promise is added to the same queue as the message without an 
additional wrapper object so
+        // that object allocations can be avoided. A similar solution is used 
in Netty codebase.
+        private void addMsgAndPromiseToQueue(Object msg, ChannelPromise 
promise) {
+            waitingForAuth.add(msg);
+            if (promise != null && !promise.isVoid()) {
+                waitingForAuth.add(promise);
+            }
+        }
+
         long newTxnId() {
             return transactionIdGenerator.incrementAndGet();
         }
@@ -433,10 +443,19 @@ class AuthHandler {
                 if (rc == BKException.Code.OK) {
                     synchronized (this) {
                         authenticated = true;
-                        Object msg = waitingForAuth.poll();
-                        while (msg != null) {
-                            NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, 
msg);
-                            msg = waitingForAuth.poll();
+                        while (true) {
+                            Object msg = waitingForAuth.poll();
+                            if (msg == null) {
+                                break;
+                            }
+                            ChannelPromise promise;
+                            // check if the message has an associated promise 
as the next element in the queue
+                            if (waitingForAuth.peek() instanceof 
ChannelPromise) {
+                                promise = (ChannelPromise) 
waitingForAuth.poll();
+                            } else {
+                                promise = ctx.voidPromise();
+                            }
+                            ctx.writeAndFlush(msg, promise);
                         }
                     }
                 } else {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index c305a51ea4..5dde52bc3f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -260,18 +260,20 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
 
         toSend.retain();
         client.obtain((rc, pcbc) -> {
-            if (rc != BKException.Code.OK) {
-                try {
-                    executor.executeOrdered(ledgerId,
-                            () -> cb.writeLacComplete(rc, ledgerId, addr, 
ctx));
-                } catch (RejectedExecutionException re) {
-                    
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, 
addr, ctx);
+            try {
+                if (rc != BKException.Code.OK) {
+                    try {
+                        executor.executeOrdered(ledgerId,
+                                () -> cb.writeLacComplete(rc, ledgerId, addr, 
ctx));
+                    } catch (RejectedExecutionException re) {
+                        
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, 
addr, ctx);
+                    }
+                } else {
+                    pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
                 }
-            } else {
-                pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
+            } finally {
+                ReferenceCountUtil.release(toSend);
             }
-
-            ReferenceCountUtil.release(toSend);
         }, ledgerId, useV3Enforced);
     }
 
@@ -392,14 +394,16 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
         @Override
         public void operationComplete(final int rc,
                                       PerChannelBookieClient pcbc) {
-            if (rc != BKException.Code.OK) {
-                bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
-            } else {
-                pcbc.addEntry(ledgerId, masterKey, entryId,
-                              toSend, cb, ctx, options, allowFastFail, 
writeFlags);
+            try {
+                if (rc != BKException.Code.OK) {
+                    bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, 
ctx);
+                } else {
+                    pcbc.addEntry(ledgerId, masterKey, entryId,
+                            toSend, cb, ctx, options, allowFastFail, 
writeFlags);
+                }
+            } finally {
+                ReferenceCountUtil.release(toSend);
             }
-
-            ReferenceCountUtil.release(toSend);
             recycle();
         }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java
new file mode 100644
index 0000000000..b26ac7b36a
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.UnsafeByteOperations;
+import io.netty.buffer.ByteBuf;
+import java.nio.ByteBuffer;
+import org.apache.bookkeeper.util.ByteBufList;
+
+public class ByteStringUtil {
+
+    /**
+     * Wrap the internal buffers of a ByteBufList into a single ByteString.
+     * The lifecycle of the wrapped ByteString is tied to the ByteBufList.
+     *
+     * @param bufList ByteBufList to wrap
+     * @return ByteString wrapping the internal buffers of the ByteBufList
+     */
+    public static ByteString byteBufListToByteString(ByteBufList bufList) {
+        ByteString aggregated = null;
+        for (int i = 0; i < bufList.size(); i++) {
+            ByteBuf buffer = bufList.getBuffer(i);
+            if (buffer.readableBytes() > 0) {
+                aggregated = byteBufToByteString(aggregated, buffer);
+            }
+        }
+        return aggregated != null ? aggregated : ByteString.EMPTY;
+    }
+
+    /**
+     * Wrap the internal buffers of a ByteBuf into a single ByteString.
+     * The lifecycle of the wrapped ByteString is tied to the ByteBuf.
+     *
+     * @param byteBuf ByteBuf to wrap
+     * @return ByteString wrapping the internal buffers of the ByteBuf
+     */
+    public static ByteString byteBufToByteString(ByteBuf byteBuf) {
+        return byteBufToByteString(null, byteBuf);
+    }
+
+    // internal method to aggregate a ByteBuf into a single aggregated 
ByteString
+    private static ByteString byteBufToByteString(ByteString aggregated, 
ByteBuf byteBuf) {
+        if (byteBuf.readableBytes() == 0) {
+            return ByteString.EMPTY;
+        }
+        if (byteBuf.nioBufferCount() > 1) {
+            for (ByteBuffer nioBuffer : byteBuf.nioBuffers()) {
+                ByteString piece = UnsafeByteOperations.unsafeWrap(nioBuffer);
+                aggregated = (aggregated == null) ? piece : 
aggregated.concat(piece);
+            }
+        } else {
+            ByteString piece;
+            if (byteBuf.hasArray()) {
+                piece = UnsafeByteOperations.unsafeWrap(byteBuf.array(), 
byteBuf.arrayOffset() + byteBuf.readerIndex(),
+                        byteBuf.readableBytes());
+            } else {
+                piece = UnsafeByteOperations.unsafeWrap(byteBuf.nioBuffer());
+            }
+            aggregated = (aggregated == null) ? piece : 
aggregated.concat(piece);
+        }
+        return aggregated;
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 3a7af5b99a..ca1448c768 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -699,14 +699,10 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                 .setVersion(ProtocolVersion.VERSION_THREE)
                 .setOperation(OperationType.WRITE_LAC)
                 .setTxnId(txnId);
-        ByteString body;
-        if (toSend.hasArray()) {
-            body = UnsafeByteOperations.unsafeWrap(toSend.array(), 
toSend.arrayOffset(), toSend.readableBytes());
-        } else if (toSend.size() == 1) {
-            body = 
UnsafeByteOperations.unsafeWrap(toSend.getBuffer(0).nioBuffer());
-        } else {
-            body = UnsafeByteOperations.unsafeWrap(toSend.toArray());
-        }
+        ByteString body = ByteStringUtil.byteBufListToByteString(toSend);
+        toSend.retain();
+        Runnable cleanupActionFailedBeforeWrite = toSend::release;
+        Runnable cleanupActionAfterWrite = cleanupActionFailedBeforeWrite;
         WriteLacRequest.Builder writeLacBuilder = WriteLacRequest.newBuilder()
                 .setLedgerId(ledgerId)
                 .setLac(lac)
@@ -717,7 +713,8 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                 .setHeader(headerBuilder)
                 .setWriteLacRequest(writeLacBuilder)
                 .build();
-        writeAndFlush(channel, completionKey, writeLacRequest);
+        writeAndFlush(channel, completionKey, writeLacRequest, false, 
cleanupActionFailedBeforeWrite,
+                cleanupActionAfterWrite);
     }
 
     void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) {
@@ -776,6 +773,8 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                   Object ctx, final int options, boolean allowFastFail, final 
EnumSet<WriteFlag> writeFlags) {
         Object request = null;
         CompletionKey completionKey = null;
+        Runnable cleanupActionFailedBeforeWrite = null;
+        Runnable cleanupActionAfterWrite = null;
         if (useV2WireProtocol) {
             if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
                 LOG.error("invalid writeflags {} for v2 protocol", writeFlags);
@@ -785,9 +784,14 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             completionKey = acquireV2Key(ledgerId, entryId, 
OperationType.ADD_ENTRY);
 
             if (toSend instanceof ByteBuf) {
-                request = ((ByteBuf) toSend).retainedDuplicate();
+                ByteBuf byteBuf = ((ByteBuf) toSend).retainedDuplicate();
+                request = byteBuf;
+                cleanupActionFailedBeforeWrite = byteBuf::release;
             } else {
-                request = ByteBufList.clone((ByteBufList) toSend);
+                ByteBufList byteBufList = (ByteBufList) toSend;
+                byteBufList.retain();
+                request = byteBufList;
+                cleanupActionFailedBeforeWrite = byteBufList::release;
             }
         } else {
             final long txnId = getTxnId();
@@ -802,19 +806,11 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                 headerBuilder.setPriority(DEFAULT_HIGH_PRIORITY_VALUE);
             }
 
-            ByteString body = null;
             ByteBufList bufToSend = (ByteBufList) toSend;
-
-            if (bufToSend.hasArray()) {
-                body = UnsafeByteOperations.unsafeWrap(bufToSend.array(), 
bufToSend.arrayOffset(),
-                        bufToSend.readableBytes());
-            } else {
-                for (int i = 0; i < bufToSend.size(); i++) {
-                    ByteString piece = 
UnsafeByteOperations.unsafeWrap(bufToSend.getBuffer(i).nioBuffer());
-                    // use ByteString.concat to avoid byte[] allocation when 
toSend has multiple ByteBufs
-                    body = (body == null) ? piece : body.concat(piece);
-                }
-            }
+            ByteString body = 
ByteStringUtil.byteBufListToByteString(bufToSend);
+            bufToSend.retain();
+            cleanupActionFailedBeforeWrite = bufToSend::release;
+            cleanupActionAfterWrite = cleanupActionFailedBeforeWrite;
             AddRequest.Builder addBuilder = AddRequest.newBuilder()
                     .setLedgerId(ledgerId)
                     .setEntryId(entryId)
@@ -839,17 +835,9 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         putCompletionKeyValue(completionKey,
                               acquireAddCompletion(completionKey,
                                                    cb, ctx, ledgerId, 
entryId));
-        final Channel c = channel;
-        if (c == null) {
-            // Manually release the binary data(variable "request") that we 
manually created when it can not be sent out
-            // because the channel is switching.
-            errorOut(completionKey);
-            ReferenceCountUtil.release(request);
-            return;
-        } else {
-            // addEntry times out on backpressure
-            writeAndFlush(c, completionKey, request, allowFastFail);
-        }
+        // addEntry times out on backpressure
+        writeAndFlush(channel, completionKey, request, allowFastFail, 
cleanupActionFailedBeforeWrite,
+                cleanupActionAfterWrite);
     }
 
     public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
@@ -1004,7 +992,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, 
ctx, ledgerId, entryId);
         putCompletionKeyValue(completionKey, readCompletion);
 
-        writeAndFlush(channel, completionKey, request, allowFastFail);
+        writeAndFlush(channel, completionKey, request, allowFastFail, null, 
null);
     }
 
     public void getBookieInfo(final long requested, GetBookieInfoCallback cb, 
Object ctx) {
@@ -1126,17 +1114,20 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
     private void writeAndFlush(final Channel channel,
                                final CompletionKey key,
                                final Object request) {
-        writeAndFlush(channel, key, request, false);
+        writeAndFlush(channel, key, request, false, null, null);
     }
 
     private void writeAndFlush(final Channel channel,
                            final CompletionKey key,
                            final Object request,
-                           final boolean allowFastFail) {
+                               final boolean allowFastFail, final Runnable 
cleanupActionFailedBeforeWrite,
+                               final Runnable cleanupActionAfterWrite) {
         if (channel == null) {
             LOG.warn("Operation {} failed: channel == null", 
StringUtils.requestToString(request));
             errorOut(key);
-            ReferenceCountUtil.release(request);
+            if (cleanupActionFailedBeforeWrite != null) {
+                cleanupActionFailedBeforeWrite.run();
+            }
             return;
         }
 
@@ -1151,7 +1142,9 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                     StringUtils.requestToString(request));
 
             errorOut(key, BKException.Code.TooManyRequestsException);
-            ReferenceCountUtil.release(request);
+            if (cleanupActionFailedBeforeWrite != null) {
+                cleanupActionFailedBeforeWrite.run();
+            }
             return;
         }
 
@@ -1159,23 +1152,29 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             final long startTime = MathUtils.nowInNano();
 
             ChannelPromise promise = channel.newPromise().addListener(future 
-> {
-                if (future.isSuccess()) {
-                    
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), 
TimeUnit.NANOSECONDS);
-                    CompletionValue completion = completionObjects.get(key);
-                    if (completion != null) {
-                        completion.setOutstanding();
+                try {
+                    if (future.isSuccess()) {
+                        
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), 
TimeUnit.NANOSECONDS);
+                        CompletionValue completion = 
completionObjects.get(key);
+                        if (completion != null) {
+                            completion.setOutstanding();
+                        }
+                    } else {
+                        
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), 
TimeUnit.NANOSECONDS);
+                    }
+                } finally {
+                    if (cleanupActionAfterWrite != null) {
+                        cleanupActionAfterWrite.run();
                     }
-                } else {
-                    
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), 
TimeUnit.NANOSECONDS);
                 }
             });
             channel.writeAndFlush(request, promise);
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", 
StringUtils.requestToString(request), e);
             errorOut(key);
-            // If the request goes into the writeAndFlush, it should be 
handled well by Netty. So all the exceptions we
-            // get here, we can release the request.
-            ReferenceCountUtil.release(request);
+            if (cleanupActionFailedBeforeWrite != null) {
+                cleanupActionFailedBeforeWrite.run();
+            }
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
index f690b99197..1f85a16db8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -286,19 +286,29 @@ public class ByteBufList extends AbstractReferenceCounted 
{
             if (msg instanceof ByteBufList) {
                 ByteBufList b = (ByteBufList) msg;
 
-                try {
-                    // Write each buffer individually on the socket. The 
retain() here is needed to preserve the fact
-                    // that ByteBuf are automatically released after a write. 
If the ByteBufPair ref count is increased
-                    // and it gets written multiple times, the individual 
buffers refcount should be reflected as well.
-                    int buffersCount = b.buffers.size();
-                    for (int i = 0; i < buffersCount; i++) {
-                        ByteBuf bx = b.buffers.get(i);
-                        // Last buffer will carry on the final promise to 
notify when everything was written on the
-                        // socket
-                        ctx.write(bx.retainedDuplicate(), i == (buffersCount - 
1) ? promise : ctx.voidPromise());
+                ChannelPromise compositePromise = ctx.newPromise();
+                compositePromise.addListener(future -> {
+                    // release the ByteBufList after the write operation is 
completed
+                    ReferenceCountUtil.safeRelease(b);
+                    // complete the promise passed as an argument unless it's 
a void promise
+                    if (promise != null && !promise.isVoid()) {
+                        if (future.isSuccess()) {
+                            promise.setSuccess();
+                        } else {
+                            promise.setFailure(future.cause());
+                        }
                     }
-                } finally {
-                    ReferenceCountUtil.release(b);
+                });
+
+                // Write each buffer individually on the socket. The retain() 
here is needed to preserve the fact
+                // that ByteBuf are automatically released after a write. If 
the ByteBufPair ref count is increased
+                // and it gets written multiple times, the individual buffers 
refcount should be reflected as well.
+                int buffersCount = b.buffers.size();
+                for (int i = 0; i < buffersCount; i++) {
+                    ByteBuf bx = b.buffers.get(i);
+                    // Last buffer will carry on the final promise to notify 
when everything was written on the
+                    // socket
+                    ctx.write(bx.retainedDuplicate(), i == (buffersCount - 1) 
? compositePromise : ctx.voidPromise());
                 }
             } else {
                 ctx.write(msg, promise);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
index 88de17d0a9..822d4a7c48 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
@@ -20,6 +20,8 @@ package org.apache.bookkeeper.util;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -32,6 +34,8 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.ChannelProgressivePromise;
 import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.VoidChannelPromise;
 import io.netty.util.Attribute;
 import io.netty.util.AttributeKey;
 import io.netty.util.ReferenceCountUtil;
@@ -195,7 +199,8 @@ public class ByteBufListTest {
         b2.writerIndex(b2.capacity());
         ByteBufList buf = ByteBufList.get(b1, b2);
 
-        ChannelHandlerContext ctx = new MockChannelHandlerContext();
+        Channel channel = mock(Channel.class);
+        MockChannelHandlerContext ctx = new MockChannelHandlerContext(channel);
 
         ByteBufList.ENCODER.write(ctx, buf, null);
 
@@ -205,6 +210,15 @@ public class ByteBufListTest {
     }
 
     class MockChannelHandlerContext implements ChannelHandlerContext {
+        private final Channel channel;
+        private final EventExecutor eventExecutor;
+
+        public MockChannelHandlerContext(Channel channel) {
+            this.channel = channel;
+            eventExecutor = mock(EventExecutor.class);
+            when(eventExecutor.inEventLoop()).thenReturn(true);
+        }
+
         @Override
         public ChannelFuture bind(SocketAddress localAddress) {
             return null;
@@ -274,12 +288,18 @@ public class ByteBufListTest {
         @Override
         public ChannelFuture write(Object msg, ChannelPromise promise) {
             ReferenceCountUtil.release(msg);
+            if (promise != null  && !promise.isVoid()) {
+                promise.setSuccess();
+            }
             return null;
         }
 
         @Override
         public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) 
{
             ReferenceCountUtil.release(msg);
+            if (promise != null && !promise.isVoid()) {
+                promise.setSuccess();
+            }
             return null;
         }
 
@@ -291,7 +311,7 @@ public class ByteBufListTest {
 
         @Override
         public ChannelPromise newPromise() {
-            return null;
+            return new DefaultChannelPromise(channel, eventExecutor);
         }
 
         @Override
@@ -311,17 +331,17 @@ public class ByteBufListTest {
 
         @Override
         public ChannelPromise voidPromise() {
-            return null;
+            return new VoidChannelPromise(channel, false);
         }
 
         @Override
         public Channel channel() {
-            return null;
+            return channel;
         }
 
         @Override
         public EventExecutor executor() {
-            return null;
+            return eventExecutor;
         }
 
         @Override

Reply via email to