eolivelli commented on code in PR #17228:
URL: https://github.com/apache/pulsar/pull/17228#discussion_r964816425
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2357,6 +2357,41 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
}
}
+ // Although we have caught the connection loss exception on the meta store, to avoid other exceptions cause
+ // the mismatch between meta store and in memory, we refresh the ledger info list when the offload execute
+ // failed by badversion
+ private void asyncRefreshLedgersInfoOnBadVersion(ManagedLedgerException exception) {
+ if (!(exception instanceof BadVersionException)) {
+ return;
+ }
+ if (!metadataMutex.tryLock()) {
+ scheduledExecutor.schedule(
+ () -> asyncRefreshLedgersInfoOnBadVersion(exception), 100, TimeUnit.MILLISECONDS);
+ return;
+ }
+ store.getManagedLedgerInfo(name, false, new MetaStoreCallback<>() {
+ @Override
+ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
+ ledgersStat = stat;
+ synchronized (this) {
Review Comment:
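What about making the lock target explicit? Inside the anonymous `MetaStoreCallback`, a bare `this` refers to the callback instance, not to the enclosing `ManagedLedgerImpl`: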
`synchronized (ManagedLedgerImpl.this) {`
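
To illustrate the difference, here is a minimal standalone sketch. It is not taken from the Pulsar code base: `OuterThisDemo` and `Callback` are hypothetical stand-ins for `ManagedLedgerImpl` and `MetaStoreCallback`, and `Thread.holdsLock` is used only to report which monitor is actually held.

```java
public class OuterThisDemo {
    // Hypothetical stand-in for MetaStoreCallback.
    interface Callback {
        void run();
    }

    // Record which monitors were held while the callback body ran.
    boolean heldOuterMonitor;
    boolean heldCallbackMonitor;

    Callback implicitThis() {
        return new Callback() {
            @Override
            public void run() {
                // Inside an anonymous class, `this` is the anonymous instance.
                synchronized (this) {
                    heldOuterMonitor = Thread.holdsLock(OuterThisDemo.this);
                    heldCallbackMonitor = Thread.holdsLock(this);
                }
            }
        };
    }

    Callback explicitOuterThis() {
        return new Callback() {
            @Override
            public void run() {
                // A qualified `this` names the enclosing instance.
                synchronized (OuterThisDemo.this) {
                    heldOuterMonitor = Thread.holdsLock(OuterThisDemo.this);
                    heldCallbackMonitor = Thread.holdsLock(this);
                }
            }
        };
    }

    public static void main(String[] args) {
        OuterThisDemo demo = new OuterThisDemo();

        demo.implicitThis().run();
        System.out.println("synchronized (this): outer=" + demo.heldOuterMonitor
                + ", callback=" + demo.heldCallbackMonitor);

        demo.explicitOuterThis().run();
        System.out.println("synchronized (OuterThisDemo.this): outer=" + demo.heldOuterMonitor
                + ", callback=" + demo.heldCallbackMonitor);
    }
}
```

In the first case only the callback's own monitor is held, so other code that synchronizes on the enclosing object would not be excluded. If the intent here is to guard `ledgers` with the managed ledger's monitor, the qualified form seems to be what is needed.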
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2357,6 +2357,41 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
}
}
+ // Although we have caught the connection loss exception on the meta store, to avoid other exceptions cause
+ // the mismatch between meta store and in memory, we refresh the ledger info list when the offload execute
+ // failed by badversion
+ private void asyncRefreshLedgersInfoOnBadVersion(ManagedLedgerException exception) {
+ if (!(exception instanceof BadVersionException)) {
+ return;
+ }
+ if (!metadataMutex.tryLock()) {
+ scheduledExecutor.schedule(
+ () -> asyncRefreshLedgersInfoOnBadVersion(exception), 100, TimeUnit.MILLISECONDS);
+ return;
+ }
+ store.getManagedLedgerInfo(name, false, new MetaStoreCallback<>() {
+ @Override
+ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
+ ledgersStat = stat;
+ synchronized (this) {
+ for (LedgerInfo li : mlInfo.getLedgerInfoList()) {
+ long ledgerId = li.getLedgerId();
+ if (!li.equals(ledgers.get(ledgerId))) {
+ ledgers.put(ledgerId, li);
+ }
+ }
+ }
+ metadataMutex.unlock();
Review Comment:
This should be in a `finally` block, so that `metadataMutex` is released even if the code above it throws.
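
A rough sketch of the pattern, under stated assumptions: `UnlockInFinallyDemo` is a hypothetical class, and a single-permit `java.util.concurrent.Semaphore` stands in for `metadataMutex`, whose real type is not shown in this hunk. The `refresh` method and its `Runnable` body are likewise illustrative only.

```java
import java.util.concurrent.Semaphore;

public class UnlockInFinallyDemo {
    // Hypothetical stand-in for metadataMutex: one permit, non-reentrant.
    private final Semaphore mutex = new Semaphore(1);

    boolean isLocked() {
        return mutex.availablePermits() == 0;
    }

    // Runs the body while holding the mutex. Returns false if the mutex was
    // busy, in which case the real code would reschedule itself.
    boolean refresh(Runnable body) {
        if (!mutex.tryAcquire()) {
            return false;
        }
        try {
            body.run();
        } finally {
            // Released on both the normal and the exceptional path.
            mutex.release();
        }
        return true;
    }

    public static void main(String[] args) {
        UnlockInFinallyDemo demo = new UnlockInFinallyDemo();
        try {
            demo.refresh(() -> {
                throw new IllegalStateException("simulated failure while merging ledger info");
            });
        } catch (IllegalStateException e) {
            System.out.println("body failed: " + e.getMessage());
        }
        System.out.println("mutex still locked? " + demo.isLocked());
        System.out.println("next refresh ran? " + demo.refresh(() -> { }));
    }
}
```

Without the `finally`, an exception in the body would leave the mutex held, and every later `tryLock()` would fail and keep rescheduling. The same concern probably applies to the failure callback path, which should release the mutex as well.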