merlimat commented on code in PR #3848:
URL: https://github.com/apache/bookkeeper/pull/3848#discussion_r1130564777
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -503,17 +513,26 @@ public void run() {
// responses
for (int i = 0; i < requestsCount; i++) {
ForceWriteRequest req = localRequests.get(i);
+ req.getForceWriteWaiters().forEach(ele -> {
Review Comment:
`req.process()` is already iterating over all the entries for each
`ForceWriteRequest`. It would be better to reuse the same loop.
Additionally using `forEach` has 2 problems:
1. allocates an iterator, unlike doing `for (int i =0; i... .)`
2. the lambda is also capturing variables and thus it will allocate a
temporary object
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -495,6 +504,7 @@ public void run() {
journalStats.getForceWriteQueueSize().addCount(-requestsCount);
+ Set<BookieRequestHandler> writeHandlers = new HashSet<>();
Review Comment:
Using the set has its own overhead too. In this case it will allocate a
`Node` object for the linked list for each entry added here.
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java:
##########
@@ -495,6 +504,7 @@ public void run() {
journalStats.getForceWriteQueueSize().addCount(-requestsCount);
+ Set<BookieRequestHandler> writeHandlers = new HashSet<>();
Review Comment:
Perhaps we could use `ObjectHashSet` from carrotsearch
https://github.com/carrotsearch/hppc
It's already used in Pulsar too.
We should also reuse the same set each time.
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java:
##########
@@ -92,31 +92,22 @@ public void channelRead(ChannelHandlerContext ctx, Object
msg) throws Exception
public synchronized void prepareSendResponseV2(int rc,
BookieProtocol.ParsedAddRequest req) {
if (pendingSendResponses == null) {
- pendingSendResponses =
ctx.alloc().directBuffer(maxPendingResponsesSize != 0
- ? maxPendingResponsesSize : 256);
+ pendingSendResponses =
ctx().alloc().directBuffer(maxPendingResponsesSize);
}
-
BookieProtoEncoding.ResponseEnDeCoderPreV3.serializeAddResponseInto(rc, req,
pendingSendResponses);
}
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
- if (evt == EVENT_FLUSH_ALL_PENDING_RESPONSES) {
- synchronized (this) {
- if (pendingSendResponses != null) {
- maxPendingResponsesSize = Math.max(maxPendingResponsesSize,
- pendingSendResponses.readableBytes());
- if (ctx.channel().isActive()) {
- ctx.writeAndFlush(pendingSendResponses,
ctx.voidPromise());
- } else {
- pendingSendResponses.release();
- }
-
- pendingSendResponses = null;
- }
+ public synchronized void flushPendingResponse() {
+ if (pendingSendResponses != null) {
+ maxPendingResponsesSize = (int) Math.max(
+ maxPendingResponsesSize * 0.9 + 0.1 *
pendingSendResponses.readableBytes(),
Review Comment:
This looks like it's going to take quite a bit of iterations to expand the
cluster size. While in the meantime we have to keep expanding the `ByteBuf`,
which will incur in multiple allocations and memory copies
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]