BewareMyPower commented on code in PR #23311:
URL: https://github.com/apache/pulsar/pull/23311#discussion_r1762519616


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionBound.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+public enum PositionBound {
+    // define boundaries for position based seeks and searches
+        startIncluded, startExcluded

Review Comment:
   ```suggestion
       startIncluded, startExcluded
   ```
   
   Adjust indent



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java:
##########
@@ -102,369 +89,28 @@ private long getNumberOfEntries(Range<Position> range,
         }
     }
 
-    public PersistentOfflineTopicStats 
getEstimatedUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory,
+    public PersistentOfflineTopicStats 
getEstimatedUnloadedTopicBacklog(ManagedLedgerFactory factory,
             String managedLedgerName) throws Exception {
         return estimateUnloadedTopicBacklog(factory, 
TopicName.get("persistent://" + managedLedgerName));
     }
 
-    public PersistentOfflineTopicStats 
estimateUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory,
-            TopicName topicName) throws Exception {
+    public PersistentOfflineTopicStats 
estimateUnloadedTopicBacklog(ManagedLedgerFactory factory,
+                                                                    TopicName 
topicName) throws Exception {
         String managedLedgerName = topicName.getPersistenceNamingEncoding();
-        long numberOfEntries = 0;
-        long totalSize = 0;
-        final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
ledgers = new ConcurrentSkipListMap<>();
         final PersistentOfflineTopicStats offlineTopicStats = new 
PersistentOfflineTopicStats(managedLedgerName,
                 brokerName);
-
-        // calculate total managed ledger size and number of entries without 
loading the topic
-        readLedgerMeta(factory, topicName, ledgers);
-        for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : ledgers.values()) 
{
-            numberOfEntries += ls.getEntries();
-            totalSize += ls.getSize();
-            if (accurate) {
-                offlineTopicStats.addLedgerDetails(ls.getEntries(), 
ls.getTimestamp(), ls.getSize(), ls.getLedgerId());
-            }
-        }
-        offlineTopicStats.totalMessages = numberOfEntries;
-        offlineTopicStats.storageSize = totalSize;
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Total number of entries - {} and size - {}", 
managedLedgerName, numberOfEntries, totalSize);
-        }
-
-        // calculate per cursor message backlog
-        calculateCursorBacklogs(factory, topicName, ledgers, 
offlineTopicStats);
-        offlineTopicStats.statGeneratedAt.setTime(System.currentTimeMillis());
-
-        return offlineTopicStats;
-    }
-
-    private void readLedgerMeta(final ManagedLedgerFactoryImpl factory, final 
TopicName topicName,
-            final NavigableMap<Long, 
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) throws Exception {
-        String managedLedgerName = topicName.getPersistenceNamingEncoding();
-        MetaStore store = factory.getMetaStore();
-
-        final CountDownLatch mlMetaCounter = new CountDownLatch(1);
-
-        store.getManagedLedgerInfo(managedLedgerName, false /* createIfMissing 
*/,
-                new 
MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
-                    @Override
-                    public void 
operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
-                        for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : 
mlInfo.getLedgerInfoList()) {
-                            ledgers.put(ls.getLedgerId(), ls);
-                        }
-
-                        // find no of entries in last ledger
-                        if (!ledgers.isEmpty()) {
-                            final long id = ledgers.lastKey();
-                            AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) 
-> {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("[{}] Opened ledger {}: {}", 
managedLedgerName, id,
-                                            BKException.getMessage(rc));
-                                }
-                                if (rc == BKException.Code.OK) {
-                                    MLDataFormats.ManagedLedgerInfo.LedgerInfo 
info =
-                                        
MLDataFormats.ManagedLedgerInfo.LedgerInfo
-                                            
.newBuilder().setLedgerId(id).setEntries(lh.getLastAddConfirmed() + 1)
-                                            
.setSize(lh.getLength()).setTimestamp(System.currentTimeMillis()).build();
-                                    ledgers.put(id, info);
-                                    mlMetaCounter.countDown();
-                                } else if 
(Errors.isNoSuchLedgerExistsException(rc)) {
-                                    log.warn("[{}] Ledger not found: {}", 
managedLedgerName, ledgers.lastKey());
-                                    ledgers.remove(ledgers.lastKey());
-                                    mlMetaCounter.countDown();
-                                } else {
-                                    log.error("[{}] Failed to open ledger {}: 
{}", managedLedgerName, id,
-                                            BKException.getMessage(rc));
-                                    mlMetaCounter.countDown();
-                                }
-                            };
-
-                            if (log.isDebugEnabled()) {
-                                log.debug("[{}] Opening ledger {}", 
managedLedgerName, id);
-                            }
-
-                            factory.getBookKeeper()
-                                    .thenAccept(bk -> {
-                                        bk.asyncOpenLedgerNoRecovery(id, 
digestType, password, opencb, null);
-                                    }).exceptionally(ex -> {
-                                        log.warn("[{}] Failed to open ledger 
{}: {}", managedLedgerName, id, ex);
-                                        opencb.openComplete(-1, null, null);
-                                        mlMetaCounter.countDown();
-                                        return null;
-                                    });
-                        } else {
-                            log.warn("[{}] Ledger list empty", 
managedLedgerName);
-                            mlMetaCounter.countDown();
-                        }
-                    }
-
-                    @Override
-                    public void 
operationFailed(ManagedLedgerException.MetaStoreException e) {
-                        log.warn("[{}] Unable to obtain managed ledger 
metadata - {}", managedLedgerName, e);
-                        mlMetaCounter.countDown();
-                    }
-                });
-
-        if (accurate) {
-            // block until however long it takes for operation to complete
-            mlMetaCounter.await();
+        if (factory instanceof ManagedLedgerFactoryImpl) {
+            List<Object> ctx = new ArrayList<>();
+            ctx.add(digestType);
+            ctx.add(password);
+            factory.estimateUnloadedTopicBacklog(offlineTopicStats, topicName, 
accurate, ctx);
         } else {
-            mlMetaCounter.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
+            Object ctx = null;
+            factory.estimateUnloadedTopicBacklog(offlineTopicStats, topicName, 
accurate, ctx);

Review Comment:
   This `estimateUnloadedTopicBacklog` API is not intuitive.
   1. Why it needs an `accurate` parameter? (actually it's always false) 
   2. If the factory is not a `ManagedLedgerFactoryImpl`, the `ctx` field is 
always null.
   
   <img width="1051" alt="image" 
src="https://github.com/user-attachments/assets/1e17b59c-e54e-4b84-88db-391492cdcb8a";>
   
   It's not easy for the author of a 3rd party ML to know how to implement this 
method correctly without enough API docs. The API design is bad. It should be 
something like:
   
   ```java
      PersistentOfflineTopicStats estimateUnloadedTopicBacklog(TopicName 
topicName);
   ```
   
   BTW, IMHO, this API should be moved to an internal method of 
`ManagedLedgerFactoryImpl`, but it might need another proposal to discuss about 
the API design.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java:
##########
@@ -233,4 +235,14 @@ void asyncDelete(String name, 
CompletableFuture<ManagedLedgerConfig> mlConfigFut
      * @return properties of this managedLedger.
      */
     CompletableFuture<Map<String, String>> 
getManagedLedgerPropertiesAsync(String name);
+
+    Map<String, ManagedLedger> getManagedLedgers();
+
+    ManagedLedgerFactoryMXBean getCacheStats();
+
+
+    void estimateUnloadedTopicBacklog(PersistentOfflineTopicStats 
offlineTopicStats, TopicName topicName,
+                                              boolean accurate, Object ctx) 
throws Exception;

Review Comment:
   ```suggestion
       void estimateUnloadedTopicBacklog(PersistentOfflineTopicStats 
offlineTopicStats, TopicName topicName,
                                         boolean accurate, Object ctx) throws 
Exception;
   ```
   
   Adjust the indent



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to