MarvinCai commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r668760864
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1,35 +1,20 @@
-/**
Review comment:
we need to keep the Apache license header
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1229,7 +1214,7 @@ public void getInfoFailed(ManagedLedgerException
exception, Object ctx) {
}
protected void internalGetPartitionedStats(AsyncResponse asyncResponse,
boolean authoritative,
- boolean perPartition, boolean getPreciseBacklog, boolean
subscriptionBacklogSize) {
+ boolean perPartition, boolean
getPreciseBacklog, boolean subscriptionBacklogSize) {
Review comment:
please remove unnecessary whitespace changes.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats
internalGetBacklog(boolean authoritative)
return quotaMap;
}
+ private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse
asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+ // will redirect if the topic not owned by current broker
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+
+ PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+ PositionImpl pos = new PositionImpl(messageId.getLedgerId(),
messageId.getEntryId());
+
+ try {
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
topic.getManagedLedger();
+ synchronized (this) {
+ if(messageId.getLedgerId() == -1) {
+ asyncResponse.resume(managedLedger.getTotalSize());
+ } else {
+ NavigableMap<Long,
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers =
managedLedger.getLedgersInfo();
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo =
ledgers.get(pos.getLedgerId());
+ if (ledgerInfo == null) {
+ long sizeBeforePosLedger =
ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+
.mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+ long currentLedgerId = ledgers.lastKey();
+ if (currentLedgerId < pos.getLedgerId()) {
Review comment:
can simply return 0 in this case.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats
internalGetBacklog(boolean authoritative)
return quotaMap;
}
+ private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse
asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+ // will redirect if the topic not owned by current broker
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+
+ PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+ PositionImpl pos = new PositionImpl(messageId.getLedgerId(),
messageId.getEntryId());
+
+ try {
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
topic.getManagedLedger();
+ synchronized (this) {
+ if(messageId.getLedgerId() == -1) {
+ asyncResponse.resume(managedLedger.getTotalSize());
+ } else {
+ NavigableMap<Long,
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers =
managedLedger.getLedgersInfo();
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo =
ledgers.get(pos.getLedgerId());
+ if (ledgerInfo == null) {
+ long sizeBeforePosLedger =
ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+
.mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+ long currentLedgerId = ledgers.lastKey();
+ if (currentLedgerId < pos.getLedgerId()) {
+ sizeBeforePosLedger +=
managedLedger.getCurrentLedgerSize();
+ }
+ asyncResponse.resume(managedLedger.getTotalSize() -
sizeBeforePosLedger);
+ } else {
+
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+ }
+ }
+
+ }
+ } catch (WebApplicationException wae) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to get backlog size for non-partitioned
topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, wae);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, wae);
+ } catch (Exception e) {
+ log.error("[{}] Failed to get backlog size for non-partitioned
topic {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
+ }
+
+ private boolean isLedgerExists(MessageIdImpl messageId){
+ try {
+ String internalInfo =
pulsar().getAdminClient().topics().getInternalInfo(topicName.toString());
+ boolean ledgerExists = false;
+ if(messageId.getLedgerId() == -1) {
+ ledgerExists = true;
+ } else {
+ if (topicName.isPartitioned()) {
+ ManagedLedgerInfo managedLedgerInfo =
jsonMapper().readValue(internalInfo, ManagedLedgerInfo.class);
+ ledgerExists =
managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo ->
messageId.getLedgerId() == ledgerInfo.ledgerId);
+ } else {
+ JsonNode partitionedInternalInfos =
jsonMapper().readTree(internalInfo).get("partitions");
Review comment:
seems we're processing partitioned topic here, should be`
!topicName.isPartitioned()` in if clause?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats
internalGetBacklog(boolean authoritative)
return quotaMap;
}
+ private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse
asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+ // will redirect if the topic not owned by current broker
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+
+ PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+ PositionImpl pos = new PositionImpl(messageId.getLedgerId(),
messageId.getEntryId());
+
+ try {
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
topic.getManagedLedger();
+ synchronized (this) {
Review comment:
what we process here is just ledger info in mledger, synchronizing a
PersistentTopicsBase instance won't work in this case. also what data this
synchronized block is trying to guard?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats
internalGetBacklog(boolean authoritative)
return quotaMap;
}
+ private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse
asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+ // will redirect if the topic not owned by current broker
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+
+ PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+ PositionImpl pos = new PositionImpl(messageId.getLedgerId(),
messageId.getEntryId());
+
+ try {
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
topic.getManagedLedger();
+ synchronized (this) {
+ if(messageId.getLedgerId() == -1) {
+ asyncResponse.resume(managedLedger.getTotalSize());
+ } else {
+ NavigableMap<Long,
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers =
managedLedger.getLedgersInfo();
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo =
ledgers.get(pos.getLedgerId());
+ if (ledgerInfo == null) {
+ long sizeBeforePosLedger =
ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+
.mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+ long currentLedgerId = ledgers.lastKey();
+ if (currentLedgerId < pos.getLedgerId()) {
+ sizeBeforePosLedger +=
managedLedger.getCurrentLedgerSize();
+ }
+ asyncResponse.resume(managedLedger.getTotalSize() -
sizeBeforePosLedger);
+ } else {
+
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+ }
+ }
+
+ }
+ } catch (WebApplicationException wae) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to get backlog size for non-partitioned
topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, wae);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, wae);
+ } catch (Exception e) {
+ log.error("[{}] Failed to get backlog size for non-partitioned
topic {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
+ }
+
+ private boolean isLedgerExists(MessageIdImpl messageId){
+ try {
+ String internalInfo =
pulsar().getAdminClient().topics().getInternalInfo(topicName.toString());
+ boolean ledgerExists = false;
+ if(messageId.getLedgerId() == -1) {
+ ledgerExists = true;
+ } else {
+ if (topicName.isPartitioned()) {
+ ManagedLedgerInfo managedLedgerInfo =
jsonMapper().readValue(internalInfo, ManagedLedgerInfo.class);
+ ledgerExists =
managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo ->
messageId.getLedgerId() == ledgerInfo.ledgerId);
+ } else {
+ JsonNode partitionedInternalInfos =
jsonMapper().readTree(internalInfo).get("partitions");
+ for (Iterator<Map.Entry<String, JsonNode>> it =
partitionedInternalInfos.fields(); it.hasNext(); ) {
+ String partitionedInternalInfo =
it.next().getValue().toString();
+ ManagedLedgerInfo managedLedgerInfo =
jsonMapper().readValue(partitionedInternalInfo, ManagedLedgerInfo.class);
+ ledgerExists = ledgerExists ||
managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo ->
messageId.getLedgerId() == ledgerInfo.ledgerId);
+ }
+ }
+ }
+ return ledgerExists;
+ } catch (Exception e) {
+ log.error("[{}] Failed to get managed info for {}", clientAppId(),
topicName, e);
+ throw new RestException(e);
+ }
+ }
+
+ protected void internalGetBacklogSizeByMessageId(AsyncResponse
asyncResponse, MessageIdImpl messageId, boolean authoritative, boolean
checkLedgerExists) {
+ if (topicName.isGlobal()) {
+ try {
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ log.error("[{}] Failed to get backlog size for {}",
clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
+ }
+
+ if(checkLedgerExists) {
+ boolean ledgerExists = isLedgerExists(messageId);
+ if (!ledgerExists) {
+ log.warn("[{}] ledger {} not exists on topic {}",
messageId.getLedgerId(), clientAppId(), topicName);
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
String.format("ledger %s not exists", messageId.getLedgerId())));
+ return;
+ }
+ }
+
+ if (topicName.isPartitioned()) {
+ internalGetBacklogSizeForNonPartitionedTopic(asyncResponse,
messageId, authoritative);
+ } else {
+ getPartitionedTopicMetadataAsync(topicName, authoritative,
false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Long>> futures =
Lists.newArrayList();
+ List<Long> backlogSizeAccrossPartitions = new
ArrayList<>();
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+ futures.add(pulsar().getAdminClient().topics()
+
.getBacklogSizeByMessageIdAsync(topicNamePartition.toString(), messageId,
false).whenComplete((backlogSize, throwable) -> {
Review comment:
shouldn't hardcode false for `checkLedgerExists`
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats
internalGetBacklog(boolean authoritative)
return quotaMap;
}
+ private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse
asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+ // will redirect if the topic not owned by current broker
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+
+ PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+ PositionImpl pos = new PositionImpl(messageId.getLedgerId(),
messageId.getEntryId());
+
+ try {
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
topic.getManagedLedger();
+ synchronized (this) {
+ if(messageId.getLedgerId() == -1) {
+ asyncResponse.resume(managedLedger.getTotalSize());
+ } else {
+ NavigableMap<Long,
MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers =
managedLedger.getLedgersInfo();
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo =
ledgers.get(pos.getLedgerId());
+ if (ledgerInfo == null) {
+ long sizeBeforePosLedger =
ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+
.mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+ long currentLedgerId = ledgers.lastKey();
+ if (currentLedgerId < pos.getLedgerId()) {
+ sizeBeforePosLedger +=
managedLedger.getCurrentLedgerSize();
+ }
+ asyncResponse.resume(managedLedger.getTotalSize() -
sizeBeforePosLedger);
+ } else {
+
asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+ }
+ }
+
+ }
+ } catch (WebApplicationException wae) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to get backlog size for non-partitioned
topic {}, redirecting to other brokers.",
+ clientAppId(), topicName, wae);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, wae);
+ } catch (Exception e) {
+ log.error("[{}] Failed to get backlog size for non-partitioned
topic {}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ }
+ }
+
+ private boolean isLedgerExists(MessageIdImpl messageId){
+ try {
+ String internalInfo =
pulsar().getAdminClient().topics().getInternalInfo(topicName.toString());
+ boolean ledgerExists = false;
+ if(messageId.getLedgerId() == -1) {
+ ledgerExists = true;
+ } else {
+ if (topicName.isPartitioned()) {
+ ManagedLedgerInfo managedLedgerInfo =
jsonMapper().readValue(internalInfo, ManagedLedgerInfo.class);
+ ledgerExists =
managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo ->
messageId.getLedgerId() == ledgerInfo.ledgerId);
+ } else {
+ JsonNode partitionedInternalInfos =
jsonMapper().readTree(internalInfo).get("partitions");
+ for (Iterator<Map.Entry<String, JsonNode>> it =
partitionedInternalInfos.fields(); it.hasNext(); ) {
+ String partitionedInternalInfo =
it.next().getValue().toString();
+ ManagedLedgerInfo managedLedgerInfo =
jsonMapper().readValue(partitionedInternalInfo, ManagedLedgerInfo.class);
+ ledgerExists = ledgerExists ||
managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo ->
messageId.getLedgerId() == ledgerInfo.ledgerId);
+ }
+ }
+ }
+ return ledgerExists;
+ } catch (Exception e) {
+ log.error("[{}] Failed to get managed info for {}", clientAppId(),
topicName, e);
+ throw new RestException(e);
+ }
+ }
+
+ protected void internalGetBacklogSizeByMessageId(AsyncResponse
asyncResponse, MessageIdImpl messageId, boolean authoritative, boolean
checkLedgerExists) {
+ if (topicName.isGlobal()) {
+ try {
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ log.error("[{}] Failed to get backlog size for {}",
clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
+ }
+
+ if(checkLedgerExists) {
+ boolean ledgerExists = isLedgerExists(messageId);
+ if (!ledgerExists) {
+ log.warn("[{}] ledger {} not exists on topic {}",
messageId.getLedgerId(), clientAppId(), topicName);
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
String.format("ledger %s not exists", messageId.getLedgerId())));
+ return;
+ }
+ }
+
+ if (topicName.isPartitioned()) {
Review comment:
same here, why invoking method for non-partitioned topic when
`topicName.isPartitioned() `is `true`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]