gaozhangmin commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r669241047
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats
internalGetBacklog(boolean authoritative)
return quotaMap;
}
+ private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse
asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+ // will redirect if the topic not owned by current broker
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+
+ PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+ PositionImpl pos = new PositionImpl(messageId.getLedgerId(),
messageId.getEntryId());
+
+ try {
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
topic.getManagedLedger();
+ synchronized (this) {
+ if(messageId.getLedgerId() == -1) {
+ asyncResponse.resume(managedLedger.getTotalSize());
+ } else {
+ NavigableMap<Long,
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers =
managedLedger.getLedgersInfo();
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo =
ledgers.get(pos.getLedgerId());
+ if (ledgerInfo == null) {
+ long sizeBeforePosLedger =
ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+
.mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+ long currentLedgerId = ledgers.lastKey();
+ if (currentLedgerId < pos.getLedgerId()) {
+ sizeBeforePosLedger +=
managedLedger.getCurrentLedgerSize();
+ }
+ asyncResponse.resume(managedLedger.getTotalSize() -
sizeBeforePosLedger);
+ } else {
+
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+ }
+ }
+
+ }
+ } catch (WebApplicationException wae) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to get backlog size for non-partitioned
topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, wae);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, wae);
+ } catch (Exception e) {
+ log.error("[{}] Failed to get backlog size for non-partitioned
topic {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
+ }
+
+ private boolean isLedgerExists(MessageIdImpl messageId){
+ try {
+ String internalInfo =
pulsar().getAdminClient().topics().getInternalInfo(topicName.toString());
+ boolean ledgerExists = false;
+ if(messageId.getLedgerId() == -1) {
+ ledgerExists = true;
+ } else {
+ if (topicName.isPartitioned()) {
+ ManagedLedgerInfo managedLedgerInfo =
jsonMapper().readValue(internalInfo, ManagedLedgerInfo.class);
+ ledgerExists =
managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo ->
messageId.getLedgerId() == ledgerInfo.ledgerId);
+ } else {
+ JsonNode partitionedInternalInfos =
jsonMapper().readTree(internalInfo).get("partitions");
Review comment:
No, ```topicName.isPartitioned()``` means the topic name itself refers to a
single partition, i.e. it carries a partition suffix like 'topicname-partition-0'.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]