gaozhangmin commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r669244681



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats 
internalGetBacklog(boolean authoritative)
         return quotaMap;
     }
 
+    private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse 
asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+        // will redirect if the topic not owned by current broker
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        PositionImpl pos = new PositionImpl(messageId.getLedgerId(), 
messageId.getEntryId());
+
+        try {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
topic.getManagedLedger();
+            synchronized (this) {
+                if(messageId.getLedgerId() == -1) {
+                    asyncResponse.resume(managedLedger.getTotalSize());
+                } else {
+                    NavigableMap<Long, 
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = 
managedLedger.getLedgersInfo();
+                    MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = 
ledgers.get(pos.getLedgerId());
+                    if (ledgerInfo == null) {
+                        long sizeBeforePosLedger = 
ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+                                
.mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+                        long currentLedgerId = ledgers.lastKey();
+                        if (currentLedgerId < pos.getLedgerId()) {
+                            sizeBeforePosLedger += 
managedLedger.getCurrentLedgerSize();
+                        }
+                        asyncResponse.resume(managedLedger.getTotalSize() - 
sizeBeforePosLedger);
+                    } else {
+                        
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+                    }
+                }
+
+            }
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to get backlog size for non-partitioned 
topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
+        } catch (Exception e) {
+            log.error("[{}] Failed to get backlog size for non-partitioned 
topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
+    }
+
+    private boolean isLedgerExists(MessageIdImpl messageId){
+        try {
+            String internalInfo = 
pulsar().getAdminClient().topics().getInternalInfo(topicName.toString());
+            boolean ledgerExists = false;
+            if(messageId.getLedgerId() == -1) {
+                ledgerExists = true;
+            } else {
+                if (topicName.isPartitioned()) {
+                    ManagedLedgerInfo managedLedgerInfo = 
jsonMapper().readValue(internalInfo, ManagedLedgerInfo.class);
+                    ledgerExists = 
managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo -> 
messageId.getLedgerId() == ledgerInfo.ledgerId);
+                } else {
+                    JsonNode partitionedInternalInfos = 
jsonMapper().readTree(internalInfo).get("partitions");
+                    for (Iterator<Map.Entry<String, JsonNode>> it = 
partitionedInternalInfos.fields(); it.hasNext(); ) {
+                        String partitionedInternalInfo = 
it.next().getValue().toString();
+                        ManagedLedgerInfo managedLedgerInfo = 
jsonMapper().readValue(partitionedInternalInfo, ManagedLedgerInfo.class);
+                        ledgerExists = ledgerExists || 
managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo -> 
messageId.getLedgerId() == ledgerInfo.ledgerId);
+                    }
+                }
+            }
+            return ledgerExists;
+        }  catch (Exception e) {
+            log.error("[{}] Failed to get managed info for {}", clientAppId(), 
topicName, e);
+            throw new RestException(e);
+        }
+    }
+
+    protected void internalGetBacklogSizeByMessageId(AsyncResponse 
asyncResponse, MessageIdImpl messageId, boolean authoritative, boolean 
checkLedgerExists) {
+        if (topicName.isGlobal()) {
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to get backlog size for {}", 
clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+        }
+
+        if(checkLedgerExists) {
+            boolean ledgerExists = isLedgerExists(messageId);
+            if (!ledgerExists) {
+                log.warn("[{}] ledger {} not exists on topic {}", 
messageId.getLedgerId(), clientAppId(), topicName);
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
String.format("ledger %s not exists", messageId.getLedgerId())));
+                return;
+            }
+        }
+
+        if (topicName.isPartitioned()) {

Review comment:
       maybe the method name is confused, what mean here is for single partition




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to