This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new a7991e5  Recycle responses callbacks
a7991e5 is described below

commit a7991e5616b0f78d3b80334774a5b7d63dabb8c9
Author: Ivan Kelly <[email protected]>
AuthorDate: Wed Dec 6 00:28:48 2017 -0800

    Recycle responses callbacks
    
    A response callback is created each time we receive a response from a
    server, which can result in a lot of garbage in the fast path.
    
    This change pools the callbacks in a Recycler so that instances are
    reused instead of being allocated for every response.
    
    This change was originally 729bd2d0 in the yahoo-4.3 branch.
    
    Author: Ivan Kelly <[email protected]>
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo <[email protected]>
    
    This closes #809 from ivankelly/yahoo-bp-7
---
 .../org/apache/bookkeeper/proto/BookieClient.java  |  10 ++
 .../bookkeeper/proto/PerChannelBookieClient.java   | 106 ++++++++++++++-------
 2 files changed, 84 insertions(+), 32 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 183bb7b..ceac697 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -346,6 +346,16 @@ public class BookieClient implements PerChannelBookieClientFactory {
                 };
 
         public void recycle() {
+            bookieClient = null;
+            toSend = null;
+            ledgerId = -1;
+            entryId = -1;
+            addr = null;
+            ctx = null;
+            cb = null;
+            options = -1;
+            masterKey = null;
+
             recyclerHandle.recycle(this);
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index f094064..c2c8af1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -1001,13 +1001,10 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
     }
 
     private void readV2Response(final BookieProtocol.Response response) {
-        final long ledgerId = response.ledgerId;
-        final long entryId = response.entryId;
+        OperationType operationType = getOperationType(response.getOpCode());
+        StatusCode status = getStatusCodeFromErrorCode(response.errorCode);
 
-        final OperationType operationType = 
getOperationType(response.getOpCode());
-        final StatusCode status = 
getStatusCodeFromErrorCode(response.errorCode);
-
-        final CompletionKey key = acquireV2Key(ledgerId, entryId, 
operationType);
+        CompletionKey key = acquireV2Key(response.ledgerId, response.entryId, 
operationType);
         CompletionValue completionValue = completionObjects.remove(key);
         key.release();
         if (completionValue == null) {
@@ -1024,20 +1021,84 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             // Unexpected response, so log it. The txnId should have been 
present.
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Unexpected response received from bookie : " + addr 
+ " for type : " + operationType
-                        + " and ledger:entry : " + ledgerId + ":" + entryId);
+                        + " and ledger:entry : " + response.ledgerId + ":" + 
response.entryId);
             }
         } else {
             long orderingKey = completionValue.ledgerId;
-            final CompletionValue finalCompletionValue = completionValue;
+            executor.submitOrdered(orderingKey,
+                    ReadV2ResponseCallback.create(completionValue, 
response.ledgerId, response.entryId,
+                                                  status, response));
+        }
+    }
+
+    private static class ReadV2ResponseCallback extends SafeRunnable {
+        CompletionValue completionValue;
+        long ledgerId;
+        long entryId;
+        StatusCode status;
+        BookieProtocol.Response response;
+
+        static ReadV2ResponseCallback create(CompletionValue completionValue, 
long ledgerId, long entryId,
+                                             StatusCode status, 
BookieProtocol.Response response) {
+            ReadV2ResponseCallback callback = RECYCLER.get();
+            callback.completionValue = completionValue;
+            callback.ledgerId = ledgerId;
+            callback.entryId = entryId;
+            callback.status = status;
+            callback.response = response;
+            return callback;
+        }
+
+        @Override
+        public void safeRun() {
+            completionValue.handleV2Response(ledgerId, entryId, status, 
response);
+            response.recycle();
+            recycle();
+        }
+
+        void recycle() {
+            completionValue = null;
+            ledgerId = -1;
+            entryId = -1;
+            status = null;
+            response = null;
+            recyclerHandle.recycle(this);
+        }
+
+        private final Handle<ReadV2ResponseCallback> recyclerHandle;
 
-            executor.submitOrdered(orderingKey, () -> {
-                    finalCompletionValue.handleV2Response(ledgerId, entryId, 
status, response);
-                    response.recycle();
-                });
+        private ReadV2ResponseCallback(Handle<ReadV2ResponseCallback> 
recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
         }
+
+        private static final Recycler<ReadV2ResponseCallback> RECYCLER = new 
Recycler<ReadV2ResponseCallback>() {
+            @Override
+            protected ReadV2ResponseCallback 
newObject(Handle<ReadV2ResponseCallback> handle) {
+                return new ReadV2ResponseCallback(handle);
+            }
+        };
     }
 
-    private StatusCode getStatusCodeFromErrorCode(int errorCode) {
+    private static OperationType getOperationType(byte opCode) {
+        switch (opCode) {
+            case BookieProtocol.ADDENTRY:
+                return  OperationType.ADD_ENTRY;
+            case BookieProtocol.READENTRY:
+                return OperationType.READ_ENTRY;
+            case BookieProtocol.AUTH:
+                return OperationType.AUTH;
+            case BookieProtocol.READ_LAC:
+                return OperationType.READ_LAC;
+            case BookieProtocol.WRITE_LAC:
+                return OperationType.WRITE_LAC;
+            case BookieProtocol.GET_BOOKIE_INFO:
+                return OperationType.GET_BOOKIE_INFO;
+            default:
+                throw new IllegalArgumentException("Invalid operation type " + 
opCode);
+        }
+    }
+
+    private static StatusCode getStatusCodeFromErrorCode(int errorCode) {
         switch (errorCode) {
             case BookieProtocol.EOK:
                 return StatusCode.EOK;
@@ -1064,25 +1125,6 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         }
     }
 
-    private OperationType getOperationType(byte opCode) {
-        switch (opCode) {
-            case BookieProtocol.ADDENTRY:
-                return  OperationType.ADD_ENTRY;
-            case BookieProtocol.READENTRY:
-                return OperationType.READ_ENTRY;
-            case BookieProtocol.AUTH:
-                return OperationType.AUTH;
-            case BookieProtocol.READ_LAC:
-                return OperationType.READ_LAC;
-            case BookieProtocol.WRITE_LAC:
-                return OperationType.WRITE_LAC;
-            case BookieProtocol.GET_BOOKIE_INFO:
-                return OperationType.GET_BOOKIE_INFO;
-            default:
-                throw new IllegalArgumentException("Invalid operation type");
-        }
-    }
-
     private void readV3Response(final Response response) {
         final BKPacketHeader header = response.getHeader();
 

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to