BewareMyPower commented on code in PR #23311:
URL: https://github.com/apache/pulsar/pull/23311#discussion_r1762672146
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java:
##########
@@ -102,369 +89,28 @@ private long getNumberOfEntries(Range<Position> range,
}
}
- public PersistentOfflineTopicStats
getEstimatedUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory,
+ public PersistentOfflineTopicStats
getEstimatedUnloadedTopicBacklog(ManagedLedgerFactory factory,
String managedLedgerName) throws Exception {
return estimateUnloadedTopicBacklog(factory,
TopicName.get("persistent://" + managedLedgerName));
}
- public PersistentOfflineTopicStats
estimateUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory,
- TopicName topicName) throws Exception {
+ public PersistentOfflineTopicStats
estimateUnloadedTopicBacklog(ManagedLedgerFactory factory,
+ TopicName
topicName) throws Exception {
String managedLedgerName = topicName.getPersistenceNamingEncoding();
- long numberOfEntries = 0;
- long totalSize = 0;
- final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
ledgers = new ConcurrentSkipListMap<>();
final PersistentOfflineTopicStats offlineTopicStats = new
PersistentOfflineTopicStats(managedLedgerName,
brokerName);
-
- // calculate total managed ledger size and number of entries without
loading the topic
- readLedgerMeta(factory, topicName, ledgers);
- for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : ledgers.values())
{
- numberOfEntries += ls.getEntries();
- totalSize += ls.getSize();
- if (accurate) {
- offlineTopicStats.addLedgerDetails(ls.getEntries(),
ls.getTimestamp(), ls.getSize(), ls.getLedgerId());
- }
- }
- offlineTopicStats.totalMessages = numberOfEntries;
- offlineTopicStats.storageSize = totalSize;
- if (log.isDebugEnabled()) {
- log.debug("[{}] Total number of entries - {} and size - {}",
managedLedgerName, numberOfEntries, totalSize);
- }
-
- // calculate per cursor message backlog
- calculateCursorBacklogs(factory, topicName, ledgers,
offlineTopicStats);
- offlineTopicStats.statGeneratedAt.setTime(System.currentTimeMillis());
-
- return offlineTopicStats;
- }
-
- private void readLedgerMeta(final ManagedLedgerFactoryImpl factory, final
TopicName topicName,
- final NavigableMap<Long,
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) throws Exception {
- String managedLedgerName = topicName.getPersistenceNamingEncoding();
- MetaStore store = factory.getMetaStore();
-
- final CountDownLatch mlMetaCounter = new CountDownLatch(1);
-
- store.getManagedLedgerInfo(managedLedgerName, false /* createIfMissing
*/,
- new
MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
- @Override
- public void
operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
- for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls :
mlInfo.getLedgerInfoList()) {
- ledgers.put(ls.getLedgerId(), ls);
- }
-
- // find no of entries in last ledger
- if (!ledgers.isEmpty()) {
- final long id = ledgers.lastKey();
- AsyncCallback.OpenCallback opencb = (rc, lh, ctx1)
-> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Opened ledger {}: {}",
managedLedgerName, id,
- BKException.getMessage(rc));
- }
- if (rc == BKException.Code.OK) {
- MLDataFormats.ManagedLedgerInfo.LedgerInfo
info =
-
MLDataFormats.ManagedLedgerInfo.LedgerInfo
-
.newBuilder().setLedgerId(id).setEntries(lh.getLastAddConfirmed() + 1)
-
.setSize(lh.getLength()).setTimestamp(System.currentTimeMillis()).build();
- ledgers.put(id, info);
- mlMetaCounter.countDown();
- } else if
(Errors.isNoSuchLedgerExistsException(rc)) {
- log.warn("[{}] Ledger not found: {}",
managedLedgerName, ledgers.lastKey());
- ledgers.remove(ledgers.lastKey());
- mlMetaCounter.countDown();
- } else {
- log.error("[{}] Failed to open ledger {}:
{}", managedLedgerName, id,
- BKException.getMessage(rc));
- mlMetaCounter.countDown();
- }
- };
-
- if (log.isDebugEnabled()) {
- log.debug("[{}] Opening ledger {}",
managedLedgerName, id);
- }
-
- factory.getBookKeeper()
- .thenAccept(bk -> {
- bk.asyncOpenLedgerNoRecovery(id,
digestType, password, opencb, null);
- }).exceptionally(ex -> {
- log.warn("[{}] Failed to open ledger
{}: {}", managedLedgerName, id, ex);
- opencb.openComplete(-1, null, null);
- mlMetaCounter.countDown();
- return null;
- });
- } else {
- log.warn("[{}] Ledger list empty",
managedLedgerName);
- mlMetaCounter.countDown();
- }
- }
-
- @Override
- public void
operationFailed(ManagedLedgerException.MetaStoreException e) {
- log.warn("[{}] Unable to obtain managed ledger
metadata - {}", managedLedgerName, e);
- mlMetaCounter.countDown();
- }
- });
-
- if (accurate) {
- // block until however long it takes for operation to complete
- mlMetaCounter.await();
+ if (factory instanceof ManagedLedgerFactoryImpl) {
+ List<Object> ctx = new ArrayList<>();
+ ctx.add(digestType);
+ ctx.add(password);
+ factory.estimateUnloadedTopicBacklog(offlineTopicStats, topicName,
accurate, ctx);
} else {
- mlMetaCounter.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
+ Object ctx = null;
+ factory.estimateUnloadedTopicBacklog(offlineTopicStats, topicName,
accurate, ctx);
Review Comment:
Since this PR's motivation is mainly to decouple the ML interface from the
Impls, this comment can be improved in a future PR
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]