sijie closed pull request #1961: Remove potential deadlock in offloading
URL: https://github.com/apache/incubator-pulsar/pull/1961
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 68bc801a53..65276b3f2d 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2053,11 +2053,10 @@ private void 
offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerIn
                             }
                             Optional<Throwable> errorToReport = firstError;
                             synchronized (ManagedLedgerImpl.this) {
-                                ledgersListMutex.lock();
+                                // if the ledger doesn't exist anymore, ignore 
the error
                                 if (ledgers.containsKey(ledgerId)) {
                                     errorToReport = 
Optional.of(firstError.orElse(exception));
                                 }
-                                ledgersListMutex.unlock();
                             }
 
                             offloadLoop(promise, ledgersToOffload,
@@ -2086,43 +2085,60 @@ private void 
offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerIn
     }
 
     private CompletableFuture<Void> transformLedgerInfo(long ledgerId, 
LedgerInfoTransformation transformation) {
-        CompletableFuture<Void> promise = new CompletableFuture<>();
-        synchronized (this) {
-            ledgersListMutex.lock();
+        CompletableFuture<Void> promise = new CompletableFuture<Void>();
 
-            promise.whenComplete((ignore, exception) -> {
-                    ledgersListMutex.unlock();
-                });
+        tryTransformLedgerInfo(ledgerId, transformation, promise);
 
-            LedgerInfo oldInfo = ledgers.get(ledgerId);
-            if (oldInfo == null) {
-                promise.completeExceptionally(
-                        new OffloadConflict(
-                                "Ledger " + ledgerId + " no longer exists in 
ManagedLedger, likely trimmed"));
-            } else {
-                try {
-                    LedgerInfo newInfo = transformation.transform(oldInfo);
-                    ledgers.put(ledgerId, newInfo);
-                    store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), 
ledgersStat,
-                            new MetaStoreCallback<Void>() {
-                                @Override
-                                public void operationComplete(Void result, 
Stat stat) {
-                                    ledgersStat = stat;
-                                    promise.complete(null);
-                                }
+        return promise;
+    }
 
-                                @Override
-                                public void operationFailed(MetaStoreException 
e) {
-                                    promise.completeExceptionally(e);
-                                }
-                            });
-                } catch (ManagedLedgerException mle) {
-                    promise.completeExceptionally(mle);
+    private void tryTransformLedgerInfo(long ledgerId, 
LedgerInfoTransformation transformation,
+                                        CompletableFuture<Void> finalPromise) {
+        synchronized (this) {
+            if (!ledgersListMutex.tryLock()) {
+                // retry in 100 milliseconds
+                scheduledExecutor.schedule(safeRun(() -> 
tryTransformLedgerInfo(ledgerId, transformation,
+                                                                               
 finalPromise)),
+                                           100, TimeUnit.MILLISECONDS);
+            } else { // lock acquired
+                CompletableFuture<Void> unlockingPromise = new 
CompletableFuture<>();
+                unlockingPromise.whenComplete((res, ex) -> {
+                        ledgersListMutex.unlock();
+                        if (ex != null) {
+                            finalPromise.completeExceptionally(ex);
+                        } else {
+                            finalPromise.complete(res);
+                        }
+                    });
+
+                LedgerInfo oldInfo = ledgers.get(ledgerId);
+                if (oldInfo == null) {
+                    unlockingPromise.completeExceptionally(
+                            new OffloadConflict(
+                                    "Ledger " + ledgerId + " no longer exists 
in ManagedLedger, likely trimmed"));
+                } else {
+                    try {
+                        LedgerInfo newInfo = transformation.transform(oldInfo);
+                        ledgers.put(ledgerId, newInfo);
+                        store.asyncUpdateLedgerIds(name, 
getManagedLedgerInfo(), ledgersStat,
+                                new MetaStoreCallback<Void>() {
+                                    @Override
+                                    public void operationComplete(Void result, 
Stat stat) {
+                                        ledgersStat = stat;
+                                        unlockingPromise.complete(null);
+                                    }
+
+                                    @Override
+                                    public void 
operationFailed(MetaStoreException e) {
+                                        
unlockingPromise.completeExceptionally(e);
+                                    }
+                                });
+                    } catch (ManagedLedgerException mle) {
+                        unlockingPromise.completeExceptionally(mle);
+                    }
                 }
             }
-
         }
-        return promise;
     }
 
     private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long 
ledgerId, UUID uuid) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to