BewareMyPower commented on code in PR #23311:
URL: https://github.com/apache/pulsar/pull/23311#discussion_r1762672146


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java:
##########
@@ -102,369 +89,28 @@ private long getNumberOfEntries(Range<Position> range,
         }
     }
 
-    public PersistentOfflineTopicStats 
getEstimatedUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory,
+    public PersistentOfflineTopicStats 
getEstimatedUnloadedTopicBacklog(ManagedLedgerFactory factory,
             String managedLedgerName) throws Exception {
         return estimateUnloadedTopicBacklog(factory, 
TopicName.get("persistent://" + managedLedgerName));
     }
 
-    public PersistentOfflineTopicStats 
estimateUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory,
-            TopicName topicName) throws Exception {
+    public PersistentOfflineTopicStats 
estimateUnloadedTopicBacklog(ManagedLedgerFactory factory,
+                                                                    TopicName 
topicName) throws Exception {
         String managedLedgerName = topicName.getPersistenceNamingEncoding();
-        long numberOfEntries = 0;
-        long totalSize = 0;
-        final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
ledgers = new ConcurrentSkipListMap<>();
         final PersistentOfflineTopicStats offlineTopicStats = new 
PersistentOfflineTopicStats(managedLedgerName,
                 brokerName);
-
-        // calculate total managed ledger size and number of entries without 
loading the topic
-        readLedgerMeta(factory, topicName, ledgers);
-        for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : ledgers.values()) 
{
-            numberOfEntries += ls.getEntries();
-            totalSize += ls.getSize();
-            if (accurate) {
-                offlineTopicStats.addLedgerDetails(ls.getEntries(), 
ls.getTimestamp(), ls.getSize(), ls.getLedgerId());
-            }
-        }
-        offlineTopicStats.totalMessages = numberOfEntries;
-        offlineTopicStats.storageSize = totalSize;
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Total number of entries - {} and size - {}", 
managedLedgerName, numberOfEntries, totalSize);
-        }
-
-        // calculate per cursor message backlog
-        calculateCursorBacklogs(factory, topicName, ledgers, 
offlineTopicStats);
-        offlineTopicStats.statGeneratedAt.setTime(System.currentTimeMillis());
-
-        return offlineTopicStats;
-    }
-
-    private void readLedgerMeta(final ManagedLedgerFactoryImpl factory, final 
TopicName topicName,
-            final NavigableMap<Long, 
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) throws Exception {
-        String managedLedgerName = topicName.getPersistenceNamingEncoding();
-        MetaStore store = factory.getMetaStore();
-
-        final CountDownLatch mlMetaCounter = new CountDownLatch(1);
-
-        store.getManagedLedgerInfo(managedLedgerName, false /* createIfMissing 
*/,
-                new 
MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
-                    @Override
-                    public void 
operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
-                        for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : 
mlInfo.getLedgerInfoList()) {
-                            ledgers.put(ls.getLedgerId(), ls);
-                        }
-
-                        // find no of entries in last ledger
-                        if (!ledgers.isEmpty()) {
-                            final long id = ledgers.lastKey();
-                            AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) 
-> {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("[{}] Opened ledger {}: {}", 
managedLedgerName, id,
-                                            BKException.getMessage(rc));
-                                }
-                                if (rc == BKException.Code.OK) {
-                                    MLDataFormats.ManagedLedgerInfo.LedgerInfo 
info =
-                                        
MLDataFormats.ManagedLedgerInfo.LedgerInfo
-                                            
.newBuilder().setLedgerId(id).setEntries(lh.getLastAddConfirmed() + 1)
-                                            
.setSize(lh.getLength()).setTimestamp(System.currentTimeMillis()).build();
-                                    ledgers.put(id, info);
-                                    mlMetaCounter.countDown();
-                                } else if 
(Errors.isNoSuchLedgerExistsException(rc)) {
-                                    log.warn("[{}] Ledger not found: {}", 
managedLedgerName, ledgers.lastKey());
-                                    ledgers.remove(ledgers.lastKey());
-                                    mlMetaCounter.countDown();
-                                } else {
-                                    log.error("[{}] Failed to open ledger {}: 
{}", managedLedgerName, id,
-                                            BKException.getMessage(rc));
-                                    mlMetaCounter.countDown();
-                                }
-                            };
-
-                            if (log.isDebugEnabled()) {
-                                log.debug("[{}] Opening ledger {}", 
managedLedgerName, id);
-                            }
-
-                            factory.getBookKeeper()
-                                    .thenAccept(bk -> {
-                                        bk.asyncOpenLedgerNoRecovery(id, 
digestType, password, opencb, null);
-                                    }).exceptionally(ex -> {
-                                        log.warn("[{}] Failed to open ledger 
{}: {}", managedLedgerName, id, ex);
-                                        opencb.openComplete(-1, null, null);
-                                        mlMetaCounter.countDown();
-                                        return null;
-                                    });
-                        } else {
-                            log.warn("[{}] Ledger list empty", 
managedLedgerName);
-                            mlMetaCounter.countDown();
-                        }
-                    }
-
-                    @Override
-                    public void 
operationFailed(ManagedLedgerException.MetaStoreException e) {
-                        log.warn("[{}] Unable to obtain managed ledger 
metadata - {}", managedLedgerName, e);
-                        mlMetaCounter.countDown();
-                    }
-                });
-
-        if (accurate) {
-            // block until however long it takes for operation to complete
-            mlMetaCounter.await();
+        if (factory instanceof ManagedLedgerFactoryImpl) {
+            List<Object> ctx = new ArrayList<>();
+            ctx.add(digestType);
+            ctx.add(password);
+            factory.estimateUnloadedTopicBacklog(offlineTopicStats, topicName, 
accurate, ctx);
         } else {
-            mlMetaCounter.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
+            Object ctx = null;
+            factory.estimateUnloadedTopicBacklog(offlineTopicStats, topicName, 
accurate, ctx);

Review Comment:
   Since the main motivation of this PR is to decouple the ManagedLedger (ML)
interface from its implementations, the issue raised in this comment can be
addressed in a future PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to