This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a7991e5 Recycle responses callbacks
a7991e5 is described below
commit a7991e5616b0f78d3b80334774a5b7d63dabb8c9
Author: Ivan Kelly <[email protected]>
AuthorDate: Wed Dec 6 00:28:48 2017 -0800
Recycle responses callbacks
A response callback is created each time we receive a response from a
server, which can result in a lot of garbage in the fast path.
This change pools the callbacks.
This change was originally 729bd2d0 in the yahoo-4.3 branch.
Author: Ivan Kelly <[email protected]>
Author: Matteo Merli <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #809 from ivankelly/yahoo-bp-7
---
.../org/apache/bookkeeper/proto/BookieClient.java | 10 ++
.../bookkeeper/proto/PerChannelBookieClient.java | 106 ++++++++++++++-------
2 files changed, 84 insertions(+), 32 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 183bb7b..ceac697 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -346,6 +346,16 @@ public class BookieClient implements
PerChannelBookieClientFactory {
};
public void recycle() {
+ bookieClient = null;
+ toSend = null;
+ ledgerId = -1;
+ entryId = -1;
+ addr = null;
+ ctx = null;
+ cb = null;
+ options = -1;
+ masterKey = null;
+
recyclerHandle.recycle(this);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index f094064..c2c8af1 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -1001,13 +1001,10 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
private void readV2Response(final BookieProtocol.Response response) {
- final long ledgerId = response.ledgerId;
- final long entryId = response.entryId;
+ OperationType operationType = getOperationType(response.getOpCode());
+ StatusCode status = getStatusCodeFromErrorCode(response.errorCode);
- final OperationType operationType =
getOperationType(response.getOpCode());
- final StatusCode status =
getStatusCodeFromErrorCode(response.errorCode);
-
- final CompletionKey key = acquireV2Key(ledgerId, entryId,
operationType);
+ CompletionKey key = acquireV2Key(response.ledgerId, response.entryId,
operationType);
CompletionValue completionValue = completionObjects.remove(key);
key.release();
if (completionValue == null) {
@@ -1024,20 +1021,84 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
// Unexpected response, so log it. The txnId should have been
present.
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected response received from bookie : " + addr
+ " for type : " + operationType
- + " and ledger:entry : " + ledgerId + ":" + entryId);
+ + " and ledger:entry : " + response.ledgerId + ":" +
response.entryId);
}
} else {
long orderingKey = completionValue.ledgerId;
- final CompletionValue finalCompletionValue = completionValue;
+ executor.submitOrdered(orderingKey,
+ ReadV2ResponseCallback.create(completionValue,
response.ledgerId, response.entryId,
+ status, response));
+ }
+ }
+
+ private static class ReadV2ResponseCallback extends SafeRunnable {
+ CompletionValue completionValue;
+ long ledgerId;
+ long entryId;
+ StatusCode status;
+ BookieProtocol.Response response;
+
+ static ReadV2ResponseCallback create(CompletionValue completionValue,
long ledgerId, long entryId,
+ StatusCode status,
BookieProtocol.Response response) {
+ ReadV2ResponseCallback callback = RECYCLER.get();
+ callback.completionValue = completionValue;
+ callback.ledgerId = ledgerId;
+ callback.entryId = entryId;
+ callback.status = status;
+ callback.response = response;
+ return callback;
+ }
+
+ @Override
+ public void safeRun() {
+ completionValue.handleV2Response(ledgerId, entryId, status,
response);
+ response.recycle();
+ recycle();
+ }
+
+ void recycle() {
+ completionValue = null;
+ ledgerId = -1;
+ entryId = -1;
+ status = null;
+ response = null;
+ recyclerHandle.recycle(this);
+ }
+
+ private final Handle<ReadV2ResponseCallback> recyclerHandle;
- executor.submitOrdered(orderingKey, () -> {
- finalCompletionValue.handleV2Response(ledgerId, entryId,
status, response);
- response.recycle();
- });
+ private ReadV2ResponseCallback(Handle<ReadV2ResponseCallback>
recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
}
+
+ private static final Recycler<ReadV2ResponseCallback> RECYCLER = new
Recycler<ReadV2ResponseCallback>() {
+ @Override
+ protected ReadV2ResponseCallback
newObject(Handle<ReadV2ResponseCallback> handle) {
+ return new ReadV2ResponseCallback(handle);
+ }
+ };
}
- private StatusCode getStatusCodeFromErrorCode(int errorCode) {
+ private static OperationType getOperationType(byte opCode) {
+ switch (opCode) {
+ case BookieProtocol.ADDENTRY:
+ return OperationType.ADD_ENTRY;
+ case BookieProtocol.READENTRY:
+ return OperationType.READ_ENTRY;
+ case BookieProtocol.AUTH:
+ return OperationType.AUTH;
+ case BookieProtocol.READ_LAC:
+ return OperationType.READ_LAC;
+ case BookieProtocol.WRITE_LAC:
+ return OperationType.WRITE_LAC;
+ case BookieProtocol.GET_BOOKIE_INFO:
+ return OperationType.GET_BOOKIE_INFO;
+ default:
+ throw new IllegalArgumentException("Invalid operation type " +
opCode);
+ }
+ }
+
+ private static StatusCode getStatusCodeFromErrorCode(int errorCode) {
switch (errorCode) {
case BookieProtocol.EOK:
return StatusCode.EOK;
@@ -1064,25 +1125,6 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
}
- private OperationType getOperationType(byte opCode) {
- switch (opCode) {
- case BookieProtocol.ADDENTRY:
- return OperationType.ADD_ENTRY;
- case BookieProtocol.READENTRY:
- return OperationType.READ_ENTRY;
- case BookieProtocol.AUTH:
- return OperationType.AUTH;
- case BookieProtocol.READ_LAC:
- return OperationType.READ_LAC;
- case BookieProtocol.WRITE_LAC:
- return OperationType.WRITE_LAC;
- case BookieProtocol.GET_BOOKIE_INFO:
- return OperationType.GET_BOOKIE_INFO;
- default:
- throw new IllegalArgumentException("Invalid operation type");
- }
- }
-
private void readV3Response(final Response response) {
final BKPacketHeader header = response.getHeader();
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].