merlimat commented on code in PR #22842:
URL: https://github.com/apache/pulsar/pull/22842#discussion_r1626740560
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -378,63 +378,77 @@ public void asyncOpen(final String name, final
ManagedLedgerConfig config, final
ledgers.computeIfAbsent(name, (mlName) -> {
// Create the managed ledger
CompletableFuture<ManagedLedgerImpl> future = new
CompletableFuture<>();
- BookKeeper bk = bookkeeperFactory.get(
- new
EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
-
config.getBookKeeperEnsemblePlacementPolicyProperties()));
- final ManagedLedgerImpl newledger = config.getShadowSource() ==
null
- ? new ManagedLedgerImpl(this, bk, store, config,
scheduledExecutor, name, mlOwnershipChecker)
- : new ShadowManagedLedgerImpl(this, bk, store, config,
scheduledExecutor, name,
- mlOwnershipChecker);
- PendingInitializeManagedLedger pendingLedger = new
PendingInitializeManagedLedger(newledger);
- pendingInitializeLedgers.put(name, pendingLedger);
- newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
- @Override
- public void initializeComplete() {
- log.info("[{}] Successfully initialize managed ledger",
name);
- pendingInitializeLedgers.remove(name, pendingLedger);
- future.complete(newledger);
-
- // May need to update the cursor position
- newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
- // May need to trigger offloading
- if (config.isTriggerOffloadOnTopicLoad()) {
-
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
- }
+ CompletableFuture<BookKeeper> bkFuture =
createBookKeeperClient(config);
+ bkFuture.handle((bk, ex) -> {
+ if (ex != null) {
+ future.completeExceptionally(ex);
+ return null;
}
+ final ManagedLedgerImpl newledger = config.getShadowSource()
== null
+ ? new ManagedLedgerImpl(this, bk, store, config,
scheduledExecutor, name, mlOwnershipChecker)
+ : new ShadowManagedLedgerImpl(this, bk, store, config,
scheduledExecutor, name,
+ mlOwnershipChecker);
+ PendingInitializeManagedLedger pendingLedger = new
PendingInitializeManagedLedger(newledger);
+ pendingInitializeLedgers.put(name, pendingLedger);
+ newledger.initialize(new
ManagedLedgerInitializeLedgerCallback() {
+ @Override
+ public void initializeComplete() {
+ log.info("[{}] Successfully initialize managed
ledger", name);
+ pendingInitializeLedgers.remove(name, pendingLedger);
+ future.complete(newledger);
- @Override
- public void initializeFailed(ManagedLedgerException e) {
- if (config.isCreateIfMissing()) {
- log.error("[{}] Failed to initialize managed ledger:
{}", name, e.getMessage());
+ // May need to update the cursor position
+
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
+ // May need to trigger offloading
+ if (config.isTriggerOffloadOnTopicLoad()) {
+
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
+ }
}
- // Clean the map if initialization fails
- ledgers.remove(name, future);
+ @Override
+ public void initializeFailed(ManagedLedgerException e) {
+ if (config.isCreateIfMissing()) {
+ log.error("[{}] Failed to initialize managed
ledger: {}", name, e.getMessage());
+ }
- if (pendingInitializeLedgers.remove(name, pendingLedger)) {
- pendingLedger.ledger.asyncClose(new CloseCallback() {
- @Override
- public void closeComplete(Object ctx) {
- // no-op
- }
+ // Clean the map if initialization fails
+ ledgers.remove(name, future);
- @Override
- public void closeFailed(ManagedLedgerException
exception, Object ctx) {
- log.warn("[{}] Failed to a pending
initialization managed ledger", name, exception);
- }
- }, null);
- }
+ if (pendingInitializeLedgers.remove(name,
pendingLedger)) {
+ pendingLedger.ledger.asyncClose(new
CloseCallback() {
+ @Override
+ public void closeComplete(Object ctx) {
+ // no-op
+ }
- future.completeExceptionally(e);
- }
- }, null);
+ @Override
+ public void closeFailed(ManagedLedgerException
exception, Object ctx) {
+ log.warn("[{}] Failed to a pending
initialization managed ledger", name, exception);
+ }
+ }, null);
+ }
+ future.completeExceptionally(e);
+ }
+ }, null);
+ return null;
+ });
return future;
}).thenAccept(ml -> callback.openLedgerComplete(ml,
ctx)).exceptionally(exception -> {
callback.openLedgerFailed((ManagedLedgerException)
exception.getCause(), ctx);
return null;
});
}
+ private CompletableFuture<BookKeeper>
createBookKeeperClient(ManagedLedgerConfig config) {
+ CompletableFuture<BookKeeper> future = new CompletableFuture<>();
+ scheduledExecutor.execute(() -> {
Review Comment:
I think the culprit of the issue is in the
`BookkeeperFactoryForCustomEnsemblePlacementPolicy` interface, because it
effectively masks a potentially blocking operation behind a method that does
not look blocking, and doesn't even declare an exception.
I think a better solution here would to make the interface
`BookkeeperFactoryForCustomEnsemblePlacementPolicy` to return a
CompletableFuture of a BK client instead of using an executor here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]