sijie closed pull request #1961: Remove potential deadlock in offloading. URL: https://github.com/apache/incubator-pulsar/pull/1961
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 68bc801a53..65276b3f2d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2053,11 +2053,10 @@ private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerIn } Optional<Throwable> errorToReport = firstError; synchronized (ManagedLedgerImpl.this) { - ledgersListMutex.lock(); + // if the ledger doesn't exist anymore, ignore the error if (ledgers.containsKey(ledgerId)) { errorToReport = Optional.of(firstError.orElse(exception)); } - ledgersListMutex.unlock(); } offloadLoop(promise, ledgersToOffload, @@ -2086,43 +2085,60 @@ private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerIn } private CompletableFuture<Void> transformLedgerInfo(long ledgerId, LedgerInfoTransformation transformation) { - CompletableFuture<Void> promise = new CompletableFuture<>(); - synchronized (this) { - ledgersListMutex.lock(); + CompletableFuture<Void> promise = new CompletableFuture<Void>(); - promise.whenComplete((ignore, exception) -> { - ledgersListMutex.unlock(); - }); + tryTransformLedgerInfo(ledgerId, transformation, promise); - LedgerInfo oldInfo = ledgers.get(ledgerId); - if (oldInfo == null) { - promise.completeExceptionally( - new OffloadConflict( - "Ledger " + ledgerId + " no longer exists in ManagedLedger, likely trimmed")); - } else { - try { - LedgerInfo newInfo = transformation.transform(oldInfo); - ledgers.put(ledgerId, newInfo); - 
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, - new MetaStoreCallback<Void>() { - @Override - public void operationComplete(Void result, Stat stat) { - ledgersStat = stat; - promise.complete(null); - } + return promise; + } - @Override - public void operationFailed(MetaStoreException e) { - promise.completeExceptionally(e); - } - }); - } catch (ManagedLedgerException mle) { - promise.completeExceptionally(mle); + private void tryTransformLedgerInfo(long ledgerId, LedgerInfoTransformation transformation, + CompletableFuture<Void> finalPromise) { + synchronized (this) { + if (!ledgersListMutex.tryLock()) { + // retry in 100 milliseconds + scheduledExecutor.schedule(safeRun(() -> tryTransformLedgerInfo(ledgerId, transformation, + finalPromise)), + 100, TimeUnit.MILLISECONDS); + } else { // lock acquired + CompletableFuture<Void> unlockingPromise = new CompletableFuture<>(); + unlockingPromise.whenComplete((res, ex) -> { + ledgersListMutex.unlock(); + if (ex != null) { + finalPromise.completeExceptionally(ex); + } else { + finalPromise.complete(res); + } + }); + + LedgerInfo oldInfo = ledgers.get(ledgerId); + if (oldInfo == null) { + unlockingPromise.completeExceptionally( + new OffloadConflict( + "Ledger " + ledgerId + " no longer exists in ManagedLedger, likely trimmed")); + } else { + try { + LedgerInfo newInfo = transformation.transform(oldInfo); + ledgers.put(ledgerId, newInfo); + store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, + new MetaStoreCallback<Void>() { + @Override + public void operationComplete(Void result, Stat stat) { + ledgersStat = stat; + unlockingPromise.complete(null); + } + + @Override + public void operationFailed(MetaStoreException e) { + unlockingPromise.completeExceptionally(e); + } + }); + } catch (ManagedLedgerException mle) { + unlockingPromise.completeExceptionally(mle); + } } } - } - return promise; } private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long ledgerId, UUID uuid) { 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services