This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 767e455  Recycle instances of WriteEntryProcessor and 
ReadEntryProcessor
767e455 is described below

commit 767e4556b7f5cc523b127e7987cc442f707699d9
Author: Aaron Gresch <[email protected]>
AuthorDate: Fri Oct 27 11:51:32 2017 -0700

    Recycle instances of WriteEntryProcessor and ReadEntryProcessor
    
    merge of some changes from Yahoo's repo
    
    Author: Aaron Gresch <[email protected]>
    
    Reviewers: Sijie Guo <[email protected]>, Matteo Merli <[email protected]>
    
    This closes #669 from agresch/agresch_recycle_entryprocessor
---
 .../bookkeeper/proto/BookieRequestProcessor.java   |  4 +--
 .../bookkeeper/proto/PacketProcessorBase.java      | 17 +++++++----
 .../bookkeeper/proto/ReadEntryProcessor.java       | 27 ++++++++++++++++--
 .../bookkeeper/proto/WriteEntryProcessor.java      | 33 ++++++++++++++++++++--
 4 files changed, 70 insertions(+), 11 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 8d719e6..81208f3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -374,7 +374,7 @@ public class BookieRequestProcessor implements 
RequestProcessor {
     }
 
     private void processAddRequest(final BookieProtocol.Request r, final 
Channel c) {
-        WriteEntryProcessor write = new WriteEntryProcessor(r, c, this);
+        WriteEntryProcessor write = WriteEntryProcessor.create(r, c, this);
         if (null == writeThreadPool) {
             write.run();
         } else {
@@ -383,7 +383,7 @@ public class BookieRequestProcessor implements 
RequestProcessor {
     }
 
     private void processReadRequest(final BookieProtocol.Request r, final 
Channel c) {
-        ReadEntryProcessor read = new ReadEntryProcessor(r, c, this);
+        ReadEntryProcessor read = ReadEntryProcessor.create(r, c, this);
         if (null == readThreadPool) {
             read.run();
         } else {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index 4f14dcf..5c9d8d2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -30,18 +30,25 @@ import io.netty.channel.Channel;
 
 abstract class PacketProcessorBase extends SafeRunnable {
     private final static Logger logger = 
LoggerFactory.getLogger(PacketProcessorBase.class);
-    final Request request;
-    final Channel channel;
-    final BookieRequestProcessor requestProcessor;
-    final long enqueueNanos;
+    Request request;
+    Channel channel;
+    BookieRequestProcessor requestProcessor;
+    long enqueueNanos;
 
-    PacketProcessorBase(Request request, Channel channel, 
BookieRequestProcessor requestProcessor) {
+    protected void init(Request request, Channel channel, 
BookieRequestProcessor requestProcessor) {
         this.request = request;
         this.channel = channel;
         this.requestProcessor = requestProcessor;
         this.enqueueNanos = MathUtils.nowInNano();
     }
 
+    protected void reset() {
+        request = null;
+        channel = null;
+        requestProcessor = null;
+        enqueueNanos = -1;
+    }
+
     protected boolean isVersionCompatible() {
         byte version = request.getProtocolVersion();
         if (version < BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index f65455c..f651e13 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -19,7 +19,9 @@ package org.apache.bookkeeper.proto;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
+import io.netty.util.Recycler;
 import io.netty.util.ReferenceCountUtil;
+import io.netty.util.Recycler.Handle;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
@@ -37,9 +39,11 @@ import org.slf4j.LoggerFactory;
 class ReadEntryProcessor extends PacketProcessorBase {
     private final static Logger LOG = 
LoggerFactory.getLogger(ReadEntryProcessor.class);
 
-    public ReadEntryProcessor(Request request, Channel channel,
+    public static ReadEntryProcessor create(Request request, Channel channel,
                               BookieRequestProcessor requestProcessor) {
-        super(request, channel, requestProcessor);
+        ReadEntryProcessor rep = RECYCLER.get();
+        rep.init(request, channel, requestProcessor);
+        return rep;
     }
 
     @Override
@@ -144,10 +148,29 @@ class ReadEntryProcessor extends PacketProcessorBase {
             sendResponse(errorCode, 
ResponseBuilder.buildErrorResponse(errorCode, read),
                          requestProcessor.readRequestStats);
         }
+        recycle();
     }
 
     @Override
     public String toString() {
         return String.format("ReadEntry(%d, %d)", request.getLedgerId(), 
request.getEntryId());
     }
+
+    private void recycle() {
+        super.reset();
+        this.recyclerHandle.recycle(this);
+    }
+
+    private final Recycler.Handle<ReadEntryProcessor> recyclerHandle;
+
+    private ReadEntryProcessor(Recycler.Handle<ReadEntryProcessor> 
recyclerHandle) {
+        this.recyclerHandle = recyclerHandle;
+    }
+
+    private static final Recycler<ReadEntryProcessor> RECYCLER = new 
Recycler<ReadEntryProcessor>() {
+        @Override
+        protected ReadEntryProcessor 
newObject(Recycler.Handle<ReadEntryProcessor> handle) {
+            return new ReadEntryProcessor(handle);
+        }
+    };
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 827aed9..c4b2840 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -18,6 +18,8 @@
 package org.apache.bookkeeper.proto;
 
 import io.netty.channel.Channel;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
 
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
@@ -39,9 +41,16 @@ class WriteEntryProcessor extends PacketProcessorBase 
implements WriteCallback {
 
     long startTimeNanos;
 
-    public WriteEntryProcessor(Request request, Channel channel,
+    protected void reset() {
+        super.reset();
+        startTimeNanos = -1L;
+    }
+
+    public static WriteEntryProcessor create(Request request, Channel channel,
                                BookieRequestProcessor requestProcessor) {
-        super(request, channel, requestProcessor);
+        WriteEntryProcessor wep = RECYCLER.get();
+        wep.init(request, channel, requestProcessor);
+        return wep;
     }
 
     @Override
@@ -55,6 +64,7 @@ class WriteEntryProcessor extends PacketProcessorBase 
implements WriteCallback {
             sendResponse(BookieProtocol.EREADONLY,
                          
ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, add),
                          requestProcessor.addRequestStats);
+            add.release();
             return;
         }
 
@@ -101,6 +111,7 @@ class WriteEntryProcessor extends PacketProcessorBase 
implements WriteCallback {
         sendResponse(rc,
                      ResponseBuilder.buildAddResponse(request),
                      requestProcessor.addRequestStats);
+        recycle();
     }
 
     @Override
@@ -108,4 +119,22 @@ class WriteEntryProcessor extends PacketProcessorBase 
implements WriteCallback {
         return String.format("WriteEntry(%d, %d)",
                              request.getLedgerId(), request.getEntryId());
     }
+    
+    private void recycle() {
+        reset();
+        recyclerHandle.recycle(this);
+    }
+
+    private final Recycler.Handle<WriteEntryProcessor> recyclerHandle;
+
+    private WriteEntryProcessor(Recycler.Handle<WriteEntryProcessor> 
recyclerHandle) {
+        this.recyclerHandle = recyclerHandle;
+    }
+
+    private static final Recycler<WriteEntryProcessor> RECYCLER = new 
Recycler<WriteEntryProcessor>() {
+        @Override
+        protected WriteEntryProcessor 
newObject(Recycler.Handle<WriteEntryProcessor> handle) {
+            return new WriteEntryProcessor(handle);
+        }
+    };
 }

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to