BewareMyPower commented on code in PR #23311: URL: https://github.com/apache/pulsar/pull/23311#discussion_r1762519616
########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/PositionBound.java: ########## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +public enum PositionBound { + // define boundaries for position based seeks and searches + startIncluded, startExcluded Review Comment: ```suggestion startIncluded, startExcluded ``` Adjust indent ########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java: ########## @@ -102,369 +89,28 @@ private long getNumberOfEntries(Range<Position> range, } } - public PersistentOfflineTopicStats getEstimatedUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory, + public PersistentOfflineTopicStats getEstimatedUnloadedTopicBacklog(ManagedLedgerFactory factory, String managedLedgerName) throws Exception { return estimateUnloadedTopicBacklog(factory, TopicName.get("persistent://" + managedLedgerName)); } - public PersistentOfflineTopicStats estimateUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory, - TopicName topicName) throws Exception { + public PersistentOfflineTopicStats estimateUnloadedTopicBacklog(ManagedLedgerFactory factory, + TopicName topicName) throws Exception { String managedLedgerName = topicName.getPersistenceNamingEncoding(); - long numberOfEntries = 0; - long totalSize = 0; - final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = new ConcurrentSkipListMap<>(); final PersistentOfflineTopicStats offlineTopicStats = new PersistentOfflineTopicStats(managedLedgerName, brokerName); - - // calculate total managed ledger size and number of entries without loading the topic - readLedgerMeta(factory, topicName, ledgers); - for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : ledgers.values()) { - numberOfEntries += ls.getEntries(); - totalSize += ls.getSize(); - if (accurate) { - offlineTopicStats.addLedgerDetails(ls.getEntries(), ls.getTimestamp(), ls.getSize(), ls.getLedgerId()); - } - } - offlineTopicStats.totalMessages = numberOfEntries; - offlineTopicStats.storageSize = totalSize; - if (log.isDebugEnabled()) { - log.debug("[{}] Total number of entries - {} and size - {}", managedLedgerName, numberOfEntries, totalSize); - } - - // calculate per cursor message backlog - calculateCursorBacklogs(factory, topicName, ledgers, offlineTopicStats); - offlineTopicStats.statGeneratedAt.setTime(System.currentTimeMillis()); - - return offlineTopicStats; - } - - private void readLedgerMeta(final ManagedLedgerFactoryImpl factory, final TopicName topicName, - final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) throws Exception { - String managedLedgerName = topicName.getPersistenceNamingEncoding(); - MetaStore store = factory.getMetaStore(); - - final CountDownLatch mlMetaCounter = new CountDownLatch(1); - - store.getManagedLedgerInfo(managedLedgerName, false /* createIfMissing */, - new MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() { - @Override - public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) { - for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : mlInfo.getLedgerInfoList()) { - ledgers.put(ls.getLedgerId(), ls); - } - - // find no of entries in last ledger - if (!ledgers.isEmpty()) { - final long id = ledgers.lastKey(); - AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> { - if (log.isDebugEnabled()) { - log.debug("[{}] Opened ledger {}: {}", managedLedgerName, id, - BKException.getMessage(rc)); - } - if (rc == BKException.Code.OK) { - MLDataFormats.ManagedLedgerInfo.LedgerInfo info = - MLDataFormats.ManagedLedgerInfo.LedgerInfo - .newBuilder().setLedgerId(id).setEntries(lh.getLastAddConfirmed() + 1) - .setSize(lh.getLength()).setTimestamp(System.currentTimeMillis()).build(); - ledgers.put(id, info); - mlMetaCounter.countDown(); - } else if (Errors.isNoSuchLedgerExistsException(rc)) { - log.warn("[{}] Ledger not found: {}", managedLedgerName, ledgers.lastKey()); - ledgers.remove(ledgers.lastKey()); - mlMetaCounter.countDown(); - } else { - log.error("[{}] Failed to open ledger {}: {}", managedLedgerName, id, - BKException.getMessage(rc)); - mlMetaCounter.countDown(); - } - }; - - if (log.isDebugEnabled()) { - log.debug("[{}] Opening ledger {}", managedLedgerName, id); - } - - factory.getBookKeeper() - .thenAccept(bk -> { - bk.asyncOpenLedgerNoRecovery(id, digestType, password, opencb, null); - }).exceptionally(ex -> { - log.warn("[{}] Failed to open ledger {}: {}", managedLedgerName, id, ex); - opencb.openComplete(-1, null, null); - mlMetaCounter.countDown(); - return null; - }); - } else { - log.warn("[{}] Ledger list empty", managedLedgerName); - mlMetaCounter.countDown(); - } - } - - @Override - public void operationFailed(ManagedLedgerException.MetaStoreException e) { - log.warn("[{}] Unable to obtain managed ledger metadata - {}", managedLedgerName, e); - mlMetaCounter.countDown(); - } - }); - - if (accurate) { - // block until however long it takes for operation to complete - mlMetaCounter.await(); + if (factory instanceof ManagedLedgerFactoryImpl) { + List<Object> ctx = new ArrayList<>(); + ctx.add(digestType); + ctx.add(password); + factory.estimateUnloadedTopicBacklog(offlineTopicStats, topicName, accurate, ctx); } else { - mlMetaCounter.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS); - + Object ctx = null; + factory.estimateUnloadedTopicBacklog(offlineTopicStats, topicName, accurate, ctx); Review Comment: This `estimateUnloadedTopicBacklog` API is not intuitive. 1. Why it needs an `accurate` parameter? (actually it's always false) 2. If the factory is not a `ManagedLedgerFactoryImpl`, the `ctx` field is always null. <img width="1051" alt="image" src="https://github.com/user-attachments/assets/1e17b59c-e54e-4b84-88db-391492cdcb8a"> It's not easy for the author of a 3rd party ML to know how to implement this method correctly without enough API docs. The API design is bad. It should be something like: ```java PersistentOfflineTopicStats estimateUnloadedTopicBacklog(TopicName topicName); ``` BTW, IMHO, this API should be moved to an internal method of `ManagedLedgerFactoryImpl`, but it might need another proposal to discuss about the API design. ########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java: ########## @@ -233,4 +235,14 @@ void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFut * @return properties of this managedLedger. */ CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(String name); + + Map<String, ManagedLedger> getManagedLedgers(); + + ManagedLedgerFactoryMXBean getCacheStats(); + + + void estimateUnloadedTopicBacklog(PersistentOfflineTopicStats offlineTopicStats, TopicName topicName, + boolean accurate, Object ctx) throws Exception; Review Comment: ```suggestion void estimateUnloadedTopicBacklog(PersistentOfflineTopicStats offlineTopicStats, TopicName topicName, boolean accurate, Object ctx) throws Exception; ``` Adjust the indent -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
