This is an automated email from the ASF dual-hosted git repository.
ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new e202d95 Recycle AddRequest/AddResponse objects
e202d95 is described below
commit e202d955a022a501ea49bc5906c36b38bdbc50df
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Nov 2 07:55:15 2017 +0100
Recycle AddRequest/AddResponse objects
This change was originally afd0ecb6 & 75bf0fa1 on the yahoo-4.3 branch
Author: Matteo Merli <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie
Guo <[email protected]>
This closes #665 from ivankelly/yahoo-bp-2
---
.../apache/bookkeeper/client/LedgerHandleAdv.java | 6 +-
.../org/apache/bookkeeper/proto/AuthHandler.java | 2 +-
.../bookkeeper/proto/BookieProtoEncoding.java | 54 ++++----
.../apache/bookkeeper/proto/BookieProtocol.java | 139 ++++++++++++++-------
.../bookkeeper/proto/PerChannelBookieClient.java | 11 +-
.../apache/bookkeeper/proto/ResponseBuilder.java | 4 +-
.../bookkeeper/proto/WriteEntryProcessor.java | 10 +-
7 files changed, 148 insertions(+), 78 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 28324b1..0ad9a9d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -175,7 +175,6 @@ public class LedgerHandleAdv extends LedgerHandle
implements WriteAdvHandle {
throttler.acquire();
}
- final long currentLength;
boolean wasClosed = false;
synchronized (this) {
// synchronized on this to ensure that
@@ -183,12 +182,11 @@ public class LedgerHandleAdv extends LedgerHandle
implements WriteAdvHandle {
// updating lastAddPushed
if (metadata.isClosed()) {
wasClosed = true;
- currentLength = 0;
} else {
- currentLength = addToLength(op.payload.readableBytes());
+ long currentLength = addToLength(op.payload.readableBytes());
+ op.setLedgerLength(currentLength);
pendingAddOps.add(op);
}
- op.setLedgerLength(currentLength);
}
if (wasClosed) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index 409fe4b..0780785 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -103,7 +103,7 @@ class AuthHandler {
BookieProtocol.Request req = (BookieProtocol.Request) msg;
if (req.getOpCode() == BookieProtocol.ADDENTRY) {
ctx.channel().writeAndFlush(
- new BookieProtocol.AddResponse(
+ BookieProtocol.AddResponse.create(
req.getProtocolVersion(),
BookieProtocol.EUA,
req.getLedgerId(), req.getEntryId()));
} else if (req.getOpCode() == BookieProtocol.READENTRY) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 0fece29..df7f3b2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -102,7 +102,9 @@ public class BookieProtoEncoding {
ByteBuf buf = allocator.buffer(totalHeaderSize);
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(),
r.getOpCode(), r.getFlags()));
buf.writeBytes(r.getMasterKey(), 0,
BookieProtocol.MASTER_KEY_LENGTH);
- return DoubleByteBuf.get(buf, ar.getData());
+ ByteBuf data = ar.getData();
+ ar.recycle();
+ return DoubleByteBuf.get(buf, data);
} else if (r instanceof BookieProtocol.ReadRequest) {
int totalHeaderSize = 4 // for request type
+ 8 // for ledgerId
@@ -155,7 +157,9 @@ public class BookieProtoEncoding {
// Read ledger and entry id without advancing the reader index
ledgerId = packet.getLong(packet.readerIndex());
entryId = packet.getLong(packet.readerIndex() + 8);
- return new BookieProtocol.AddRequest(version, ledgerId,
entryId, flags, masterKey, packet.retain());
+ return BookieProtocol.AddRequest.create(
+ version, ledgerId, entryId, flags,
+ masterKey, packet.retain());
}
case BookieProtocol.READENTRY:
@@ -223,29 +227,33 @@ public class BookieProtoEncoding {
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(),
r.getOpCode(), (short) 0));
ServerStats.getInstance().incrementPacketsSent();
- if (msg instanceof BookieProtocol.ReadResponse) {
- buf.writeInt(r.getErrorCode());
- buf.writeLong(r.getLedgerId());
- buf.writeLong(r.getEntryId());
+ try {
+ if (msg instanceof BookieProtocol.ReadResponse) {
+ buf.writeInt(r.getErrorCode());
+ buf.writeLong(r.getLedgerId());
+ buf.writeLong(r.getEntryId());
+
+ BookieProtocol.ReadResponse rr =
(BookieProtocol.ReadResponse) r;
+ if (rr.hasData()) {
+ return DoubleByteBuf.get(buf, rr.getData());
+ } else {
+ return buf;
+ }
+ } else if (msg instanceof BookieProtocol.AddResponse) {
+ buf.writeInt(r.getErrorCode());
+ buf.writeLong(r.getLedgerId());
+ buf.writeLong(r.getEntryId());
- BookieProtocol.ReadResponse rr =
(BookieProtocol.ReadResponse)r;
- if (rr.hasData()) {
- return DoubleByteBuf.get(buf, rr.getData());
- } else {
return buf;
+ } else if (msg instanceof BookieProtocol.AuthResponse) {
+ BookkeeperProtocol.AuthMessage am =
((BookieProtocol.AuthResponse) r).getAuthMessage();
+ return DoubleByteBuf.get(buf,
Unpooled.wrappedBuffer(am.toByteArray()));
+ } else {
+ LOG.error("Cannot encode unknown response type {}",
msg.getClass().getName());
+ return msg;
}
- } else if (msg instanceof BookieProtocol.AddResponse) {
- buf.writeInt(r.getErrorCode());
- buf.writeLong(r.getLedgerId());
- buf.writeLong(r.getEntryId());
-
- return buf;
- } else if (msg instanceof BookieProtocol.AuthResponse) {
- BookkeeperProtocol.AuthMessage am =
((BookieProtocol.AuthResponse)r).getAuthMessage();
- return DoubleByteBuf.get(buf,
Unpooled.wrappedBuffer(am.toByteArray()));
- } else {
- LOG.error("Cannot encode unknown response type {}",
msg.getClass().getName());
- return msg;
+ } finally {
+ r.recycle();
}
}
@Override
@@ -263,7 +271,7 @@ public class BookieProtoEncoding {
rc = buffer.readInt();
ledgerId = buffer.readLong();
entryId = buffer.readLong();
- return new BookieProtocol.AddResponse(version, rc, ledgerId,
entryId);
+ return BookieProtocol.AddResponse.create(version, rc,
ledgerId, entryId);
case BookieProtocol.READENTRY:
rc = buffer.readInt();
ledgerId = buffer.readLong();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 6fc91e5..2c7a828 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -23,6 +23,9 @@ package org.apache.bookkeeper.proto;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.ReferenceCounted;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
@@ -40,7 +43,7 @@ public interface BookieProtocol {
public static final byte LOWEST_COMPAT_PROTOCOL_VERSION = 0;
/**
- * Current version of the protocol, which client will use.
+ * Current version of the protocol, which client will use.
*/
public static final byte CURRENT_PROTOCOL_VERSION = 2;
@@ -62,19 +65,19 @@ public interface BookieProtocol {
*/
public static final int MASTER_KEY_LENGTH = 20;
- /**
+ /**
* The first int of a packet is the header.
* It contains the version, opCode and flags.
* The initial versions of BK didn't have this structure
- * and just had an int representing the opCode as the
- * first int. This handles that case also.
+ * and just had an int representing the opCode as the
+ * first int. This handles that case also.
*/
final static class PacketHeader {
public static int toInt(byte version, byte opCode, short flags) {
if (version == 0) {
return (int)opCode;
} else {
- return ((version & 0xFF) << 24)
+ return ((version & 0xFF) << 24)
| ((opCode & 0xFF) << 16)
| (flags & 0xFFFF);
}
@@ -177,20 +180,14 @@ public interface BookieProtocol {
public static final short FLAG_RECOVERY_ADD = 0x0002;
static class Request {
-
- final byte protocolVersion;
- final byte opCode;
- final long ledgerId;
- final long entryId;
- final short flags;
- final byte[] masterKey;
-
- protected Request(byte protocolVersion, byte opCode, long ledgerId,
- long entryId, short flags) {
- this(protocolVersion, opCode, ledgerId, entryId, flags, null);
- }
-
- protected Request(byte protocolVersion, byte opCode, long ledgerId,
+ byte protocolVersion;
+ byte opCode;
+ long ledgerId;
+ long entryId;
+ short flags;
+ byte[] masterKey;
+
+ protected void init(byte protocolVersion, byte opCode, long ledgerId,
long entryId, short flags, byte[] masterKey) {
this.protocolVersion = protocolVersion;
this.opCode = opCode;
@@ -233,15 +230,25 @@ public interface BookieProtocol {
public String toString() {
return String.format("Op(%d)[Ledger:%d,Entry:%d]", opCode,
ledgerId, entryId);
}
+
+ public void recycle() {}
}
static class AddRequest extends Request {
- final ByteBuf data;
-
- public AddRequest(byte protocolVersion, long ledgerId, long entryId,
- short flags, byte[] masterKey, ByteBuf data) {
- super(protocolVersion, ADDENTRY, ledgerId, entryId, flags,
masterKey);
- this.data = data.retain();
+ ByteBuf data;
+
+ static AddRequest create(byte protocolVersion, long ledgerId,
+ long entryId, short flags, byte[] masterKey,
+ ByteBuf data) {
+ AddRequest add = RECYCLER.get();
+ add.protocolVersion = protocolVersion;
+ add.opCode = ADDENTRY;
+ add.ledgerId = ledgerId;
+ add.entryId = entryId;
+ add.flags = flags;
+ add.masterKey = masterKey;
+ add.data = data.retain();
+ return add;
}
ByteBuf getData() {
@@ -255,16 +262,36 @@ public interface BookieProtocol {
void release() {
data.release();
}
+
+ private final Handle<AddRequest> recyclerHandle;
+ private AddRequest(Handle<AddRequest> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private static final Recycler<AddRequest> RECYCLER = new
Recycler<AddRequest>() {
+ protected AddRequest newObject(Handle<AddRequest> handle) {
+ return new AddRequest(handle);
+ }
+ };
+
+ @Override
+ public void recycle() {
+ ledgerId = -1;
+ entryId = -1;
+ masterKey = null;
+ data = null;
+ recyclerHandle.recycle(this);
+ }
}
static class ReadRequest extends Request {
ReadRequest(byte protocolVersion, long ledgerId, long entryId, short
flags) {
- super(protocolVersion, READENTRY, ledgerId, entryId, flags);
+ init(protocolVersion, READENTRY, ledgerId, entryId, flags, null);
}
ReadRequest(byte protocolVersion, long ledgerId, long entryId,
short flags, byte[] masterKey) {
- super(protocolVersion, READENTRY, ledgerId, entryId, flags,
masterKey);
+ init(protocolVersion, READENTRY, ledgerId, entryId, flags,
masterKey);
}
boolean isFencingRequest() {
@@ -276,7 +303,7 @@ public interface BookieProtocol {
final AuthMessage authMessage;
AuthRequest(byte protocolVersion, AuthMessage authMessage) {
- super(protocolVersion, AUTH, -1, -1, FLAG_NONE, null);
+ init(protocolVersion, AUTH, -1, -1, FLAG_NONE, null);
this.authMessage = authMessage;
}
@@ -285,14 +312,14 @@ public interface BookieProtocol {
}
}
- static class Response {
- final byte protocolVersion;
- final byte opCode;
- final int errorCode;
- final long ledgerId;
- final long entryId;
+ static abstract class Response {
+ byte protocolVersion;
+ byte opCode;
+ int errorCode;
+ long ledgerId;
+ long entryId;
- protected Response(byte protocolVersion, byte opCode,
+ protected void init(byte protocolVersion, byte opCode,
int errorCode, long ledgerId, long entryId) {
this.protocolVersion = protocolVersion;
this.opCode = opCode;
@@ -326,18 +353,20 @@ public interface BookieProtocol {
return String.format("Op(%d)[Ledger:%d,Entry:%d,errorCode=%d]",
opCode, ledgerId, entryId, errorCode);
}
+
+ abstract void recycle();
}
static class ReadResponse extends Response {
final ByteBuf data;
ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long
entryId) {
- super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
+ init(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
this.data = Unpooled.EMPTY_BUFFER;
}
ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long
entryId, ByteBuf data) {
- super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
+ init(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
this.data = data;
}
@@ -348,18 +377,41 @@ public interface BookieProtocol {
ByteBuf getData() {
return data;
}
+
+ void recycle() {
+ }
}
static class AddResponse extends Response {
- AddResponse(byte protocolVersion, int errorCode, long ledgerId, long
entryId) {
- super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId);
+ static AddResponse create(byte protocolVersion, int errorCode, long
ledgerId, long entryId) {
+ AddResponse response = RECYCLER.get();
+ response.init(protocolVersion, ADDENTRY, errorCode, ledgerId,
entryId);
+ return response;
+ }
+
+ private final Handle<AddResponse> recyclerHandle;
+ private AddResponse(Handle<AddResponse> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private static final Recycler<AddResponse> RECYCLER = new
Recycler<AddResponse>() {
+ protected AddResponse newObject(Handle<AddResponse> handle) {
+ return new AddResponse(handle);
+ }
+ };
+
+ public void recycle() {
+ recyclerHandle.recycle(this);
}
}
-
+
static class ErrorResponse extends Response {
ErrorResponse(byte protocolVersion, byte opCode, int errorCode,
long ledgerId, long entryId) {
- super(protocolVersion, opCode, errorCode, ledgerId, entryId);
+ init(protocolVersion, opCode, errorCode, ledgerId, entryId);
+ }
+
+ void recycle() {
}
}
@@ -367,13 +419,16 @@ public interface BookieProtocol {
final AuthMessage authMessage;
AuthResponse(byte protocolVersion, AuthMessage authMessage) {
- super(protocolVersion, AUTH, EOK, -1, -1);
+ init(protocolVersion, AUTH, EOK, -1, -1);
this.authMessage = authMessage;
}
AuthMessage getAuthMessage() {
return authMessage;
}
+
+ void recycle() {
+ }
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index ffa030b..0c8f25a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -2,6 +2,7 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
+
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
@@ -532,11 +533,13 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
CompletionKey completionKey = null;
if (useV2WireProtocol) {
completionKey = acquireV2Key(ledgerId, entryId,
OperationType.ADD_ENTRY);
- request = new
BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
entryId,
+ request = BookieProtocol.AddRequest.create(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
(short) options, masterKey, toSend);
} else {
final long txnId = getTxnId();
completionKey = new V3CompletionKey(txnId,
OperationType.ADD_ENTRY);
+
// Build the request and calculate the total size to be included
in the packet.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
.setVersion(ProtocolVersion.VERSION_THREE)
@@ -854,6 +857,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
return request.toString();
}
}
+
void errorOut(final CompletionKey key) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing completion key: {}", key);
@@ -1010,6 +1014,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
public void safeRun() {
completionValue.handleV2Response(ledgerId, entryId,
status, response);
+ response.recycle();
}
});
}
@@ -1734,7 +1739,6 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
public void release() {}
}
-
/**
* Note : Helper functions follow
*/
@@ -1814,7 +1818,8 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
V2CompletionKey that = (V2CompletionKey) object;
return this.entryId == that.entryId
- && this.ledgerId == that.ledgerId;
+ && this.ledgerId == that.ledgerId
+ && this.operationType == that.operationType;
}
@Override
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
index c0be162..342acd5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
@@ -25,7 +25,7 @@ import io.netty.buffer.ByteBuf;
class ResponseBuilder {
static BookieProtocol.Response buildErrorResponse(int errorCode,
BookieProtocol.Request r) {
if (r.getOpCode() == BookieProtocol.ADDENTRY) {
- return new BookieProtocol.AddResponse(r.getProtocolVersion(),
errorCode,
+ return BookieProtocol.AddResponse.create(r.getProtocolVersion(),
errorCode,
r.getLedgerId(),
r.getEntryId());
} else {
assert(r.getOpCode() == BookieProtocol.READENTRY);
@@ -35,7 +35,7 @@ class ResponseBuilder {
}
static BookieProtocol.Response buildAddResponse(BookieProtocol.Request r) {
- return new BookieProtocol.AddResponse(r.getProtocolVersion(),
BookieProtocol.EOK, r.getLedgerId(),
+ return BookieProtocol.AddResponse.create(r.getProtocolVersion(),
BookieProtocol.EOK, r.getLedgerId(),
r.getEntryId());
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index c4b2840..416a478 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -17,6 +17,7 @@
*/
package org.apache.bookkeeper.proto;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
@@ -70,11 +71,12 @@ class WriteEntryProcessor extends PacketProcessorBase
implements WriteCallback {
startTimeNanos = MathUtils.nowInNano();
int rc = BookieProtocol.EOK;
+ ByteBuf addData = add.getData();
try {
if (add.isRecoveryAdd()) {
- requestProcessor.bookie.recoveryAddEntry(add.getData(), this,
channel, add.getMasterKey());
+ requestProcessor.bookie.recoveryAddEntry(addData, this,
channel, add.getMasterKey());
} else {
- requestProcessor.bookie.addEntry(add.getData(), this, channel,
add.getMasterKey());
+ requestProcessor.bookie.addEntry(addData, this, channel,
add.getMasterKey());
}
} catch (IOException e) {
LOG.error("Error writing " + add, e);
@@ -86,7 +88,7 @@ class WriteEntryProcessor extends PacketProcessorBase
implements WriteCallback {
LOG.error("Unauthorized access to ledger " + add.getLedgerId(), e);
rc = BookieProtocol.EUA;
} finally {
- add.release();
+ addData.release();
}
if (rc != BookieProtocol.EOK) {
@@ -95,6 +97,7 @@ class WriteEntryProcessor extends PacketProcessorBase
implements WriteCallback {
sendResponse(rc,
ResponseBuilder.buildErrorResponse(rc, add),
requestProcessor.addRequestStats);
+ add.recycle();
}
}
@@ -111,6 +114,7 @@ class WriteEntryProcessor extends PacketProcessorBase
implements WriteCallback {
sendResponse(rc,
ResponseBuilder.buildAddResponse(request),
requestProcessor.addRequestStats);
+ request.recycle();
recycle();
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].