This is an automated email from the ASF dual-hosted git repository.
shoothzj pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.16 by this push:
new 29a74ac02a fix: reference counting (retain/release) in
PerChannelBookieClient (#4293) (#4396)
29a74ac02a is described below
commit 29a74ac02a5a53a9b23e0bcee97b9a710f65b79b
Author: Lari Hotari <[email protected]>
AuthorDate: Mon May 27 14:40:02 2024 +0300
fix: reference counting (retain/release) in PerChannelBookieClient (#4293)
(#4396)
### Motivation
This addresses the remaining gaps of #4289 in handling ByteBuf
retain/release.
This PR also addresses the concern about the NioBuffer lifecycle that was
raised in the review of the original PR:
https://github.com/apache/bookkeeper/issues/791#issuecomment-383237059 .
This PR fixes several problems:
* ByteString buffer lifecycle in the client: it now follows the ByteBufList
lifecycle
* ByteBufList lifecycle: the release is moved to the write promise
* Completion of write promises in AuthHandler, which buffers messages while
authentication is in progress. Previously the promises were ignored.
### Changes
- add 2 callback parameters to writeAndFlush:
cleanupActionFailedBeforeWrite and cleanupActionAfterWrite
- use these callback actions for proper cleanup
- extract a utility class ByteStringUtil for wrapping a ByteBufList or
ByteBuf as a concatenated zero-copy ByteString
- properly handle releasing of ByteBufList in the write promise
- properly complete the promises of messages that are buffered while
authentication is in progress
(cherry picked from commit 0ef2f997ffda66dbc98fedaa21f99f0c4f5cb1be)
# Conflicts:
#
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
---
.../org/apache/bookkeeper/proto/AuthHandler.java | 33 ++++++--
.../apache/bookkeeper/proto/BookieClientImpl.java | 38 +++++----
.../apache/bookkeeper/proto/ByteStringUtil.java | 80 ++++++++++++++++++
.../bookkeeper/proto/PerChannelBookieClient.java | 97 +++++++++++-----------
.../org/apache/bookkeeper/util/ByteBufList.java | 34 +++++---
.../apache/bookkeeper/util/ByteBufListTest.java | 30 +++++--
6 files changed, 222 insertions(+), 90 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index 720e7279f5..a11fad102d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -355,7 +355,7 @@ class AuthHandler {
super.write(ctx, msg, promise);
super.flush(ctx);
} else {
- waitingForAuth.add(msg);
+ addMsgAndPromiseToQueue(msg, promise);
}
} else if (msg instanceof BookieProtocol.Request) {
// let auth messages through, queue the rest
@@ -364,16 +364,26 @@ class AuthHandler {
super.write(ctx, msg, promise);
super.flush(ctx);
} else {
- waitingForAuth.add(msg);
+ addMsgAndPromiseToQueue(msg, promise);
}
} else if (msg instanceof ByteBuf || msg instanceof
ByteBufList) {
- waitingForAuth.add(msg);
+ addMsgAndPromiseToQueue(msg, promise);
} else {
LOG.info("[{}] dropping write of message {}",
ctx.channel(), msg);
}
}
}
+ // Add the message and the associated promise to the queue.
+ // The promise is added to the same queue as the message without an
additional wrapper object so
+ // that object allocations can be avoided. A similar solution is used
in Netty codebase.
+ private void addMsgAndPromiseToQueue(Object msg, ChannelPromise
promise) {
+ waitingForAuth.add(msg);
+ if (promise != null && !promise.isVoid()) {
+ waitingForAuth.add(promise);
+ }
+ }
+
long newTxnId() {
return transactionIdGenerator.incrementAndGet();
}
@@ -433,10 +443,19 @@ class AuthHandler {
if (rc == BKException.Code.OK) {
synchronized (this) {
authenticated = true;
- Object msg = waitingForAuth.poll();
- while (msg != null) {
- NettyChannelUtil.writeAndFlushWithVoidPromise(ctx,
msg);
- msg = waitingForAuth.poll();
+ while (true) {
+ Object msg = waitingForAuth.poll();
+ if (msg == null) {
+ break;
+ }
+ ChannelPromise promise;
+ // check if the message has an associated promise
as the next element in the queue
+ if (waitingForAuth.peek() instanceof
ChannelPromise) {
+ promise = (ChannelPromise)
waitingForAuth.poll();
+ } else {
+ promise = ctx.voidPromise();
+ }
+ ctx.writeAndFlush(msg, promise);
}
}
} else {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index c305a51ea4..5dde52bc3f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -260,18 +260,20 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
toSend.retain();
client.obtain((rc, pcbc) -> {
- if (rc != BKException.Code.OK) {
- try {
- executor.executeOrdered(ledgerId,
- () -> cb.writeLacComplete(rc, ledgerId, addr,
ctx));
- } catch (RejectedExecutionException re) {
-
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId,
addr, ctx);
+ try {
+ if (rc != BKException.Code.OK) {
+ try {
+ executor.executeOrdered(ledgerId,
+ () -> cb.writeLacComplete(rc, ledgerId, addr,
ctx));
+ } catch (RejectedExecutionException re) {
+
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId,
addr, ctx);
+ }
+ } else {
+ pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
}
- } else {
- pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
+ } finally {
+ ReferenceCountUtil.release(toSend);
}
-
- ReferenceCountUtil.release(toSend);
}, ledgerId, useV3Enforced);
}
@@ -392,14 +394,16 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
@Override
public void operationComplete(final int rc,
PerChannelBookieClient pcbc) {
- if (rc != BKException.Code.OK) {
- bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
- } else {
- pcbc.addEntry(ledgerId, masterKey, entryId,
- toSend, cb, ctx, options, allowFastFail,
writeFlags);
+ try {
+ if (rc != BKException.Code.OK) {
+ bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb,
ctx);
+ } else {
+ pcbc.addEntry(ledgerId, masterKey, entryId,
+ toSend, cb, ctx, options, allowFastFail,
writeFlags);
+ }
+ } finally {
+ ReferenceCountUtil.release(toSend);
}
-
- ReferenceCountUtil.release(toSend);
recycle();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java
new file mode 100644
index 0000000000..b26ac7b36a
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.UnsafeByteOperations;
+import io.netty.buffer.ByteBuf;
+import java.nio.ByteBuffer;
+import org.apache.bookkeeper.util.ByteBufList;
+
+public class ByteStringUtil {
+
+ /**
+ * Wrap the internal buffers of a ByteBufList into a single ByteString.
+ * The lifecycle of the wrapped ByteString is tied to the ByteBufList.
+ *
+ * @param bufList ByteBufList to wrap
+ * @return ByteString wrapping the internal buffers of the ByteBufList
+ */
+ public static ByteString byteBufListToByteString(ByteBufList bufList) {
+ ByteString aggregated = null;
+ for (int i = 0; i < bufList.size(); i++) {
+ ByteBuf buffer = bufList.getBuffer(i);
+ if (buffer.readableBytes() > 0) {
+ aggregated = byteBufToByteString(aggregated, buffer);
+ }
+ }
+ return aggregated != null ? aggregated : ByteString.EMPTY;
+ }
+
+ /**
+ * Wrap the internal buffers of a ByteBuf into a single ByteString.
+ * The lifecycle of the wrapped ByteString is tied to the ByteBuf.
+ *
+ * @param byteBuf ByteBuf to wrap
+ * @return ByteString wrapping the internal buffers of the ByteBuf
+ */
+ public static ByteString byteBufToByteString(ByteBuf byteBuf) {
+ return byteBufToByteString(null, byteBuf);
+ }
+
+ // internal method to aggregate a ByteBuf into a single aggregated
ByteString
+ private static ByteString byteBufToByteString(ByteString aggregated,
ByteBuf byteBuf) {
+ if (byteBuf.readableBytes() == 0) {
+ return ByteString.EMPTY;
+ }
+ if (byteBuf.nioBufferCount() > 1) {
+ for (ByteBuffer nioBuffer : byteBuf.nioBuffers()) {
+ ByteString piece = UnsafeByteOperations.unsafeWrap(nioBuffer);
+ aggregated = (aggregated == null) ? piece :
aggregated.concat(piece);
+ }
+ } else {
+ ByteString piece;
+ if (byteBuf.hasArray()) {
+ piece = UnsafeByteOperations.unsafeWrap(byteBuf.array(),
byteBuf.arrayOffset() + byteBuf.readerIndex(),
+ byteBuf.readableBytes());
+ } else {
+ piece = UnsafeByteOperations.unsafeWrap(byteBuf.nioBuffer());
+ }
+ aggregated = (aggregated == null) ? piece :
aggregated.concat(piece);
+ }
+ return aggregated;
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 3a7af5b99a..ca1448c768 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -699,14 +699,10 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.WRITE_LAC)
.setTxnId(txnId);
- ByteString body;
- if (toSend.hasArray()) {
- body = UnsafeByteOperations.unsafeWrap(toSend.array(),
toSend.arrayOffset(), toSend.readableBytes());
- } else if (toSend.size() == 1) {
- body =
UnsafeByteOperations.unsafeWrap(toSend.getBuffer(0).nioBuffer());
- } else {
- body = UnsafeByteOperations.unsafeWrap(toSend.toArray());
- }
+ ByteString body = ByteStringUtil.byteBufListToByteString(toSend);
+ toSend.retain();
+ Runnable cleanupActionFailedBeforeWrite = toSend::release;
+ Runnable cleanupActionAfterWrite = cleanupActionFailedBeforeWrite;
WriteLacRequest.Builder writeLacBuilder = WriteLacRequest.newBuilder()
.setLedgerId(ledgerId)
.setLac(lac)
@@ -717,7 +713,8 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
.setHeader(headerBuilder)
.setWriteLacRequest(writeLacBuilder)
.build();
- writeAndFlush(channel, completionKey, writeLacRequest);
+ writeAndFlush(channel, completionKey, writeLacRequest, false,
cleanupActionFailedBeforeWrite,
+ cleanupActionAfterWrite);
}
void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) {
@@ -776,6 +773,8 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
Object ctx, final int options, boolean allowFastFail, final
EnumSet<WriteFlag> writeFlags) {
Object request = null;
CompletionKey completionKey = null;
+ Runnable cleanupActionFailedBeforeWrite = null;
+ Runnable cleanupActionAfterWrite = null;
if (useV2WireProtocol) {
if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
LOG.error("invalid writeflags {} for v2 protocol", writeFlags);
@@ -785,9 +784,14 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
completionKey = acquireV2Key(ledgerId, entryId,
OperationType.ADD_ENTRY);
if (toSend instanceof ByteBuf) {
- request = ((ByteBuf) toSend).retainedDuplicate();
+ ByteBuf byteBuf = ((ByteBuf) toSend).retainedDuplicate();
+ request = byteBuf;
+ cleanupActionFailedBeforeWrite = byteBuf::release;
} else {
- request = ByteBufList.clone((ByteBufList) toSend);
+ ByteBufList byteBufList = (ByteBufList) toSend;
+ byteBufList.retain();
+ request = byteBufList;
+ cleanupActionFailedBeforeWrite = byteBufList::release;
}
} else {
final long txnId = getTxnId();
@@ -802,19 +806,11 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
headerBuilder.setPriority(DEFAULT_HIGH_PRIORITY_VALUE);
}
- ByteString body = null;
ByteBufList bufToSend = (ByteBufList) toSend;
-
- if (bufToSend.hasArray()) {
- body = UnsafeByteOperations.unsafeWrap(bufToSend.array(),
bufToSend.arrayOffset(),
- bufToSend.readableBytes());
- } else {
- for (int i = 0; i < bufToSend.size(); i++) {
- ByteString piece =
UnsafeByteOperations.unsafeWrap(bufToSend.getBuffer(i).nioBuffer());
- // use ByteString.concat to avoid byte[] allocation when
toSend has multiple ByteBufs
- body = (body == null) ? piece : body.concat(piece);
- }
- }
+ ByteString body =
ByteStringUtil.byteBufListToByteString(bufToSend);
+ bufToSend.retain();
+ cleanupActionFailedBeforeWrite = bufToSend::release;
+ cleanupActionAfterWrite = cleanupActionFailedBeforeWrite;
AddRequest.Builder addBuilder = AddRequest.newBuilder()
.setLedgerId(ledgerId)
.setEntryId(entryId)
@@ -839,17 +835,9 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
putCompletionKeyValue(completionKey,
acquireAddCompletion(completionKey,
cb, ctx, ledgerId,
entryId));
- final Channel c = channel;
- if (c == null) {
- // Manually release the binary data(variable "request") that we
manually created when it can not be sent out
- // because the channel is switching.
- errorOut(completionKey);
- ReferenceCountUtil.release(request);
- return;
- } else {
- // addEntry times out on backpressure
- writeAndFlush(c, completionKey, request, allowFastFail);
- }
+ // addEntry times out on backpressure
+ writeAndFlush(channel, completionKey, request, allowFastFail,
cleanupActionFailedBeforeWrite,
+ cleanupActionAfterWrite);
}
public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
@@ -1004,7 +992,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
ReadCompletion readCompletion = new ReadCompletion(completionKey, cb,
ctx, ledgerId, entryId);
putCompletionKeyValue(completionKey, readCompletion);
- writeAndFlush(channel, completionKey, request, allowFastFail);
+ writeAndFlush(channel, completionKey, request, allowFastFail, null,
null);
}
public void getBookieInfo(final long requested, GetBookieInfoCallback cb,
Object ctx) {
@@ -1126,17 +1114,20 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
private void writeAndFlush(final Channel channel,
final CompletionKey key,
final Object request) {
- writeAndFlush(channel, key, request, false);
+ writeAndFlush(channel, key, request, false, null, null);
}
private void writeAndFlush(final Channel channel,
final CompletionKey key,
final Object request,
- final boolean allowFastFail) {
+ final boolean allowFastFail, final Runnable
cleanupActionFailedBeforeWrite,
+ final Runnable cleanupActionAfterWrite) {
if (channel == null) {
LOG.warn("Operation {} failed: channel == null",
StringUtils.requestToString(request));
errorOut(key);
- ReferenceCountUtil.release(request);
+ if (cleanupActionFailedBeforeWrite != null) {
+ cleanupActionFailedBeforeWrite.run();
+ }
return;
}
@@ -1151,7 +1142,9 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
StringUtils.requestToString(request));
errorOut(key, BKException.Code.TooManyRequestsException);
- ReferenceCountUtil.release(request);
+ if (cleanupActionFailedBeforeWrite != null) {
+ cleanupActionFailedBeforeWrite.run();
+ }
return;
}
@@ -1159,23 +1152,29 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
final long startTime = MathUtils.nowInNano();
ChannelPromise promise = channel.newPromise().addListener(future
-> {
- if (future.isSuccess()) {
-
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime),
TimeUnit.NANOSECONDS);
- CompletionValue completion = completionObjects.get(key);
- if (completion != null) {
- completion.setOutstanding();
+ try {
+ if (future.isSuccess()) {
+
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime),
TimeUnit.NANOSECONDS);
+ CompletionValue completion =
completionObjects.get(key);
+ if (completion != null) {
+ completion.setOutstanding();
+ }
+ } else {
+
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime),
TimeUnit.NANOSECONDS);
+ }
+ } finally {
+ if (cleanupActionAfterWrite != null) {
+ cleanupActionAfterWrite.run();
}
- } else {
-
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime),
TimeUnit.NANOSECONDS);
}
});
channel.writeAndFlush(request, promise);
} catch (Throwable e) {
LOG.warn("Operation {} failed",
StringUtils.requestToString(request), e);
errorOut(key);
- // If the request goes into the writeAndFlush, it should be
handled well by Netty. So all the exceptions we
- // get here, we can release the request.
- ReferenceCountUtil.release(request);
+ if (cleanupActionFailedBeforeWrite != null) {
+ cleanupActionFailedBeforeWrite.run();
+ }
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
index f690b99197..1f85a16db8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -286,19 +286,29 @@ public class ByteBufList extends AbstractReferenceCounted
{
if (msg instanceof ByteBufList) {
ByteBufList b = (ByteBufList) msg;
- try {
- // Write each buffer individually on the socket. The
retain() here is needed to preserve the fact
- // that ByteBuf are automatically released after a write.
If the ByteBufPair ref count is increased
- // and it gets written multiple times, the individual
buffers refcount should be reflected as well.
- int buffersCount = b.buffers.size();
- for (int i = 0; i < buffersCount; i++) {
- ByteBuf bx = b.buffers.get(i);
- // Last buffer will carry on the final promise to
notify when everything was written on the
- // socket
- ctx.write(bx.retainedDuplicate(), i == (buffersCount -
1) ? promise : ctx.voidPromise());
+ ChannelPromise compositePromise = ctx.newPromise();
+ compositePromise.addListener(future -> {
+ // release the ByteBufList after the write operation is
completed
+ ReferenceCountUtil.safeRelease(b);
+ // complete the promise passed as an argument unless it's
a void promise
+ if (promise != null && !promise.isVoid()) {
+ if (future.isSuccess()) {
+ promise.setSuccess();
+ } else {
+ promise.setFailure(future.cause());
+ }
}
- } finally {
- ReferenceCountUtil.release(b);
+ });
+
+ // Write each buffer individually on the socket. The retain()
here is needed to preserve the fact
+ // that ByteBuf are automatically released after a write. If
the ByteBufPair ref count is increased
+ // and it gets written multiple times, the individual buffers
refcount should be reflected as well.
+ int buffersCount = b.buffers.size();
+ for (int i = 0; i < buffersCount; i++) {
+ ByteBuf bx = b.buffers.get(i);
+ // Last buffer will carry on the final promise to notify
when everything was written on the
+ // socket
+ ctx.write(bx.retainedDuplicate(), i == (buffersCount - 1)
? compositePromise : ctx.voidPromise());
}
} else {
ctx.write(msg, promise);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
index 88de17d0a9..822d4a7c48 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java
@@ -20,6 +20,8 @@ package org.apache.bookkeeper.util;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -32,6 +34,8 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.VoidChannelPromise;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
@@ -195,7 +199,8 @@ public class ByteBufListTest {
b2.writerIndex(b2.capacity());
ByteBufList buf = ByteBufList.get(b1, b2);
- ChannelHandlerContext ctx = new MockChannelHandlerContext();
+ Channel channel = mock(Channel.class);
+ MockChannelHandlerContext ctx = new MockChannelHandlerContext(channel);
ByteBufList.ENCODER.write(ctx, buf, null);
@@ -205,6 +210,15 @@ public class ByteBufListTest {
}
class MockChannelHandlerContext implements ChannelHandlerContext {
+ private final Channel channel;
+ private final EventExecutor eventExecutor;
+
+ public MockChannelHandlerContext(Channel channel) {
+ this.channel = channel;
+ eventExecutor = mock(EventExecutor.class);
+ when(eventExecutor.inEventLoop()).thenReturn(true);
+ }
+
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return null;
@@ -274,12 +288,18 @@ public class ByteBufListTest {
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
ReferenceCountUtil.release(msg);
+ if (promise != null && !promise.isVoid()) {
+ promise.setSuccess();
+ }
return null;
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise)
{
ReferenceCountUtil.release(msg);
+ if (promise != null && !promise.isVoid()) {
+ promise.setSuccess();
+ }
return null;
}
@@ -291,7 +311,7 @@ public class ByteBufListTest {
@Override
public ChannelPromise newPromise() {
- return null;
+ return new DefaultChannelPromise(channel, eventExecutor);
}
@Override
@@ -311,17 +331,17 @@ public class ByteBufListTest {
@Override
public ChannelPromise voidPromise() {
- return null;
+ return new VoidChannelPromise(channel, false);
}
@Override
public Channel channel() {
- return null;
+ return channel;
}
@Override
public EventExecutor executor() {
- return null;
+ return eventExecutor;
}
@Override