This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 767e455 Recycle instances of WriteEntryProcessor and ReadEntryProcessor
767e455 is described below
commit 767e4556b7f5cc523b127e7987cc442f707699d9
Author: Aaron Gresch <[email protected]>
AuthorDate: Fri Oct 27 11:51:32 2017 -0700
Recycle instances of WriteEntryProcessor and ReadEntryProcessor
merge of some changes from Yahoo's repo
Author: Aaron Gresch <[email protected]>
Reviewers: Sijie Guo <[email protected]>, Matteo Merli <[email protected]>
This closes #669 from agresch/agresch_recycle_entryprocessor
---
.../bookkeeper/proto/BookieRequestProcessor.java | 4 +--
.../bookkeeper/proto/PacketProcessorBase.java | 17 +++++++----
.../bookkeeper/proto/ReadEntryProcessor.java | 27 ++++++++++++++++--
.../bookkeeper/proto/WriteEntryProcessor.java | 33 ++++++++++++++++++++--
4 files changed, 70 insertions(+), 11 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 8d719e6..81208f3 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -374,7 +374,7 @@ public class BookieRequestProcessor implements
RequestProcessor {
}
private void processAddRequest(final BookieProtocol.Request r, final
Channel c) {
- WriteEntryProcessor write = new WriteEntryProcessor(r, c, this);
+ WriteEntryProcessor write = WriteEntryProcessor.create(r, c, this);
if (null == writeThreadPool) {
write.run();
} else {
@@ -383,7 +383,7 @@ public class BookieRequestProcessor implements
RequestProcessor {
}
private void processReadRequest(final BookieProtocol.Request r, final
Channel c) {
- ReadEntryProcessor read = new ReadEntryProcessor(r, c, this);
+ ReadEntryProcessor read = ReadEntryProcessor.create(r, c, this);
if (null == readThreadPool) {
read.run();
} else {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index 4f14dcf..5c9d8d2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -30,18 +30,25 @@ import io.netty.channel.Channel;
abstract class PacketProcessorBase extends SafeRunnable {
private final static Logger logger =
LoggerFactory.getLogger(PacketProcessorBase.class);
- final Request request;
- final Channel channel;
- final BookieRequestProcessor requestProcessor;
- final long enqueueNanos;
+ Request request;
+ Channel channel;
+ BookieRequestProcessor requestProcessor;
+ long enqueueNanos;
- PacketProcessorBase(Request request, Channel channel,
BookieRequestProcessor requestProcessor) {
+ protected void init(Request request, Channel channel,
BookieRequestProcessor requestProcessor) {
this.request = request;
this.channel = channel;
this.requestProcessor = requestProcessor;
this.enqueueNanos = MathUtils.nowInNano();
}
+ protected void reset() {
+ request = null;
+ channel = null;
+ requestProcessor = null;
+ enqueueNanos = -1;
+ }
+
protected boolean isVersionCompatible() {
byte version = request.getProtocolVersion();
if (version < BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index f65455c..f651e13 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -19,7 +19,9 @@ package org.apache.bookkeeper.proto;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
+import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
+import io.netty.util.Recycler.Handle;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
@@ -37,9 +39,11 @@ import org.slf4j.LoggerFactory;
class ReadEntryProcessor extends PacketProcessorBase {
private final static Logger LOG =
LoggerFactory.getLogger(ReadEntryProcessor.class);
- public ReadEntryProcessor(Request request, Channel channel,
+ public static ReadEntryProcessor create(Request request, Channel channel,
BookieRequestProcessor requestProcessor) {
- super(request, channel, requestProcessor);
+ ReadEntryProcessor rep = RECYCLER.get();
+ rep.init(request, channel, requestProcessor);
+ return rep;
}
@Override
@@ -144,10 +148,29 @@ class ReadEntryProcessor extends PacketProcessorBase {
sendResponse(errorCode,
ResponseBuilder.buildErrorResponse(errorCode, read),
requestProcessor.readRequestStats);
}
+ recycle();
}
@Override
public String toString() {
return String.format("ReadEntry(%d, %d)", request.getLedgerId(),
request.getEntryId());
}
+
+ private void recycle() {
+ super.reset();
+ this.recyclerHandle.recycle(this);
+ }
+
+ private final Recycler.Handle<ReadEntryProcessor> recyclerHandle;
+
+ private ReadEntryProcessor(Recycler.Handle<ReadEntryProcessor>
recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private static final Recycler<ReadEntryProcessor> RECYCLER = new
Recycler<ReadEntryProcessor>() {
+ @Override
+ protected ReadEntryProcessor
newObject(Recycler.Handle<ReadEntryProcessor> handle) {
+ return new ReadEntryProcessor(handle);
+ }
+ };
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 827aed9..c4b2840 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -18,6 +18,8 @@
package org.apache.bookkeeper.proto;
import io.netty.channel.Channel;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@@ -39,9 +41,16 @@ class WriteEntryProcessor extends PacketProcessorBase
implements WriteCallback {
long startTimeNanos;
- public WriteEntryProcessor(Request request, Channel channel,
+ protected void reset() {
+ super.reset();
+ startTimeNanos = -1L;
+ }
+
+ public static WriteEntryProcessor create(Request request, Channel channel,
BookieRequestProcessor requestProcessor) {
- super(request, channel, requestProcessor);
+ WriteEntryProcessor wep = RECYCLER.get();
+ wep.init(request, channel, requestProcessor);
+ return wep;
}
@Override
@@ -55,6 +64,7 @@ class WriteEntryProcessor extends PacketProcessorBase
implements WriteCallback {
sendResponse(BookieProtocol.EREADONLY,
ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, add),
requestProcessor.addRequestStats);
+ add.release();
return;
}
@@ -101,6 +111,7 @@ class WriteEntryProcessor extends PacketProcessorBase
implements WriteCallback {
sendResponse(rc,
ResponseBuilder.buildAddResponse(request),
requestProcessor.addRequestStats);
+ recycle();
}
@Override
@@ -108,4 +119,22 @@ class WriteEntryProcessor extends PacketProcessorBase
implements WriteCallback {
return String.format("WriteEntry(%d, %d)",
request.getLedgerId(), request.getEntryId());
}
+
+ private void recycle() {
+ reset();
+ recyclerHandle.recycle(this);
+ }
+
+ private final Recycler.Handle<WriteEntryProcessor> recyclerHandle;
+
+ private WriteEntryProcessor(Recycler.Handle<WriteEntryProcessor>
recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private static final Recycler<WriteEntryProcessor> RECYCLER = new
Recycler<WriteEntryProcessor>() {
+ @Override
+ protected WriteEntryProcessor
newObject(Recycler.Handle<WriteEntryProcessor> handle) {
+ return new WriteEntryProcessor(handle);
+ }
+ };
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].