This is an automated email from the ASF dual-hosted git repository. ivank pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new 8d048ab Refactored ReadResponse ref count handling 8d048ab is described below commit 8d048abce486c63d428041f77ee9a506756f4d1e Author: Ivan Kelly <iv...@apache.org> AuthorDate: Fri Mar 2 10:08:48 2018 +0100 Refactored ReadResponse ref count handling This contains a number of changes. - V2 bookie protocol - Add retain and release methods to all responses. For read responses, they handle the data buffer. - ReadResponses always have a buffer now, even if empty. - Server side - In the v2 read handler, releasing of the buffer in the case of error is left to the very end. - Client side - Per channel bookie clients own the buffer for read responses. If a ReadCallback wants it to live past the lifetime of the call, it must call retain. This change was originally e8643140 in the yahoo-4.3 branch. Author: Ivan Kelly <iv...@apache.org> Reviewers: Matteo Merli <mme...@apache.org>, Sijie Guo <si...@apache.org> This closes #1221 from ivankelly/yahoo-bp-11 --- .../apache/bookkeeper/client/PendingReadOp.java | 5 ++-- .../client/ReadLastConfirmedAndEntryOp.java | 4 +++- .../bookkeeper/client/ReadLastConfirmedOp.java | 3 --- .../bookkeeper/proto/BookieProtoEncoding.java | 8 ++----- .../apache/bookkeeper/proto/BookieProtocol.java | 28 ++++++++++++++-------- .../bookkeeper/proto/PerChannelBookieClient.java | 15 ++++-------- .../bookkeeper/proto/ReadEntryProcessor.java | 9 ------- 7 files changed, 31 insertions(+), 41 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index 7f2860b..d596eee 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -139,7 +139,6 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable { } catch (BKDigestMatchException e) 
{ readOpDmCounter.inc(); logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException); - buffer.release(); return false; } @@ -154,7 +153,6 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable { writeSet.recycle(); return true; } else { - buffer.release(); return false; } } @@ -589,12 +587,15 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable { heardFromHosts.add(rctx.to); heardFromHostsBitSet.set(rctx.bookieIndex, true); + buffer.retain(); if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) { if (!isRecoveryRead) { // do not advance LastAddConfirmed for recovery reads lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L); } submitCallback(BKException.Code.OK); + } else { + buffer.release(); } if (numPendingEntries < 0) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java index a8bc84f..5327639 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java @@ -121,7 +121,6 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt content = lh.macManager.verifyDigestAndReturnData(entryId, buffer); } catch (BKException.BKDigestMatchException e) { logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException); - buffer.release(); return false; } @@ -555,6 +554,7 @@ class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEnt hasValidResponse = true; if (entryId != BookieProtocol.LAST_ADD_CONFIRMED) { + buffer.retain(); if (request.complete(rCtx.getBookieIndex(), bookie, buffer, entryId)) { // callback immediately if (rCtx.getLacUpdateTimestamp().isPresent()) { @@ -568,6 +568,8 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt submitCallback(BKException.Code.OK); requestComplete.set(true); heardFromHostsBitSet.set(rCtx.getBookieIndex(), true); + } else { + buffer.release(); } } else { emptyResponsesFromHostsBitSet.set(rCtx.getBookieIndex(), true); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java index 32d7ffe..2cb6152 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java @@ -18,7 +18,6 @@ package org.apache.bookkeeper.client; import io.netty.buffer.ByteBuf; -import io.netty.util.ReferenceCountUtil; import org.apache.bookkeeper.client.BKException.BKDigestMatchException; import org.apache.bookkeeper.proto.BookieProtocol; @@ -101,8 +100,6 @@ class ReadLastConfirmedOp implements ReadEntryCallback { } } - ReferenceCountUtil.release(buffer); - if (rc == BKException.Code.NoSuchLedgerExistsException || rc == BKException.Code.NoSuchEntryException) { // this still counts as a valid response, e.g., if the client crashed without writing any entry heardValidResponse = true; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 4594ef1..a5b2141 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -292,12 +292,8 @@ public class BookieProtoEncoding { ledgerId = buffer.readLong(); entryId = buffer.readLong(); - if (rc == BookieProtocol.EOK) { - return new BookieProtocol.ReadResponse(version, rc, - ledgerId, entryId, buffer.retainedSlice()); - } else { - return new BookieProtocol.ReadResponse(version, rc, ledgerId, entryId); - } 
+ return new BookieProtocol.ReadResponse( + version, rc, ledgerId, entryId, buffer.retainedSlice()); case BookieProtocol.AUTH: ByteBufInputStream bufStream = new ByteBufInputStream(buffer); BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 9ae6316..86c93e1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -428,7 +428,14 @@ public interface BookieProtocol { opCode, ledgerId, entryId, errorCode); } - abstract void recycle(); + void retain() { + } + + void release() { + } + + void recycle() { + } } /** @@ -438,8 +445,7 @@ public interface BookieProtocol { final ByteBuf data; ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) { - init(protocolVersion, READENTRY, errorCode, ledgerId, entryId); - this.data = Unpooled.EMPTY_BUFFER; + this(protocolVersion, errorCode, ledgerId, entryId, Unpooled.EMPTY_BUFFER); } ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ByteBuf data) { @@ -455,7 +461,14 @@ public interface BookieProtocol { return data; } - void recycle() { + @Override + public void retain() { + data.retain(); + } + + @Override + public void release() { + data.release(); } } @@ -480,6 +493,7 @@ public interface BookieProtocol { } }; + @Override public void recycle() { recyclerHandle.recycle(this); } @@ -493,9 +507,6 @@ public interface BookieProtocol { long ledgerId, long entryId) { init(protocolVersion, opCode, errorCode, ledgerId, entryId); } - - void recycle() { - } } /** @@ -512,9 +523,6 @@ public interface BookieProtocol { AuthMessage getAuthMessage() { return authMessage; } - - void recycle() { - } } } diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 7735b98..5adc7cf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -1105,6 +1105,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + operationType + " and ledger:entry : " + response.ledgerId + ":" + response.entryId); } + response.release(); } else { long orderingKey = completionValue.ledgerId; executor.submitOrdered(orderingKey, @@ -1582,11 +1583,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { return; } BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response; - ByteBuf data = null; - if (readResponse.hasData()) { - data = readResponse.getData(); - } - handleReadResponse(ledgerId, entryId, status, data, + handleReadResponse(ledgerId, entryId, status, readResponse.getData(), INVALID_ENTRY_ID, -1L); } @@ -1611,6 +1608,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { handleReadResponse(readResponse.getLedgerId(), readResponse.getEntryId(), status, buffer, maxLAC, lacUpdateTimestamp); + buffer.release(); // meaningless using unpooled, but client may expect to hold the last reference } private void handleReadResponse(long ledgerId, @@ -1619,23 +1617,20 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { ByteBuf buffer, long maxLAC, // max known lac piggy-back from bookies long lacUpdateTimestamp) { // the timestamp when the lac is updated. - int readableBytes = buffer == null ? 
0 : buffer.readableBytes(); + int readableBytes = buffer.readableBytes(); int rc = logAndConvertStatus(status, BKException.Code.ReadException, "ledger", ledgerId, "entry", entryId, "entryLength", readableBytes); - if (buffer != null) { - buffer = buffer.slice(); - } if (maxLAC > INVALID_ENTRY_ID && (ctx instanceof ReadEntryCallbackCtx)) { ((ReadEntryCallbackCtx) ctx).setLastAddConfirmed(maxLAC); } if (lacUpdateTimestamp > -1L && (ctx instanceof ReadLastConfirmedAndEntryContext)) { ((ReadLastConfirmedAndEntryContext) ctx).setLacUpdateTimestamp(lacUpdateTimestamp); } - cb.readEntryComplete(rc, ledgerId, entryId, buffer, ctx); + cb.readEntryComplete(rc, ledgerId, entryId, buffer.slice(), ctx); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index 2d4c540..edeb8a6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -84,8 +84,6 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> { if (null == fenced || !fenced) { // if failed to fence, fail the read request to make it retry. 
errorCode = BookieProtocol.EIO; - data.release(); - data = null; } else { errorCode = BookieProtocol.EOK; } @@ -93,18 +91,12 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> { Thread.currentThread().interrupt(); LOG.error("Interrupting fence read entry {}", request, ie); errorCode = BookieProtocol.EIO; - data.release(); - data = null; } catch (ExecutionException ee) { LOG.error("Failed to fence read entry {}", request, ee); errorCode = BookieProtocol.EIO; - data.release(); - data = null; } catch (TimeoutException te) { LOG.error("Timeout to fence read entry {}", request, te); errorCode = BookieProtocol.EIO; - data.release(); - data = null; } } else { errorCode = BookieProtocol.EOK; @@ -141,7 +133,6 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> { TimeUnit.NANOSECONDS); sendResponse(errorCode, ResponseBuilder.buildReadResponse(data, request), requestProcessor.readRequestStats); - } else { ReferenceCountUtil.release(data); -- To stop receiving notification emails like this one, please contact iv...@apache.org.