This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 2cf9bd42bef2591dbed87907d151bb82be3bf8b2 Author: Penghui Li <[email protected]> AuthorDate: Wed Mar 8 01:50:00 2023 +0800 Make read entry request recyclable (#3842) * Make read entry request recyclable * Move recycle to finally block * Fix test and comments * Fix test (cherry picked from commit 09365dfdd8aa20c603d7a134d7333b907ce50d24) --- .../bookkeeper/proto/BookieProtoEncoding.java | 6 ++-- .../apache/bookkeeper/proto/BookieProtocol.java | 35 ++++++++++++++++++++-- .../bookkeeper/proto/PerChannelBookieClient.java | 5 ++-- .../bookkeeper/proto/ReadEntryProcessor.java | 1 + .../bookkeeper/proto/ReadEntryProcessorTest.java | 6 ++-- .../bookkeeper/proto/TestBackwardCompatCMS42.java | 5 ++-- 6 files changed, 44 insertions(+), 14 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 88ee09d61f..b39acd41d2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -139,7 +139,7 @@ public class BookieProtoEncoding { if (r.hasMasterKey()) { buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH); } - + r.recycle(); return buf; } else if (r instanceof BookieProtocol.AuthRequest) { BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest) r).getAuthMessage(); @@ -190,9 +190,9 @@ public class BookieProtoEncoding { if ((flags & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING && version >= 2) { byte[] masterKey = readMasterKey(packet); - return new BookieProtocol.ReadRequest(version, ledgerId, entryId, flags, masterKey); + return BookieProtocol.ReadRequest.create(version, ledgerId, entryId, flags, masterKey); } else { - return new BookieProtocol.ReadRequest(version, ledgerId, entryId, flags, null); + return BookieProtocol.ReadRequest.create(version, 
ledgerId, entryId, flags, null); } case BookieProtocol.AUTH: BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 25196587a8..78731d3ead 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -362,14 +362,43 @@ public interface BookieProtocol { * A Request that reads data. */ class ReadRequest extends Request { - ReadRequest(byte protocolVersion, long ledgerId, long entryId, - short flags, byte[] masterKey) { - init(protocolVersion, READENTRY, ledgerId, entryId, flags, masterKey); + + static ReadRequest create(byte protocolVersion, long ledgerId, long entryId, + short flags, byte[] masterKey) { + ReadRequest read = RECYCLER.get(); + read.protocolVersion = protocolVersion; + read.opCode = READENTRY; + read.ledgerId = ledgerId; + read.entryId = entryId; + read.flags = flags; + read.masterKey = masterKey; + return read; } boolean isFencing() { return (flags & FLAG_DO_FENCING) == FLAG_DO_FENCING; } + + private final Handle<ReadRequest> recyclerHandle; + + private ReadRequest(Handle<ReadRequest> recyclerHandle) { + this.recyclerHandle = recyclerHandle; + } + + private static final Recycler<ReadRequest> RECYCLER = new Recycler<ReadRequest>() { + @Override + protected ReadRequest newObject(Handle<ReadRequest> handle) { + return new ReadRequest(handle); + } + }; + + @Override + public void recycle() { + ledgerId = -1; + entryId = -1; + masterKey = null; + recyclerHandle.recycle(this); + } } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index f82878ca3c..3971eb55e8 100644 --- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -840,7 +840,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { Object request = null; CompletionKey completionKey = null; if (useV2WireProtocol) { - request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, + request = BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 0, (short) 0, null); completionKey = acquireV2Key(ledgerId, 0, OperationType.READ_LAC); } else { @@ -924,7 +924,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { Object request = null; CompletionKey completionKey = null; if (useV2WireProtocol) { - request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, + request = BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, (short) flags, masterKey); completionKey = acquireV2Key(ledgerId, entryId, OperationType.READ_ENTRY); } else { @@ -1159,7 +1159,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } }); - channel.writeAndFlush(request, promise); } catch (Throwable e) { LOG.warn("Operation {} failed", StringUtils.requestToString(request), e); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index 03ff80cfe7..c1127436ac 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -190,6 +190,7 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> { } private void recycle() { + request.recycle(); 
super.reset(); this.recyclerHandle.recycle(this); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java index 114d614b28..58b036539b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java @@ -98,7 +98,7 @@ public class ReadEntryProcessorTest { ExecutorService service = Executors.newCachedThreadPool(); long ledgerId = System.currentTimeMillis(); - ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, + ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, BookieProtocol.FLAG_DO_FENCING, new byte[]{}); ReadEntryProcessor processor = ReadEntryProcessor.create(request, channel, requestProcessor, service, true); processor.run(); @@ -140,7 +140,7 @@ public class ReadEntryProcessorTest { }).when(channel).writeAndFlush(any(Response.class)); long ledgerId = System.currentTimeMillis(); - ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, + ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, BookieProtocol.FLAG_DO_FENCING, new byte[]{}); ReadEntryProcessor processor = ReadEntryProcessor.create(request, channel, requestProcessor, null, true); fenceResult.complete(result); @@ -170,7 +170,7 @@ public class ReadEntryProcessorTest { }).when(channel).writeAndFlush(any(Response.class)); long ledgerId = System.currentTimeMillis(); - ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, + ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 0, new byte[]{}); ReadEntryProcessor processor = ReadEntryProcessor.create(request, channel, requestProcessor, null, true); 
processor.run(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java index 0888f7a584..ed5f510d25 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java @@ -164,8 +164,9 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase { } - client.sendRequest(new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, - 1L, 1L, (short) 0, null)); + ReadRequest read = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, + 1L, 1L, (short) 0, null); + client.sendRequest(read); Response response = client.takeResponse(); assertEquals("Should have failed", response.getErrorCode(), BookieProtocol.EUA);
