eolivelli commented on code in PR #17228:
URL: https://github.com/apache/pulsar/pull/17228#discussion_r964816425


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2357,6 +2357,41 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
         }
     }
 
+    // Although we have caught the connection loss exception on the meta store, to avoid other exceptions cause
+    // the mismatch between meta store and in memory, we refresh the ledger info list when the offload execute
+    // failed by badversion
+    private void asyncRefreshLedgersInfoOnBadVersion(ManagedLedgerException exception) {
+        if (!(exception instanceof BadVersionException)) {
+            return;
+        }
+        if (!metadataMutex.tryLock()) {
+            scheduledExecutor.schedule(
+                () -> asyncRefreshLedgersInfoOnBadVersion(exception), 100, TimeUnit.MILLISECONDS);
+            return;
+        }
+        store.getManagedLedgerInfo(name, false, new MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
+                ledgersStat = stat;
+                synchronized (this) {

Review Comment:
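   What about an explicit lock target? Inside the anonymous callback, `this` is the callback instance rather than the managed ledger: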
   `synchronized (ManagedLedgerImpl.this) {`
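
A minimal, standalone sketch of why the qualifier matters. None of these names come from the Pulsar code base: `LockTargetDemo` and its `Callback` interface are hypothetical stand-ins for `ManagedLedgerImpl` and `MetaStoreCallback`. The sketch only shows which object a bare `this` resolves to inside an anonymous class, and therefore which monitor a `synchronized (this)` block there would take.

```java
public class LockTargetDemo {
    // Hypothetical stand-in for a callback interface such as MetaStoreCallback.
    interface Callback {
        Object lockTarget();
    }

    // Mirrors the code under review: a bare `this` inside the anonymous class.
    Callback implicitThis() {
        return new Callback() {
            @Override
            public Object lockTarget() {
                return this; // the anonymous Callback instance, not the outer object
            }
        };
    }

    // Mirrors the suggestion: a qualified `this` names the enclosing instance.
    Callback explicitOuterThis() {
        return new Callback() {
            @Override
            public Object lockTarget() {
                return LockTargetDemo.this; // the enclosing LockTargetDemo instance
            }
        };
    }

    public static void main(String[] args) {
        LockTargetDemo outer = new LockTargetDemo();
        System.out.println(outer.implicitThis().lockTarget() == outer);
        System.out.println(outer.explicitOuterThis().lockTarget() == outer);
    }
}
```

If other code guards the same state with `synchronized` methods or blocks on the outer object, a `synchronized (this)` inside the callback would lock a different monitor and give no mutual exclusion with them.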
   
   



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2357,6 +2357,41 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
         }
     }
 
+    // Although we have caught the connection loss exception on the meta store, to avoid other exceptions cause
+    // the mismatch between meta store and in memory, we refresh the ledger info list when the offload execute
+    // failed by badversion
+    private void asyncRefreshLedgersInfoOnBadVersion(ManagedLedgerException exception) {
+        if (!(exception instanceof BadVersionException)) {
+            return;
+        }
+        if (!metadataMutex.tryLock()) {
+            scheduledExecutor.schedule(
+                () -> asyncRefreshLedgersInfoOnBadVersion(exception), 100, TimeUnit.MILLISECONDS);
+            return;
+        }
+        store.getManagedLedgerInfo(name, false, new MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
+                ledgersStat = stat;
+                synchronized (this) {
+                    for (LedgerInfo li : mlInfo.getLedgerInfoList()) {
+                        long ledgerId = li.getLedgerId();
+                        if (!li.equals(ledgers.get(ledgerId))) {
+                            ledgers.put(ledgerId, li);
+                        }
+                    }
+                }
+                metadataMutex.unlock();

Review Comment:
   This should be in a `finally` block, so that the mutex is released even if the code above it throws.
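
A rough sketch of the pattern, under stated assumptions: `FinallyUnlockDemo` is a hypothetical class, `java.util.concurrent.locks.ReentrantLock` stands in for `metadataMutex` (whose real type is not shown in this diff), and a `Map<Long, String>` stands in for the ledger info map. It is meant only to illustrate that an `unlock()` placed in `finally` still runs when the refresh loop throws.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

public class FinallyUnlockDemo {
    // Stand-in for metadataMutex; the real field may be a different lock type.
    private final ReentrantLock mutex = new ReentrantLock();
    // Stand-in for the in-memory ledger map.
    private final Map<Long, String> ledgers = new TreeMap<>();

    // Copies changed entries into the in-memory map. Returns false if the
    // mutex is busy; always releases the mutex once it has been acquired.
    boolean refresh(Map<Long, String> fresh) {
        if (!mutex.tryLock()) {
            return false;
        }
        try {
            synchronized (this) {
                for (Map.Entry<Long, String> e : fresh.entrySet()) {
                    // Throws NullPointerException if a value is null.
                    if (!e.getValue().equals(ledgers.get(e.getKey()))) {
                        ledgers.put(e.getKey(), e.getValue());
                    }
                }
            }
            return true;
        } finally {
            mutex.unlock(); // runs on both the normal and the exceptional path
        }
    }

    boolean isLocked() {
        return mutex.isLocked();
    }

    public static void main(String[] args) {
        FinallyUnlockDemo demo = new FinallyUnlockDemo();
        Map<Long, String> bad = new HashMap<>();
        bad.put(1L, null); // forces the loop to throw
        try {
            demo.refresh(bad);
        } catch (NullPointerException expected) {
            // the failure itself is not the point of the demo
        }
        System.out.println("locked after failure: " + demo.isLocked());
    }
}
```

Without the `finally`, an exception in the loop would leave the mutex held, and every later `tryLock()` from another thread would fail and keep rescheduling.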



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
