merlimat commented on code in PR #22842:
URL: https://github.com/apache/pulsar/pull/22842#discussion_r1626740560


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -378,63 +378,77 @@ public void asyncOpen(final String name, final 
ManagedLedgerConfig config, final
         ledgers.computeIfAbsent(name, (mlName) -> {
             // Create the managed ledger
             CompletableFuture<ManagedLedgerImpl> future = new 
CompletableFuture<>();
-            BookKeeper bk = bookkeeperFactory.get(
-                    new 
EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
-                            
config.getBookKeeperEnsemblePlacementPolicyProperties()));
-            final ManagedLedgerImpl newledger = config.getShadowSource() == 
null
-                    ? new ManagedLedgerImpl(this, bk, store, config, 
scheduledExecutor, name, mlOwnershipChecker)
-                    : new ShadowManagedLedgerImpl(this, bk, store, config, 
scheduledExecutor, name,
-                    mlOwnershipChecker);
-            PendingInitializeManagedLedger pendingLedger = new 
PendingInitializeManagedLedger(newledger);
-            pendingInitializeLedgers.put(name, pendingLedger);
-            newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
-                @Override
-                public void initializeComplete() {
-                    log.info("[{}] Successfully initialize managed ledger", 
name);
-                    pendingInitializeLedgers.remove(name, pendingLedger);
-                    future.complete(newledger);
-
-                    // May need to update the cursor position
-                    newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
-                    // May need to trigger offloading
-                    if (config.isTriggerOffloadOnTopicLoad()) {
-                        
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
-                    }
+            CompletableFuture<BookKeeper> bkFuture = 
createBookKeeperClient(config);
+            bkFuture.handle((bk, ex) -> {
+                if (ex != null) {
+                    future.completeExceptionally(ex);
+                    return null;
                 }
+                final ManagedLedgerImpl newledger = config.getShadowSource() 
== null
+                        ? new ManagedLedgerImpl(this, bk, store, config, 
scheduledExecutor, name, mlOwnershipChecker)
+                        : new ShadowManagedLedgerImpl(this, bk, store, config, 
scheduledExecutor, name,
+                        mlOwnershipChecker);
+                PendingInitializeManagedLedger pendingLedger = new 
PendingInitializeManagedLedger(newledger);
+                pendingInitializeLedgers.put(name, pendingLedger);
+                newledger.initialize(new 
ManagedLedgerInitializeLedgerCallback() {
+                    @Override
+                    public void initializeComplete() {
+                        log.info("[{}] Successfully initialize managed 
ledger", name);
+                        pendingInitializeLedgers.remove(name, pendingLedger);
+                        future.complete(newledger);
 
-                @Override
-                public void initializeFailed(ManagedLedgerException e) {
-                    if (config.isCreateIfMissing()) {
-                        log.error("[{}] Failed to initialize managed ledger: 
{}", name, e.getMessage());
+                        // May need to update the cursor position
+                        
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
+                        // May need to trigger offloading
+                        if (config.isTriggerOffloadOnTopicLoad()) {
+                            
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
+                        }
                     }
 
-                    // Clean the map if initialization fails
-                    ledgers.remove(name, future);
+                    @Override
+                    public void initializeFailed(ManagedLedgerException e) {
+                        if (config.isCreateIfMissing()) {
+                            log.error("[{}] Failed to initialize managed 
ledger: {}", name, e.getMessage());
+                        }
 
-                    if (pendingInitializeLedgers.remove(name, pendingLedger)) {
-                        pendingLedger.ledger.asyncClose(new CloseCallback() {
-                            @Override
-                            public void closeComplete(Object ctx) {
-                                // no-op
-                            }
+                        // Clean the map if initialization fails
+                        ledgers.remove(name, future);
 
-                            @Override
-                            public void closeFailed(ManagedLedgerException 
exception, Object ctx) {
-                                log.warn("[{}] Failed to a pending 
initialization managed ledger", name, exception);
-                            }
-                        }, null);
-                    }
+                        if (pendingInitializeLedgers.remove(name, 
pendingLedger)) {
+                            pendingLedger.ledger.asyncClose(new 
CloseCallback() {
+                                @Override
+                                public void closeComplete(Object ctx) {
+                                    // no-op
+                                }
 
-                    future.completeExceptionally(e);
-                }
-            }, null);
+                                @Override
+                                public void closeFailed(ManagedLedgerException 
exception, Object ctx) {
+                                    log.warn("[{}] Failed to a pending 
initialization managed ledger", name, exception);
+                                }
+                            }, null);
+                        }
+                        future.completeExceptionally(e);
+                    }
+                }, null);
+                return null;
+            });
             return future;
         }).thenAccept(ml -> callback.openLedgerComplete(ml, 
ctx)).exceptionally(exception -> {
             callback.openLedgerFailed((ManagedLedgerException) 
exception.getCause(), ctx);
             return null;
         });
     }
 
+    private CompletableFuture<BookKeeper> 
createBookKeeperClient(ManagedLedgerConfig config) {
+        CompletableFuture<BookKeeper> future = new CompletableFuture<>();
+        scheduledExecutor.execute(() -> {

Review Comment:
   I think the culprit of the issue is in the 
`BookkeeperFactoryForCustomEnsemblePlacementPolicy` interface, because it 
effectively masks a potentially blocking operation behind a method that does 
not look blocking, and doesn't even declare an exception.
   
   I think a better solution here would to make the interface 
`BookkeeperFactoryForCustomEnsemblePlacementPolicy` to return a 
CompletableFuture of a BK client instead of using an executor here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to