merlimat commented on a change in pull request #10755:
URL: https://github.com/apache/pulsar/pull/10755#discussion_r643271542
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -178,8 +178,21 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
@Override
public void safeRun() {
// Remove this entry from the head of the pending queue
- OpAddEntry firstInQueue = ml.pendingAddEntries.poll();
- checkArgument(this == firstInQueue);
Review comment:
I'd leave this change out of this PR. This is a "logic" check. If it
fails, it means we really have a bug in the code, and we cannot make many
assumptions about how to recover from it.
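
For illustration, the kind of check being discussed can be sketched as below. This is a minimal sketch, not the actual `OpAddEntry` code: `PendingQueueSketch`, `enqueue`, and `completeHead` are hypothetical names, and the explicit `IllegalArgumentException` stands in for Guava's `checkArgument`. The point is that the completed operation must be the one at the head of the pending queue, and anything else is treated as a programming error rather than a condition to recover from.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for the pending-add queue; not the real ManagedLedgerImpl.
final class PendingQueueSketch {
    private final Queue<Object> pending = new ArrayDeque<>();

    void enqueue(Object op) {
        pending.add(op);
    }

    // Removes the head of the queue and verifies that it is the op being completed.
    // A mismatch indicates a code bug, so the method fails fast instead of recovering.
    void completeHead(Object op) {
        Object first = pending.poll();
        if (first != op) {
            throw new IllegalArgumentException(
                    "completed op is not at the head of the pending queue");
        }
    }

    int size() {
        return pending.size();
    }
}
```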
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -201,42 +214,47 @@ public void safeRun() {
// `data` will be released in `closeComplete`
ledger.asyncClose(this, ctx);
} else {
- updateLatency();
- AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
- if (cb != null) {
- cb.addComplete(lastEntry, data.asReadOnly(), ctx);
- ml.notifyCursors();
- ml.notifyWaitingEntryCallBacks();
- ReferenceCountUtil.release(data);
- this.recycle();
- } else {
+ ByteBuf data = this.data;
+ try {
Review comment:
The change here LGTM, although I'd prefer to get it in as a separate PR.
Also, do you have a stack trace showing why the exception would have been
triggered here?
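
The restructuring in this hunk can be sketched roughly as follows. This is a simplified illustration under stated assumptions: `ReleaseInFinallySketch` and its `Buffer` counter are hypothetical stand-ins for the reference-counted `ByteBuf`, and `Runnable` stands in for the add-entry callback. It only shows that moving the release into `finally` makes it run once on every path, including when the callback is null or throws.

```java
final class ReleaseInFinallySketch {
    // Hypothetical counter standing in for a reference-counted buffer.
    static final class Buffer {
        int released = 0;

        void release() {
            released++;
        }
    }

    // Runs the callback (if any) and releases the buffer exactly once,
    // even when the callback throws.
    static void complete(Buffer data, Runnable callback) {
        try {
            if (callback != null) {
                callback.run();
            }
        } finally {
            data.release();
        }
    }
}
```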
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1504,6 +1504,8 @@ public synchronized void updateLedgersIdsComplete(Stat stat) {
if (existsOp.ledger != null) {
existsOp.close();
existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
+ // release the extra retain
+ ReferenceCountUtil.release(existsOp.data);
Review comment:
👍 I think this was introduced in #5942.
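
To make the "extra retain" concrete, here is a minimal sketch of the reference-count bookkeeping. It assumes that creating an op takes its own reference on the buffer, which is what the added comment in the diff implies; `RefCountSketch`, `Buffer`, `Op`, and `recreate` are hypothetical names, not Netty or Pulsar classes.

```java
final class RefCountSketch {
    // Hypothetical minimal reference-counted buffer; Netty's ByteBuf is not used here.
    static final class Buffer {
        private int refCnt = 1;

        Buffer retain() {
            refCnt++;
            return this;
        }

        void release() {
            if (refCnt <= 0) {
                throw new IllegalStateException("already freed");
            }
            refCnt--;
        }

        int refCnt() {
            return refCnt;
        }
    }

    // Hypothetical op that retains its buffer on creation (an assumption about create()).
    static final class Op {
        final Buffer data;

        private Op(Buffer data) {
            this.data = data;
        }

        static Op create(Buffer data) {
            return new Op(data.retain());
        }
    }

    // Re-creates an op around the same buffer. create() adds one reference,
    // so one reference is released to keep the overall count unchanged.
    static Op recreate(Op old) {
        Op fresh = Op.create(old.data);
        fresh.data.release(); // release the extra retain
        return fresh;
    }
}
```

Without the release in `recreate`, every re-creation would leave the count one higher than before, and the buffer would never reach zero.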
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -201,42 +214,47 @@ public void safeRun() {
// `data` will be released in `closeComplete`
ledger.asyncClose(this, ctx);
} else {
- updateLatency();
- AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
- if (cb != null) {
- cb.addComplete(lastEntry, data.asReadOnly(), ctx);
- ml.notifyCursors();
- ml.notifyWaitingEntryCallBacks();
- ReferenceCountUtil.release(data);
- this.recycle();
- } else {
+ ByteBuf data = this.data;
+ try {
+ updateLatency();
+ AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
+ if (cb != null) {
+ cb.addComplete(lastEntry, data.asReadOnly(), ctx);
+ ml.notifyCursors();
+ ml.notifyWaitingEntryCallBacks();
+ this.recycle();
+ }
+ } finally {
ReferenceCountUtil.release(data);
}
}
}
@Override
public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
- checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(),
- lh.getId());
-
- if (rc == BKException.Code.OK) {
- log.debug("Successfully closed ledger {}", lh.getId());
- } else {
- log.warn("Error when closing ledger {}. Status={}", lh.getId(), BKException.getMessage(rc));
- }
+ ByteBuf data = this.data;
+ try {
+ checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s",
Review comment:
Same comment as for the other assertion above. This is really a code-sanity
check. If this fails, it means things are **very** broken.