This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new dfde3d6836 Pass BookieRequestHandler instead of Channel to the request processors (#3835) dfde3d6836 is described below commit dfde3d6836094cc39d8a3603cb268ca0f633350b Author: Matteo Merli <mme...@apache.org> AuthorDate: Sun Mar 5 09:43:37 2023 -0800 Pass BookieRequestHandler instead of Channel to the request processors (#3835) * Pass BookieRequestHandler instead of Channel to the request processors * Fixed checkstyle * Fixed ForceLedgerProcessorV3Test * Fixed TestBookieRequestProcessor * Fixed line length --- .../bookkeeper/processor/RequestProcessor.java | 4 +- .../bookkeeper/proto/BookieRequestHandler.java | 11 ++- .../bookkeeper/proto/BookieRequestProcessor.java | 94 ++++++++++++---------- .../bookkeeper/proto/ForceLedgerProcessorV3.java | 7 +- .../bookkeeper/proto/GetBookieInfoProcessorV3.java | 5 +- .../proto/GetListOfEntriesOfLedgerProcessorV3.java | 5 +- .../proto/LongPollReadEntryProcessorV3.java | 5 +- .../bookkeeper/proto/PacketProcessorBase.java | 28 ++++--- .../bookkeeper/proto/PacketProcessorBaseV3.java | 7 +- .../bookkeeper/proto/ReadEntryProcessor.java | 14 ++-- .../bookkeeper/proto/ReadEntryProcessorV3.java | 11 +-- .../bookkeeper/proto/ReadLacProcessorV3.java | 5 +- .../bookkeeper/proto/WriteEntryProcessor.java | 12 +-- .../bookkeeper/proto/WriteEntryProcessorV3.java | 13 +-- .../bookkeeper/proto/WriteLacProcessorV3.java | 8 +- .../proto/ForceLedgerProcessorV3Test.java | 19 ++++- .../proto/GetBookieInfoProcessorV3Test.java | 12 ++- .../proto/LongPollReadEntryProcessorV3Test.java | 9 ++- .../bookkeeper/proto/ReadEntryProcessorTest.java | 15 +++- .../proto/TestBookieRequestProcessor.java | 14 +++- .../bookkeeper/proto/WriteEntryProcessorTest.java | 45 ++++++----- .../proto/WriteEntryProcessorV3Test.java | 13 ++- 22 files changed, 223 insertions(+), 133 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java index 50b97e906b..5a4238e64d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java @@ -20,7 +20,7 @@ */ package org.apache.bookkeeper.processor; -import io.netty.channel.Channel; +import org.apache.bookkeeper.proto.BookieRequestHandler; /** * A request processor that is used for processing requests at bookie side. @@ -41,5 +41,5 @@ public interface RequestProcessor extends AutoCloseable { * @param channel * channel received the given request <i>r</i> */ - void processRequest(Object r, Channel channel); + void processRequest(Object r, BookieRequestHandler channel); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java index 93be69cd37..c9d65a7317 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java @@ -32,20 +32,27 @@ import org.slf4j.LoggerFactory; /** * Serverside handler for bookkeeper requests. */ -class BookieRequestHandler extends ChannelInboundHandlerAdapter { +public class BookieRequestHandler extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory.getLogger(BookieRequestHandler.class); private final RequestProcessor requestProcessor; private final ChannelGroup allChannels; + private ChannelHandlerContext ctx; + BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) { this.requestProcessor = processor; this.allChannels = allChannels; } + public ChannelHandlerContext ctx() { + return ctx; + } + @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { LOG.info("Channel connected {}", ctx.channel()); + this.ctx = ctx; super.channelActive(ctx); } @@ -75,6 +82,6 @@ class BookieRequestHandler extends ChannelInboundHandlerAdapter { ctx.fireChannelRead(msg); return; } - requestProcessor.processRequest(msg, ctx.channel()); + requestProcessor.processRequest(msg, this); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index f7f4eceda3..9237c451ed 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -299,7 +299,8 @@ public class BookieRequestProcessor implements RequestProcessor { } @Override - public void processRequest(Object msg, Channel c) { + public void processRequest(Object msg, BookieRequestHandler requestHandler) { + Channel channel = requestHandler.ctx().channel(); // If we can decode this packet as a Request protobuf packet, process // it as a version 3 packet. Else, just use the old protocol. if (msg instanceof BookkeeperProtocol.Request) { @@ -309,16 +310,16 @@ public class BookieRequestProcessor implements RequestProcessor { BookkeeperProtocol.BKPacketHeader header = r.getHeader(); switch (header.getOperation()) { case ADD_ENTRY: - processAddRequestV3(r, c); + processAddRequestV3(r, requestHandler); break; case READ_ENTRY: - processReadRequestV3(r, c); + processReadRequestV3(r, requestHandler); break; case FORCE_LEDGER: - processForceLedgerRequestV3(r, c); + processForceLedgerRequestV3(r, requestHandler); break; case AUTH: - LOG.info("Ignoring auth operation from client {}", c.remoteAddress()); + LOG.info("Ignoring auth operation from client {}", channel.remoteAddress()); BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage .newBuilder() .setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME) @@ -328,29 +329,29 @@ public class BookieRequestProcessor implements RequestProcessor { .newBuilder().setHeader(r.getHeader()) .setStatus(BookkeeperProtocol.StatusCode.EOK) .setAuthResponse(message); - c.writeAndFlush(authResponse.build()); + channel.writeAndFlush(authResponse.build()); break; case WRITE_LAC: - processWriteLacRequestV3(r, c); + processWriteLacRequestV3(r, requestHandler); break; case READ_LAC: - processReadLacRequestV3(r, c); + processReadLacRequestV3(r, requestHandler); break; case GET_BOOKIE_INFO: - processGetBookieInfoRequestV3(r, c); + processGetBookieInfoRequestV3(r, requestHandler); break; case START_TLS: - processStartTLSRequestV3(r, c); + processStartTLSRequestV3(r, requestHandler); break; case GET_LIST_OF_ENTRIES_OF_LEDGER: - processGetListOfEntriesOfLedgerProcessorV3(r, c); + processGetListOfEntriesOfLedgerProcessorV3(r, requestHandler); break; default: LOG.info("Unknown operation type {}", header.getOperation()); BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader()) .setStatus(BookkeeperProtocol.StatusCode.EBADREQ); - c.writeAndFlush(response.build()); + channel.writeAndFlush(response.build()); if (statsEnabled) { bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps(); } @@ -365,26 +366,27 @@ public class BookieRequestProcessor implements RequestProcessor { switch (r.getOpCode()) { case BookieProtocol.ADDENTRY: checkArgument(r instanceof BookieProtocol.ParsedAddRequest); - processAddRequest((BookieProtocol.ParsedAddRequest) r, c); + processAddRequest((BookieProtocol.ParsedAddRequest) r, requestHandler); break; case BookieProtocol.READENTRY: checkArgument(r instanceof BookieProtocol.ReadRequest); - processReadRequest((BookieProtocol.ReadRequest) r, c); + processReadRequest((BookieProtocol.ReadRequest) r, requestHandler); break; case BookieProtocol.AUTH: - LOG.info("Ignoring auth operation from client {}", c.remoteAddress()); + LOG.info("Ignoring auth operation from client {}", + requestHandler.ctx().channel().remoteAddress()); BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage .newBuilder() .setAuthPluginName(AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME) .setPayload(ByteString.copyFrom(AuthToken.NULL.getData())) .build(); - c.writeAndFlush(new BookieProtocol.AuthResponse( + channel.writeAndFlush(new BookieProtocol.AuthResponse( BookieProtocol.CURRENT_PROTOCOL_VERSION, message)); break; default: LOG.error("Unknown op type {}, sending error", r.getOpCode()); - c.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r)); + channel.writeAndFlush(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r)); if (statsEnabled) { bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps(); } @@ -402,8 +404,9 @@ public class BookieRequestProcessor implements RequestProcessor { } } - private void processWriteLacRequestV3(final BookkeeperProtocol.Request r, final Channel c) { - WriteLacProcessorV3 writeLac = new WriteLacProcessorV3(r, c, this); + private void processWriteLacRequestV3(final BookkeeperProtocol.Request r, + final BookieRequestHandler requestHandler) { + WriteLacProcessorV3 writeLac = new WriteLacProcessorV3(r, requestHandler, this); if (null == writeThreadPool) { writeLac.run(); } else { @@ -411,8 +414,9 @@ public class BookieRequestProcessor implements RequestProcessor { } } - private void processReadLacRequestV3(final BookkeeperProtocol.Request r, final Channel c) { - ReadLacProcessorV3 readLac = new ReadLacProcessorV3(r, c, this); + private void processReadLacRequestV3(final BookkeeperProtocol.Request r, + final BookieRequestHandler requestHandler) { + ReadLacProcessorV3 readLac = new ReadLacProcessorV3(r, requestHandler, this); if (null == readThreadPool) { readLac.run(); } else { @@ -420,8 +424,8 @@ public class BookieRequestProcessor implements RequestProcessor { } } - private void processAddRequestV3(final BookkeeperProtocol.Request r, final Channel c) { - WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, this); + private void processAddRequestV3(final BookkeeperProtocol.Request r, final BookieRequestHandler requestHandler) { + WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, requestHandler, this); final OrderedExecutor threadPool; if (RequestUtils.isHighPriority(r)) { @@ -455,8 +459,9 @@ public class BookieRequestProcessor implements RequestProcessor { } } - private void processForceLedgerRequestV3(final BookkeeperProtocol.Request r, final Channel c) { - ForceLedgerProcessorV3 forceLedger = new ForceLedgerProcessorV3(r, c, this); + private void processForceLedgerRequestV3(final BookkeeperProtocol.Request r, + final BookieRequestHandler requestHandler) { + ForceLedgerProcessorV3 forceLedger = new ForceLedgerProcessorV3(r, requestHandler, this); final OrderedExecutor threadPool; if (RequestUtils.isHighPriority(r)) { @@ -492,19 +497,20 @@ public class BookieRequestProcessor implements RequestProcessor { } } - private void processReadRequestV3(final BookkeeperProtocol.Request r, final Channel c) { - ExecutorService fenceThread = null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(c); + private void processReadRequestV3(final BookkeeperProtocol.Request r, final BookieRequestHandler requestHandler) { + ExecutorService fenceThread = null == highPriorityThreadPool ? null : + highPriorityThreadPool.chooseThread(requestHandler.ctx()); final ReadEntryProcessorV3 read; final OrderedExecutor threadPool; if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) { - ExecutorService lpThread = longPollThreadPool.chooseThread(c); + ExecutorService lpThread = longPollThreadPool.chooseThread(requestHandler.ctx()); - read = new LongPollReadEntryProcessorV3(r, c, this, fenceThread, + read = new LongPollReadEntryProcessorV3(r, requestHandler, this, fenceThread, lpThread, requestTimer); threadPool = longPollThreadPool; } else { - read = new ReadEntryProcessorV3(r, c, this, fenceThread); + read = new ReadEntryProcessorV3(r, requestHandler, this, fenceThread); // If it's a high priority read (fencing or as part of recovery process), we want to make sure it // gets executed as fast as possible, so bypass the normal readThreadPool @@ -544,13 +550,16 @@ public class BookieRequestProcessor implements RequestProcessor { } } - private void processStartTLSRequestV3(final BookkeeperProtocol.Request r, final Channel c) { + private void processStartTLSRequestV3(final BookkeeperProtocol.Request r, + final BookieRequestHandler requestHandler) { BookkeeperProtocol.Response.Builder response = BookkeeperProtocol.Response.newBuilder(); BookkeeperProtocol.BKPacketHeader.Builder header = BookkeeperProtocol.BKPacketHeader.newBuilder(); header.setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE); header.setOperation(r.getHeader().getOperation()); header.setTxnId(r.getHeader().getTxnId()); response.setHeader(header.build()); + final Channel c = requestHandler.ctx().channel(); + if (shFactory == null) { LOG.error("Got StartTLS request but TLS not configured"); response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ); @@ -596,8 +605,9 @@ public class BookieRequestProcessor implements RequestProcessor { } } - private void processGetBookieInfoRequestV3(final BookkeeperProtocol.Request r, final Channel c) { - GetBookieInfoProcessorV3 getBookieInfo = new GetBookieInfoProcessorV3(r, c, this); + private void processGetBookieInfoRequestV3(final BookkeeperProtocol.Request r, + final BookieRequestHandler requestHandler) { + GetBookieInfoProcessorV3 getBookieInfo = new GetBookieInfoProcessorV3(r, requestHandler, this); if (null == readThreadPool) { getBookieInfo.run(); } else { @@ -605,9 +615,10 @@ public class BookieRequestProcessor implements RequestProcessor { } } - private void processGetListOfEntriesOfLedgerProcessorV3(final BookkeeperProtocol.Request r, final Channel c) { - GetListOfEntriesOfLedgerProcessorV3 getListOfEntriesOfLedger = new GetListOfEntriesOfLedgerProcessorV3(r, c, - this); + private void processGetListOfEntriesOfLedgerProcessorV3(final BookkeeperProtocol.Request r, + final BookieRequestHandler requestHandler) { + GetListOfEntriesOfLedgerProcessorV3 getListOfEntriesOfLedger = + new GetListOfEntriesOfLedgerProcessorV3(r, requestHandler, this); if (null == readThreadPool) { getListOfEntriesOfLedger.run(); } else { @@ -615,8 +626,8 @@ public class BookieRequestProcessor implements RequestProcessor { } } - private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Channel c) { - WriteEntryProcessor write = WriteEntryProcessor.create(r, c, this); + private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final BookieRequestHandler requestHandler) { + WriteEntryProcessor write = WriteEntryProcessor.create(r, requestHandler, this); // If it's a high priority add (usually as part of recovery process), we want to make sure it gets // executed as fast as possible, so bypass the normal writeThreadPool and execute in highPriorityThreadPool @@ -647,10 +658,11 @@ public class BookieRequestProcessor implements RequestProcessor { } } - private void processReadRequest(final BookieProtocol.ReadRequest r, final Channel c) { + private void processReadRequest(final BookieProtocol.ReadRequest r, final BookieRequestHandler requestHandler) { ExecutorService fenceThreadPool = - null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(c); - ReadEntryProcessor read = ReadEntryProcessor.create(r, c, this, fenceThreadPool, throttleReadResponses); + null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(requestHandler.ctx()); + ReadEntryProcessor read = ReadEntryProcessor.create(r, requestHandler, + this, fenceThreadPool, throttleReadResponses); // If it's a high priority read (fencing or as part of recovery process), we want to make sure it // gets executed as fast as possible, so bypass the normal readThreadPool diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java index de73f95011..c1627579c6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java @@ -22,7 +22,6 @@ package org.apache.bookkeeper.proto; import static com.google.common.base.Preconditions.checkArgument; -import io.netty.channel.Channel; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.BookieImpl; import org.apache.bookkeeper.net.BookieId; @@ -39,9 +38,9 @@ import org.slf4j.LoggerFactory; class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ForceLedgerProcessorV3.class); - public ForceLedgerProcessorV3(Request request, Channel channel, + public ForceLedgerProcessorV3(Request request, BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor) { - super(request, channel, requestProcessor); + super(request, requestHandler, requestProcessor); } // Returns null if there is no exception thrown @@ -98,7 +97,7 @@ class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable { }; StatusCode status = null; try { - requestProcessor.getBookie().forceLedger(ledgerId, wcb, channel); + requestProcessor.getBookie().forceLedger(ledgerId, wcb, requestHandler); status = StatusCode.EOK; } catch (Throwable t) { logger.error("Unexpected exception while forcing ledger {} : ", ledgerId, t); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java index 6f24255586..8795263a5b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java @@ -20,7 +20,6 @@ */ package org.apache.bookkeeper.proto; -import io.netty.channel.Channel; import java.io.IOException; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest; @@ -38,9 +37,9 @@ import org.slf4j.LoggerFactory; public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(GetBookieInfoProcessorV3.class); - public GetBookieInfoProcessorV3(Request request, Channel channel, + public GetBookieInfoProcessorV3(Request request, BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor) { - super(request, channel, requestProcessor); + super(request, requestHandler, requestProcessor); } private GetBookieInfoResponse getGetBookieInfoResponse() { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java index 57f72208d8..90c850841d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetListOfEntriesOfLedgerProcessorV3.java @@ -21,7 +21,6 @@ package org.apache.bookkeeper.proto; import com.google.protobuf.ByteString; -import io.netty.channel.Channel; import java.io.IOException; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.Bookie; @@ -44,9 +43,9 @@ public class GetListOfEntriesOfLedgerProcessorV3 extends PacketProcessorBaseV3 i protected final GetListOfEntriesOfLedgerRequest getListOfEntriesOfLedgerRequest; protected final long ledgerId; - public GetListOfEntriesOfLedgerProcessorV3(Request request, Channel channel, + public GetListOfEntriesOfLedgerProcessorV3(Request request, BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor) { - super(request, channel, requestProcessor); + super(request, requestHandler, requestProcessor); this.getListOfEntriesOfLedgerRequest = request.getGetListOfEntriesOfLedgerRequest(); this.ledgerId = getListOfEntriesOfLedgerRequest.getLedgerId(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java index f61a2688ba..658c37c594 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java @@ -18,7 +18,6 @@ package org.apache.bookkeeper.proto; import com.google.common.base.Stopwatch; -import io.netty.channel.Channel; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import java.io.IOException; @@ -55,12 +54,12 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watch private boolean shouldReadEntry = false; LongPollReadEntryProcessorV3(Request request, - Channel channel, + BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor, ExecutorService fenceThreadPool, ExecutorService longPollThreadPool, HashedWheelTimer requestTimer) { - super(request, channel, requestProcessor, fenceThreadPool); + super(request, requestHandler, requestProcessor, fenceThreadPool); this.previousLAC = readRequest.getPreviousLAC(); this.longPollThreadPool = longPollThreadPool; this.requestTimer = requestTimer; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java index 8d079504b1..c9798156c2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java @@ -35,20 +35,20 @@ import org.slf4j.LoggerFactory; abstract class PacketProcessorBase<T extends Request> implements Runnable { private static final Logger logger = LoggerFactory.getLogger(PacketProcessorBase.class); T request; - Channel channel; + BookieRequestHandler requestHandler; BookieRequestProcessor requestProcessor; long enqueueNanos; - protected void init(T request, Channel channel, BookieRequestProcessor requestProcessor) { + protected void init(T request, BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor) { this.request = request; - this.channel = channel; + this.requestHandler = requestHandler; this.requestProcessor = requestProcessor; this.enqueueNanos = MathUtils.nowInNano(); } protected void reset() { request = null; - channel = null; + requestHandler = null; requestProcessor = null; enqueueNanos = -1; } @@ -82,8 +82,10 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable { protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) { final long writeNanos = MathUtils.nowInNano(); - final long timeOut = requestProcessor.getWaitTimeoutOnBackpressureMillis(); + + Channel channel = requestHandler.ctx().channel(); + if (timeOut >= 0 && !channel.isWritable()) { if (!requestProcessor.isBlacklisted(channel)) { synchronized (channel) { @@ -120,18 +122,23 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable { } if (channel.isActive()) { - ChannelPromise promise = channel.newPromise().addListener(future -> { - if (!future.isSuccess()) { - logger.debug("Netty channel write exception. ", future.cause()); - } - }); + ChannelPromise promise = channel.voidPromise(); + if (logger.isDebugEnabled()) { + promise = channel.newPromise().addListener(future -> { + if (!future.isSuccess()) { + logger.debug("Netty channel write exception. ", future.cause()); + } + }); + } channel.writeAndFlush(response, promise); } else { if (response instanceof BookieProtocol.Response) { ((BookieProtocol.Response) response).release(); } + if (logger.isDebugEnabled()) { logger.debug("Netty channel {} is inactive, " + "hence bypassing netty channel writeAndFlush during sendResponse", channel); + } } if (BookieProtocol.EOK == rc) { statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); @@ -149,6 +156,7 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable { */ protected void sendResponseAndWait(int rc, Object response, OpStatsLogger statsLogger) { try { + Channel channel = requestHandler.ctx().channel(); ChannelFuture future = channel.writeAndFlush(response); if (!channel.eventLoop().inEventLoop()) { future.get(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java index ccc452ae60..dac454933c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java @@ -40,14 +40,14 @@ import org.apache.bookkeeper.util.StringUtils; public abstract class PacketProcessorBaseV3 implements Runnable { final Request request; - final Channel channel; + final BookieRequestHandler requestHandler; final BookieRequestProcessor requestProcessor; final long enqueueNanos; - public PacketProcessorBaseV3(Request request, Channel channel, + public PacketProcessorBaseV3(Request request, BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor) { this.request = request; - this.channel = channel; + this.requestHandler = requestHandler; this.requestProcessor = requestProcessor; this.enqueueNanos = MathUtils.nowInNano(); } @@ -55,6 +55,7 @@ public abstract class PacketProcessorBaseV3 implements Runnable { protected void sendResponse(StatusCode code, Object response, OpStatsLogger statsLogger) { final long writeNanos = MathUtils.nowInNano(); + Channel channel = requestHandler.ctx().channel(); final long timeOut = requestProcessor.getWaitTimeoutOnBackpressureMillis(); if (timeOut >= 0 && !channel.isWritable()) { if (!requestProcessor.isBlacklisted(channel)) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index 71ee51f3fa..6935ca8be6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -18,7 +18,6 @@ package org.apache.bookkeeper.proto; import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; import java.io.IOException; @@ -44,15 +43,15 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> { private boolean throttleReadResponses; public static ReadEntryProcessor create(ReadRequest request, - Channel channel, + BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor, ExecutorService fenceThreadPool, boolean throttleReadResponses) { ReadEntryProcessor rep = RECYCLER.get(); - rep.init(request, channel, requestProcessor); + rep.init(request, requestHandler, requestProcessor); rep.fenceThreadPool = fenceThreadPool; rep.throttleReadResponses = throttleReadResponses; - requestProcessor.onReadRequestStart(channel); + requestProcessor.onReadRequestStart(requestHandler.ctx().channel()); return rep; } @@ -61,9 +60,9 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> { if (LOG.isDebugEnabled()) { LOG.debug("Received new read request: {}", request); } - if (!channel.isOpen()) { + if (!requestHandler.ctx().channel().isOpen()) { if (LOG.isDebugEnabled()) { - LOG.debug("Dropping read request for closed channel: {}", channel); + LOG.debug("Dropping read request for closed channel: {}", requestHandler.ctx().channel()); } requestProcessor.onReadRequestFinish(); return; @@ -74,7 +73,8 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> { try { CompletableFuture<Boolean> fenceResult = null; if (request.isFencing()) { - LOG.warn("Ledger: {} fenced by: {}", request.getLedgerId(), channel.remoteAddress()); + LOG.warn("Ledger: {} fenced by: {}", request.getLedgerId(), + requestHandler.ctx().channel().remoteAddress()); if (request.hasMasterKey()) { fenceResult = requestProcessor.getBookie().fenceLedger(request.getLedgerId(), diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java index 4672d592d8..999b8095db 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java @@ -57,11 +57,11 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 { protected final OpStatsLogger reqStats; public ReadEntryProcessorV3(Request request, - Channel channel, + BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor, ExecutorService fenceThreadPool) { - super(request, channel, requestProcessor); - requestProcessor.onReadRequestStart(channel); + super(request, requestHandler, requestProcessor); + requestProcessor.onReadRequestStart(requestHandler.ctx().channel()); this.readRequest = request.getReadRequest(); this.ledgerId = readRequest.getLedgerId(); @@ -194,6 +194,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 { protected ReadResponse getReadResponse() { final Stopwatch startTimeSw = Stopwatch.createStarted(); + final Channel channel = requestHandler.ctx().channel(); final ReadResponse.Builder readResponse = ReadResponse.newBuilder() .setLedgerId(ledgerId) @@ -249,9 +250,9 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 { public void run() { requestProcessor.getRequestStats().getReadEntrySchedulingDelayStats().registerSuccessfulEvent( MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); - if (!channel.isOpen()) { + if (!requestHandler.ctx().channel().isOpen()) { if (LOG.isDebugEnabled()) { - LOG.debug("Dropping read request for closed channel: {}", channel); + LOG.debug("Dropping read request for closed channel: {}", requestHandler.ctx().channel()); } requestProcessor.onReadRequestFinish(); return; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java index abb7d61646..25fe9530ad 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java @@ -22,7 +22,6 @@ package org.apache.bookkeeper.proto; import com.google.protobuf.ByteString; import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.util.concurrent.TimeUnit; @@ -43,9 +42,9 @@ import org.slf4j.LoggerFactory; class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ReadLacProcessorV3.class); - public ReadLacProcessorV3(Request request, Channel channel, + public ReadLacProcessorV3(Request request, BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor) { - super(request, channel, requestProcessor); + super(request, requestHandler, requestProcessor); } // Returns null if there is no exception thrown diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index 531dfecccc..7e8f9fa768 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -19,7 +19,6 @@ package org.apache.bookkeeper.proto; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; import io.netty.util.Recycler; import java.io.IOException; import java.util.concurrent.TimeUnit; @@ -47,11 +46,11 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen startTimeNanos = -1L; } - public static WriteEntryProcessor create(ParsedAddRequest request, Channel channel, + public static WriteEntryProcessor create(ParsedAddRequest request, BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor) { WriteEntryProcessor wep = RECYCLER.get(); - wep.init(request, channel, requestProcessor); - requestProcessor.onAddRequestStart(channel); + wep.init(request, requestHandler, requestProcessor); + requestProcessor.onAddRequestStart(requestHandler.ctx().channel()); return wep; } @@ -74,9 +73,10 @@ class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> implemen ByteBuf addData = request.getData(); try { if (request.isRecoveryAdd()) { - requestProcessor.getBookie().recoveryAddEntry(addData, this, channel, request.getMasterKey()); + requestProcessor.getBookie().recoveryAddEntry(addData, this, requestHandler, request.getMasterKey()); } else { - requestProcessor.getBookie().addEntry(addData, false, this, channel, request.getMasterKey()); + requestProcessor.getBookie().addEntry(addData, false, this, + requestHandler, request.getMasterKey()); } } catch (OperationRejectedException e) { requestProcessor.getRequestStats().getAddEntryRejectedCounter().inc(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java index 7d59858732..36aff7ad92 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java @@ -22,7 +22,6 @@ package org.apache.bookkeeper.proto; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; import java.io.IOException; import java.util.EnumSet; import java.util.concurrent.TimeUnit; @@ -43,10 +42,10 @@ import org.slf4j.LoggerFactory; class WriteEntryProcessorV3 extends PacketProcessorBaseV3 { private static final Logger logger = LoggerFactory.getLogger(WriteEntryProcessorV3.class); - public WriteEntryProcessorV3(Request request, Channel channel, + public WriteEntryProcessorV3(Request request, BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor) { - super(request, channel, requestProcessor); - requestProcessor.onAddRequestStart(channel); + super(request, requestHandler, requestProcessor); + requestProcessor.onAddRequestStart(requestHandler.ctx().channel()); } // Returns null if there is no exception thrown @@ -118,9 +117,11 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 { ByteBuf entryToAdd = Unpooled.wrappedBuffer(addRequest.getBody().asReadOnlyByteBuffer()); try { if (RequestUtils.hasFlag(addRequest, AddRequest.Flag.RECOVERY_ADD)) { - requestProcessor.getBookie().recoveryAddEntry(entryToAdd, wcb, channel, masterKey); + requestProcessor.getBookie().recoveryAddEntry(entryToAdd, wcb, + requestHandler.ctx().channel(), masterKey); } else { - requestProcessor.getBookie().addEntry(entryToAdd, ackBeforeSync, wcb, channel, masterKey); + requestProcessor.getBookie().addEntry(entryToAdd, ackBeforeSync, wcb, + requestHandler.ctx().channel(), masterKey); } status = StatusCode.EOK; } catch (OperationRejectedException e) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java index d8a427c905..293cea3bb0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java @@ -21,7 +21,6 @@ package org.apache.bookkeeper.proto; import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; @@ -40,9 +39,9 @@ import org.slf4j.LoggerFactory; class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable { private static final Logger logger = LoggerFactory.getLogger(WriteLacProcessorV3.class); - public WriteLacProcessorV3(Request request, Channel channel, + public WriteLacProcessorV3(Request request, BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor) { - super(request, channel, requestProcessor); + super(request, requestHandler, requestProcessor); } // Returns null if there is no exception thrown @@ -103,7 +102,8 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable { byte[] masterKey = writeLacRequest.getMasterKey().toByteArray(); try { - requestProcessor.bookie.setExplicitLac(Unpooled.wrappedBuffer(lacToAdd), writeCallback, channel, masterKey); + requestProcessor.bookie.setExplicitLac(Unpooled.wrappedBuffer(lacToAdd), + writeCallback, requestHandler, masterKey); status = StatusCode.EOK; } catch (IOException e) { logger.error("Error saving lac {} for ledger:{}", diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java index 54460770f1..3bc9cbee42 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; import java.util.concurrent.CountDownLatch; @@ -55,6 +56,8 @@ public class ForceLedgerProcessorV3Test { private Request request; private ForceLedgerProcessorV3 processor; + + private BookieRequestHandler requestHandler; private Channel channel; private BookieRequestProcessor requestProcessor; private Bookie bookie; @@ -71,17 +74,25 @@ public class ForceLedgerProcessorV3Test { .setLedgerId(System.currentTimeMillis()) .build()) .build(); + + channel = mock(Channel.class); when(channel.isOpen()).thenReturn(true); + when(channel.isActive()).thenReturn(true); + + requestHandler = mock(BookieRequestHandler.class); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + when(ctx.channel()).thenReturn(channel); + when(requestHandler.ctx()).thenReturn(ctx); + bookie = mock(Bookie.class); requestProcessor = mock(BookieRequestProcessor.class); when(requestProcessor.getBookie()).thenReturn(bookie); when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L); when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE)); - when(channel.isActive()).thenReturn(true); processor = new ForceLedgerProcessorV3( request, - channel, + requestHandler, requestProcessor); } @@ -102,7 +113,7 @@ public class ForceLedgerProcessorV3Test { }).when(bookie).forceLedger( eq(request.getForceLedgerRequest().getLedgerId()), any(WriteCallback.class), - same(channel)); + same(requestHandler)); ChannelPromise promise = new DefaultChannelPromise(channel); AtomicReference<Object> writtenObject = new AtomicReference<>(); @@ -117,7 +128,7 @@ public class ForceLedgerProcessorV3Test { verify(bookie, times(1)) .forceLedger(eq(request.getForceLedgerRequest().getLedgerId()), - any(WriteCallback.class), same(channel)); + any(WriteCallback.class), same(requestHandler)); verify(channel, times(1)).writeAndFlush(any(Response.class)); latch.await(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3Test.java index e294a55ccf..5e986515e9 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3Test.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import java.io.IOException; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.Bookie; @@ -38,6 +39,7 @@ import org.junit.Test; */ public class GetBookieInfoProcessorV3Test { + private BookieRequestHandler requestHandler; private Channel channel; private BookieRequestProcessor requestProcessor; private Bookie bookie; @@ -55,9 +57,17 @@ public class GetBookieInfoProcessorV3Test { requestProcessor = mock(BookieRequestProcessor.class); bookie = mock(Bookie.class); when(requestProcessor.getBookie()).thenReturn(bookie); + + requestHandler = mock(BookieRequestHandler.class); + channel = mock(Channel.class); when(channel.isOpen()).thenReturn(true); when(channel.isActive()).thenReturn(true); + + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + when(ctx.channel()).thenReturn(channel); + when(requestHandler.ctx()).thenReturn(ctx); + when(requestProcessor.getRequestStats()).thenReturn(requestStats); when(requestProcessor.getRequestStats().getGetBookieInfoStats()) .thenReturn(getBookieInfoStats); @@ -85,7 +95,7 @@ public class GetBookieInfoProcessorV3Test { .build(); GetBookieInfoProcessorV3 getBookieInfo = new GetBookieInfoProcessorV3( - getBookieInfoRequest, channel, requestProcessor); + getBookieInfoRequest, requestHandler, requestProcessor); getBookieInfo.run(); // get BookieInfo succeeded. diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java index 9eae1b9c0d..33a4fdc829 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.when; import com.google.protobuf.ByteString; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import io.netty.util.HashedWheelTimer; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -84,6 +85,12 @@ public class LongPollReadEntryProcessorV3Test { Channel channel = mock(Channel.class); when(channel.isOpen()).thenReturn(true); + + BookieRequestHandler requestHandler = mock(BookieRequestHandler.class); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + when(ctx.channel()).thenReturn(channel); + when(requestHandler.ctx()).thenReturn(ctx); + Bookie bookie = mock(Bookie.class); BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class); @@ -104,7 +111,7 @@ public class LongPollReadEntryProcessorV3Test { LongPollReadEntryProcessorV3 processor = new LongPollReadEntryProcessorV3( request, - channel, + requestHandler, requestProcessor, executor, executor, timer); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java index f3e9764e71..91e100d809 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; import io.netty.channel.EventLoop; @@ -53,6 +54,7 @@ import org.junit.Test; public class ReadEntryProcessorTest { private Channel channel; + private BookieRequestHandler requestHandler; private BookieRequestProcessor requestProcessor; private Bookie bookie; @@ -60,6 +62,12 @@ public class ReadEntryProcessorTest { public void setup() throws IOException, BookieException { channel = mock(Channel.class); when(channel.isOpen()).thenReturn(true); + + requestHandler = mock(BookieRequestHandler.class); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + when(ctx.channel()).thenReturn(channel); + when(requestHandler.ctx()).thenReturn(ctx); + bookie = mock(Bookie.class); requestProcessor = mock(BookieRequestProcessor.class); when(requestProcessor.getBookie()).thenReturn(bookie); @@ -101,7 +109,8 @@ public class ReadEntryProcessorTest { long ledgerId = System.currentTimeMillis(); ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, BookieProtocol.FLAG_DO_FENCING, new byte[]{}); - ReadEntryProcessor processor = ReadEntryProcessor.create(request, channel, requestProcessor, service, true); + ReadEntryProcessor processor = ReadEntryProcessor.create( + request, requestHandler, requestProcessor, service, true); processor.run(); fenceResult.complete(result); @@ -143,7 +152,7 @@ public class ReadEntryProcessorTest { long ledgerId = System.currentTimeMillis(); ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, BookieProtocol.FLAG_DO_FENCING, new byte[]{}); - ReadEntryProcessor processor = ReadEntryProcessor.create(request, channel, requestProcessor, null, true); + ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true); fenceResult.complete(result); processor.run(); @@ -173,7 +182,7 @@ public class ReadEntryProcessorTest { long ledgerId = System.currentTimeMillis(); ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 0, new byte[]{}); - ReadEntryProcessor processor = ReadEntryProcessor.create(request, channel, requestProcessor, null, true); + ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true); processor.run(); latch.await(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java index b88f819949..d5ee8f5275 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java @@ -27,9 +27,12 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.protobuf.ByteString; import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest; @@ -137,7 +140,14 @@ public class TestBookieRequestProcessor { .setBody(ByteString.copyFrom("entrydata".getBytes())).build(); Request request = Request.newBuilder().setHeader(header).setAddRequest(addRequest).build(); - WriteEntryProcessorV3 writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, requestProcessor); + Channel channel = mock(Channel.class); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + when(ctx.channel()).thenReturn(channel); + BookieRequestHandler requestHandler = mock(BookieRequestHandler.class); + when(requestHandler.ctx()).thenReturn(ctx); + + WriteEntryProcessorV3 writeEntryProcessorV3 = new WriteEntryProcessorV3(request, requestHandler, + requestProcessor); String toString = writeEntryProcessorV3.toString(); assertFalse("writeEntryProcessorV3's toString should have filtered out body", toString.contains("body")); assertFalse("writeEntryProcessorV3's toString should have filtered out masterKey", @@ -155,7 +165,7 @@ public class TestBookieRequestProcessor { .setBody(ByteString.copyFrom("entrydata".getBytes())).setFlag(Flag.RECOVERY_ADD).setWriteFlags(0) .build(); request = Request.newBuilder().setHeader(header).setAddRequest(addRequest).build(); - writeEntryProcessorV3 = new WriteEntryProcessorV3(request, null, requestProcessor); + writeEntryProcessorV3 = new WriteEntryProcessorV3(request, requestHandler, requestProcessor); toString = writeEntryProcessorV3.toString(); assertFalse("writeEntryProcessorV3's toString should have filtered out body", toString.contains("body")); assertFalse("writeEntryProcessorV3's toString should have filtered out masterKey", diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java index 4479ea4a7a..8fc3a89f00 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java @@ -33,6 +33,7 @@ import static org.mockito.Mockito.when; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; import java.util.concurrent.CountDownLatch; @@ -53,6 +54,8 @@ public class WriteEntryProcessorTest { private ParsedAddRequest request; private WriteEntryProcessor processor; private Channel channel; + private ChannelHandlerContext ctx; + private BookieRequestHandler requestHandler; private BookieRequestProcessor requestProcessor; private Bookie bookie; @@ -67,6 +70,12 @@ public class WriteEntryProcessorTest { Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8))); channel = mock(Channel.class); when(channel.isOpen()).thenReturn(true); + + requestHandler = mock(BookieRequestHandler.class); + ctx = mock(ChannelHandlerContext.class); + when(ctx.channel()).thenReturn(channel); + when(requestHandler.ctx()).thenReturn(ctx); + bookie = mock(Bookie.class); requestProcessor = mock(BookieRequestProcessor.class); when(requestProcessor.getBookie()).thenReturn(bookie); @@ -75,7 +84,7 @@ public class WriteEntryProcessorTest { when(channel.isWritable()).thenReturn(true); processor = WriteEntryProcessor.create( request, - channel, + requestHandler, requestProcessor); } @@ -93,7 +102,7 @@ public class WriteEntryProcessorTest { Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8))); processor = WriteEntryProcessor.create( request, - channel, + requestHandler, requestProcessor); } @@ -110,11 +119,11 @@ public class WriteEntryProcessorTest { writtenObject.set(invocationOnMock.getArgument(0)); latch.countDown(); return null; - }).when(channel).writeAndFlush(any(), any(ChannelPromise.class)); + }).when(channel).writeAndFlush(any(), any()); processor.run(); - verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class)); + verify(channel, times(1)).writeAndFlush(any(), any()); latch.await(); @@ -142,11 +151,11 @@ public class WriteEntryProcessorTest { writtenObject.set(invocationOnMock.getArgument(0)); latch.countDown(); return null; - }).when(channel).writeAndFlush(any(), any(ChannelPromise.class)); + }).when(channel).writeAndFlush(any(), any()); processor.run(); - verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class)); + verify(channel, times(1)).writeAndFlush(any(), any()); latch.await(); @@ -170,7 +179,7 @@ public class WriteEntryProcessorTest { doAnswer(invocationOnMock -> { processor.writeComplete(0, request.ledgerId, request.entryId, null, null); return null; - }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0])); + }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0])); AtomicReference<Object> writtenObject = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); @@ -178,13 +187,13 @@ public class WriteEntryProcessorTest { writtenObject.set(invocationOnMock.getArgument(0)); latch.countDown(); return null; - }).when(channel).writeAndFlush(any(), any(ChannelPromise.class)); + }).when(channel).writeAndFlush(any(), any()); processor.run(); verify(bookie, times(1)) - .addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0])); - verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class)); + .addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0])); + verify(channel, times(1)).writeAndFlush(any(), any()); latch.await(); @@ -205,7 +214,7 @@ public class WriteEntryProcessorTest { doAnswer(invocationOnMock -> { processor.writeComplete(0, request.ledgerId, request.entryId, null, null); return null; - }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0])); + }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0])); AtomicReference<Object> writtenObject = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); @@ -213,13 +222,13 @@ public class WriteEntryProcessorTest { writtenObject.set(invocationOnMock.getArgument(0)); latch.countDown(); return null; - }).when(channel).writeAndFlush(any(), any(ChannelPromise.class)); + }).when(channel).writeAndFlush(any(), any()); processor.run(); verify(bookie, times(1)) - .addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0])); - verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class)); + .addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0])); + verify(channel, times(1)).writeAndFlush(any(), any()); latch.await(); @@ -241,7 +250,7 @@ public class WriteEntryProcessorTest { doAnswer(invocationOnMock -> { throw new BookieException.OperationRejectedException(); }).when(bookie).addEntry( - any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0])); + any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0])); ChannelPromise promise = new DefaultChannelPromise(channel); AtomicReference<Object> writtenObject = new AtomicReference<>(); @@ -250,13 +259,13 @@ public class WriteEntryProcessorTest { writtenObject.set(invocationOnMock.getArgument(0)); latch.countDown(); return promise; - }).when(channel).writeAndFlush(any(), any(ChannelPromise.class)); + }).when(channel).writeAndFlush(any(), any()); processor.run(); verify(bookie, times(1)) - .addEntry(any(ByteBuf.class), eq(false), same(processor), same(channel), eq(new byte[0])); - verify(channel, times(1)).writeAndFlush(any(Response.class), any(ChannelPromise.class)); + .addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0])); + verify(channel, times(1)).writeAndFlush(any(Response.class), any()); latch.await(); assertTrue(writtenObject.get() instanceof Response); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java index 2175b26a5b..40eb662cc5 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java @@ -32,6 +32,7 @@ import static org.mockito.Mockito.when; import com.google.protobuf.ByteString; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; import java.util.concurrent.CountDownLatch; @@ -57,7 +58,9 @@ public class WriteEntryProcessorV3Test { private Request request; private WriteEntryProcessorV3 processor; + private Channel channel; + private BookieRequestHandler requestHandler; private BookieRequestProcessor requestProcessor; private Bookie bookie; @@ -78,6 +81,12 @@ public class WriteEntryProcessorV3Test { .build(); channel = mock(Channel.class); when(channel.isOpen()).thenReturn(true); + + requestHandler = mock(BookieRequestHandler.class); + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + when(ctx.channel()).thenReturn(channel); + when(requestHandler.ctx()).thenReturn(ctx); + bookie = mock(Bookie.class); requestProcessor = mock(BookieRequestProcessor.class); when(requestProcessor.getBookie()).thenReturn(bookie); @@ -86,7 +95,7 @@ public class WriteEntryProcessorV3Test { when(channel.isActive()).thenReturn(true); processor = new WriteEntryProcessorV3( request, - channel, + requestHandler, requestProcessor); } @@ -99,7 +108,7 @@ public class WriteEntryProcessorV3Test { processor = new WriteEntryProcessorV3( request, - channel, + requestHandler, requestProcessor); }