MarvinCai commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r668760864



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1,35 +1,20 @@
-/**

Review comment:
       we need to keep the Apache license header

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1229,7 +1214,7 @@ public void getInfoFailed(ManagedLedgerException 
exception, Object ctx) {
     }
 
     protected void internalGetPartitionedStats(AsyncResponse asyncResponse, 
boolean authoritative,
-            boolean perPartition, boolean getPreciseBacklog, boolean 
subscriptionBacklogSize) {
+                                               boolean perPartition, boolean 
getPreciseBacklog, boolean subscriptionBacklogSize) {

Review comment:
       please remove unnecessary whitespace changes.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats 
internalGetBacklog(boolean authoritative)
         return quotaMap;
     }
 
+    private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse 
asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+        // will redirect if the topic not owned by current broker
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        PositionImpl pos = new PositionImpl(messageId.getLedgerId(), 
messageId.getEntryId());
+
+        try {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
topic.getManagedLedger();
+            synchronized (this) {
+                if(messageId.getLedgerId() == -1) {
+                    asyncResponse.resume(managedLedger.getTotalSize());
+                } else {
+                    NavigableMap<Long, 
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = 
managedLedger.getLedgersInfo();
+                    MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = 
ledgers.get(pos.getLedgerId());
+                    if (ledgerInfo == null) {
+                        long sizeBeforePosLedger = 
ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+                                
.mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+                        long currentLedgerId = ledgers.lastKey();
+                        if (currentLedgerId < pos.getLedgerId()) {

Review comment:
       can simply return 0 in this case.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats 
internalGetBacklog(boolean authoritative)
         return quotaMap;
     }
 
+    private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse 
asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+        // will redirect if the topic not owned by current broker
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        PositionImpl pos = new PositionImpl(messageId.getLedgerId(), 
messageId.getEntryId());
+
+        try {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
topic.getManagedLedger();
+            synchronized (this) {
+                if(messageId.getLedgerId() == -1) {
+                    asyncResponse.resume(managedLedger.getTotalSize());
+                } else {
+                    NavigableMap<Long, 
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = 
managedLedger.getLedgersInfo();
+                    MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = 
ledgers.get(pos.getLedgerId());
+                    if (ledgerInfo == null) {
+                        long sizeBeforePosLedger = 
ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+                                
.mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+                        long currentLedgerId = ledgers.lastKey();
+                        if (currentLedgerId < pos.getLedgerId()) {
+                            sizeBeforePosLedger += 
managedLedger.getCurrentLedgerSize();
+                        }
+                        asyncResponse.resume(managedLedger.getTotalSize() - 
sizeBeforePosLedger);
+                    } else {
+                        
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+                    }
+                }
+
+            }
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to get backlog size for non-partitioned 
topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
+        } catch (Exception e) {
+            log.error("[{}] Failed to get backlog size for non-partitioned 
topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
+    }
+
+    private boolean isLedgerExists(MessageIdImpl messageId){
+        try {
+            String internalInfo = 
pulsar().getAdminClient().topics().getInternalInfo(topicName.toString());
+            boolean ledgerExists = false;
+            if(messageId.getLedgerId() == -1) {
+                ledgerExists = true;
+            } else {
+                if (topicName.isPartitioned()) {
+                    ManagedLedgerInfo managedLedgerInfo = 
jsonMapper().readValue(internalInfo, ManagedLedgerInfo.class);
+                    ledgerExists = 
managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo -> 
messageId.getLedgerId() == ledgerInfo.ledgerId);
+                } else {
+                    JsonNode partitionedInternalInfos = 
jsonMapper().readTree(internalInfo).get("partitions");

Review comment:
       Seems we're processing a partitioned topic here; should it be 
`!topicName.isPartitioned()` in the if clause?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats 
internalGetBacklog(boolean authoritative)
         return quotaMap;
     }
 
+    private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse 
asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+        // will redirect if the topic not owned by current broker
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        PositionImpl pos = new PositionImpl(messageId.getLedgerId(), 
messageId.getEntryId());
+
+        try {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
topic.getManagedLedger();
+            synchronized (this) {

Review comment:
       What we process here is just ledger info in the managed ledger, so 
synchronizing on a PersistentTopicsBase instance won't work in this case. 
Also, what data is this synchronized block trying to guard?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats 
internalGetBacklog(boolean authoritative)
         return quotaMap;
     }
 
+    private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse 
asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+        // will redirect if the topic not owned by current broker
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        PositionImpl pos = new PositionImpl(messageId.getLedgerId(), 
messageId.getEntryId());
+
+        try {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
topic.getManagedLedger();
+            synchronized (this) {
+                if(messageId.getLedgerId() == -1) {
+                    asyncResponse.resume(managedLedger.getTotalSize());
+                } else {
+                    NavigableMap<Long, 
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = 
managedLedger.getLedgersInfo();
+                    MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = 
ledgers.get(pos.getLedgerId());
+                    if (ledgerInfo == null) {
+                        long sizeBeforePosLedger = 
ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+                                
.mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+                        long currentLedgerId = ledgers.lastKey();
+                        if (currentLedgerId < pos.getLedgerId()) {
+                            sizeBeforePosLedger += 
managedLedger.getCurrentLedgerSize();
+                        }
+                        asyncResponse.resume(managedLedger.getTotalSize() - 
sizeBeforePosLedger);
+                    } else {
+                        
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+                    }
+                }
+
+            }
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to get backlog size for non-partitioned 
topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
+        } catch (Exception e) {
+            log.error("[{}] Failed to get backlog size for non-partitioned 
topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
+    }
+
+    private boolean isLedgerExists(MessageIdImpl messageId){
+        try {
+            String internalInfo = 
pulsar().getAdminClient().topics().getInternalInfo(topicName.toString());
+            boolean ledgerExists = false;
+            if(messageId.getLedgerId() == -1) {
+                ledgerExists = true;
+            } else {
+                if (topicName.isPartitioned()) {
+                    ManagedLedgerInfo managedLedgerInfo = 
jsonMapper().readValue(internalInfo, ManagedLedgerInfo.class);
+                    ledgerExists = 
managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo -> 
messageId.getLedgerId() == ledgerInfo.ledgerId);
+                } else {
+                    JsonNode partitionedInternalInfos = 
jsonMapper().readTree(internalInfo).get("partitions");
+                    for (Iterator<Map.Entry<String, JsonNode>> it = 
partitionedInternalInfos.fields(); it.hasNext(); ) {
+                        String partitionedInternalInfo = 
it.next().getValue().toString();
+                        ManagedLedgerInfo managedLedgerInfo = 
jsonMapper().readValue(partitionedInternalInfo, ManagedLedgerInfo.class);
+                        ledgerExists = ledgerExists || 
managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo -> 
messageId.getLedgerId() == ledgerInfo.ledgerId);
+                    }
+                }
+            }
+            return ledgerExists;
+        }  catch (Exception e) {
+            log.error("[{}] Failed to get managed info for {}", clientAppId(), 
topicName, e);
+            throw new RestException(e);
+        }
+    }
+
+    protected void internalGetBacklogSizeByMessageId(AsyncResponse 
asyncResponse, MessageIdImpl messageId, boolean authoritative, boolean 
checkLedgerExists) {
+        if (topicName.isGlobal()) {
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to get backlog size for {}", 
clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+        }
+
+        if(checkLedgerExists) {
+            boolean ledgerExists = isLedgerExists(messageId);
+            if (!ledgerExists) {
+                log.warn("[{}] ledger {} not exists on topic {}", 
messageId.getLedgerId(), clientAppId(), topicName);
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
String.format("ledger %s not exists", messageId.getLedgerId())));
+                return;
+            }
+        }
+
+        if (topicName.isPartitioned()) {
+            internalGetBacklogSizeForNonPartitionedTopic(asyncResponse, 
messageId, authoritative);
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, 
false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions > 0) {
+                    final List<CompletableFuture<Long>> futures = 
Lists.newArrayList();
+                    List<Long> backlogSizeAccrossPartitions = new 
ArrayList<>();
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        TopicName topicNamePartition = 
topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics()
+                                    
.getBacklogSizeByMessageIdAsync(topicNamePartition.toString(), messageId, 
false).whenComplete((backlogSize, throwable) -> {

Review comment:
       shouldn't hardcode false for `checkLedgerExists`

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats 
internalGetBacklog(boolean authoritative)
         return quotaMap;
     }
 
+    private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse 
asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+        // will redirect if the topic not owned by current broker
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        PositionImpl pos = new PositionImpl(messageId.getLedgerId(), 
messageId.getEntryId());
+
+        try {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
topic.getManagedLedger();
+            synchronized (this) {
+                if(messageId.getLedgerId() == -1) {
+                    asyncResponse.resume(managedLedger.getTotalSize());
+                } else {
+                    NavigableMap<Long, 
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = 
managedLedger.getLedgersInfo();
+                    MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = 
ledgers.get(pos.getLedgerId());
+                    if (ledgerInfo == null) {
+                        long sizeBeforePosLedger = 
ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+                                
.mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+                        long currentLedgerId = ledgers.lastKey();
+                        if (currentLedgerId < pos.getLedgerId()) {
+                            sizeBeforePosLedger += 
managedLedger.getCurrentLedgerSize();
+                        }
+                        asyncResponse.resume(managedLedger.getTotalSize() - 
sizeBeforePosLedger);
+                    } else {
+                        
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+                    }
+                }
+
+            }
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to get backlog size for non-partitioned 
topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
+        } catch (Exception e) {
+            log.error("[{}] Failed to get backlog size for non-partitioned 
topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
+    }
+
+    private boolean isLedgerExists(MessageIdImpl messageId){
+        try {
+            String internalInfo = 
pulsar().getAdminClient().topics().getInternalInfo(topicName.toString());
+            boolean ledgerExists = false;
+            if(messageId.getLedgerId() == -1) {
+                ledgerExists = true;
+            } else {
+                if (topicName.isPartitioned()) {
+                    ManagedLedgerInfo managedLedgerInfo = 
jsonMapper().readValue(internalInfo, ManagedLedgerInfo.class);
+                    ledgerExists = 
managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo -> 
messageId.getLedgerId() == ledgerInfo.ledgerId);
+                } else {
+                    JsonNode partitionedInternalInfos = 
jsonMapper().readTree(internalInfo).get("partitions");
+                    for (Iterator<Map.Entry<String, JsonNode>> it = 
partitionedInternalInfos.fields(); it.hasNext(); ) {
+                        String partitionedInternalInfo = 
it.next().getValue().toString();
+                        ManagedLedgerInfo managedLedgerInfo = 
jsonMapper().readValue(partitionedInternalInfo, ManagedLedgerInfo.class);
+                        ledgerExists = ledgerExists || 
managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo -> 
messageId.getLedgerId() == ledgerInfo.ledgerId);
+                    }
+                }
+            }
+            return ledgerExists;
+        }  catch (Exception e) {
+            log.error("[{}] Failed to get managed info for {}", clientAppId(), 
topicName, e);
+            throw new RestException(e);
+        }
+    }
+
+    protected void internalGetBacklogSizeByMessageId(AsyncResponse 
asyncResponse, MessageIdImpl messageId, boolean authoritative, boolean 
checkLedgerExists) {
+        if (topicName.isGlobal()) {
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to get backlog size for {}", 
clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+        }
+
+        if(checkLedgerExists) {
+            boolean ledgerExists = isLedgerExists(messageId);
+            if (!ledgerExists) {
+                log.warn("[{}] ledger {} not exists on topic {}", 
messageId.getLedgerId(), clientAppId(), topicName);
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
String.format("ledger %s not exists", messageId.getLedgerId())));
+                return;
+            }
+        }
+
+        if (topicName.isPartitioned()) {

Review comment:
       Same here: why invoke the method for a non-partitioned topic when 
`topicName.isPartitioned()` is `true`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to