This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e59cd05881b [fix] [broker] Remove blocking calls from Subscription.getStats (#23088)
e59cd05881b is described below
commit e59cd05881bff11e4b127ed3496a02a0ce697fb7
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 29 18:47:33 2024 +0800
[fix] [broker] Remove blocking calls from Subscription.getStats (#23088)
---
.../service/persistent/PersistentSubscription.java | 51 +++++++++----
.../broker/service/persistent/PersistentTopic.java | 86 +++++++++++-----------
2 files changed, 80 insertions(+), 57 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 77aa5f82c39..2dd890cfd29 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -32,6 +32,8 @@ import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -1200,7 +1202,26 @@ public class PersistentSubscription extends AbstractSubscription {
return cursor.getEstimatedSizeSinceMarkDeletePosition();
}
+ /**
+ * @deprecated please call {@link #getStatsAsync(GetStatsOptions)}.
+ */
+ @Deprecated
public SubscriptionStatsImpl getStats(GetStatsOptions getStatsOptions) {
+ // So far, there is no case hits this check.
+ if (getStatsOptions.isGetEarliestTimeInBacklog()) {
+ throw new IllegalArgumentException("Calling the sync method
subscription.getStats with"
+ + " getEarliestTimeInBacklog, it may encountered a
deadlock error.");
+ }
+ // The method "getStatsAsync" will be a sync method if the param
"isGetEarliestTimeInBacklog" is false.
+ try {
+ return getStatsAsync(getStatsOptions).get(5, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException
e) {
+ // This error will never occur.
+ throw new RuntimeException(e);
+ }
+ }
+
+ public CompletableFuture<SubscriptionStatsImpl>
getStatsAsync(GetStatsOptions getStatsOptions) {
SubscriptionStatsImpl subStats = new SubscriptionStatsImpl();
subStats.lastExpireTimestamp = lastExpireTimestamp;
subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
@@ -1273,21 +1294,6 @@ public class PersistentSubscription extends AbstractSubscription {
} else {
subStats.backlogSize = -1;
}
- if (getStatsOptions.isGetEarliestTimeInBacklog()) {
- if (subStats.msgBacklog > 0) {
- ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl)
cursor.getManagedLedger());
- Position markDeletedPosition = cursor.getMarkDeletedPosition();
- long result = 0;
- try {
- result =
managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get();
- } catch (InterruptedException | ExecutionException e) {
- result = -1;
- }
- subStats.earliestMsgPublishTimeInBacklog = result;
- } else {
- subStats.earliestMsgPublishTimeInBacklog = -1;
- }
- }
subStats.msgBacklogNoDelayed = subStats.msgBacklog -
subStats.msgDelayed;
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
subStats.totalMsgExpired = expiryMonitor.getTotalMessageExpired();
@@ -1329,7 +1335,20 @@ public class PersistentSubscription extends AbstractSubscription {
subStats.nonContiguousDeletedMessagesRanges =
cursor.getTotalNonContiguousDeletedMessagesRange();
subStats.nonContiguousDeletedMessagesRangesSerializedSize =
cursor.getNonContiguousDeletedMessagesRangeSerializedSize();
- return subStats;
+ if (!getStatsOptions.isGetEarliestTimeInBacklog()) {
+ return CompletableFuture.completedFuture(subStats);
+ }
+ if (subStats.msgBacklog > 0) {
+ ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl)
cursor.getManagedLedger());
+ Position markDeletedPosition = cursor.getMarkDeletedPosition();
+ return
managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).thenApply(v
-> {
+ subStats.earliestMsgPublishTimeInBacklog = v;
+ return subStats;
+ });
+ } else {
+ subStats.earliestMsgPublishTimeInBacklog = -1;
+ return CompletableFuture.completedFuture(subStats);
+ }
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3587dab7755..42487d7239c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2584,7 +2584,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
@Override
public CompletableFuture<? extends TopicStatsImpl>
asyncGetStats(GetStatsOptions getStatsOptions) {
- CompletableFuture<TopicStatsImpl> statsFuture = new
CompletableFuture<>();
TopicStatsImpl stats = new TopicStatsImpl();
ObjectObjectHashMap<String, PublisherStatsImpl> remotePublishersStats
= new ObjectObjectHashMap<>();
@@ -2617,32 +2616,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
stats.abortedTxnCount = txnBuffer.getAbortedTxnCount();
stats.committedTxnCount = txnBuffer.getCommittedTxnCount();
- subscriptions.forEach((name, subscription) -> {
- SubscriptionStatsImpl subStats =
subscription.getStats(getStatsOptions);
-
- stats.msgRateOut += subStats.msgRateOut;
- stats.msgThroughputOut += subStats.msgThroughputOut;
- stats.bytesOutCounter += subStats.bytesOutCounter;
- stats.msgOutCounter += subStats.msgOutCounter;
- stats.subscriptions.put(name, subStats);
- stats.nonContiguousDeletedMessagesRanges +=
subStats.nonContiguousDeletedMessagesRanges;
- stats.nonContiguousDeletedMessagesRangesSerializedSize +=
- subStats.nonContiguousDeletedMessagesRangesSerializedSize;
- stats.delayedMessageIndexSizeInBytes +=
subStats.delayedMessageIndexSizeInBytes;
-
- subStats.bucketDelayedIndexStats.forEach((k, v) -> {
- TopicMetricBean topicMetricBean =
- stats.bucketDelayedIndexStats.computeIfAbsent(k, __ ->
new TopicMetricBean());
- topicMetricBean.name = v.name;
- topicMetricBean.labelsAndValues = v.labelsAndValues;
- topicMetricBean.value += v.value;
- });
-
- if (isSystemCursor(name) ||
name.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
- stats.bytesOutInternalCounter += subStats.bytesOutCounter;
- }
- });
-
replicators.forEach((cluster, replicator) -> {
ReplicatorStatsImpl replicatorStats = replicator.computeStats();
@@ -2692,21 +2665,52 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return compactionRecord;
});
- if (getStatsOptions.isGetEarliestTimeInBacklog() && stats.backlogSize
!= 0) {
-
ledger.getEarliestMessagePublishTimeInBacklog().whenComplete((earliestTime, e)
-> {
- if (e != null) {
- log.error("[{}] Failed to get earliest message publish
time in backlog", topic, e);
- statsFuture.completeExceptionally(e);
- } else {
- stats.earliestMsgPublishTimeInBacklogs = earliestTime;
- statsFuture.complete(stats);
- }
- });
- } else {
- statsFuture.complete(stats);
- }
+ Map<String, CompletableFuture<SubscriptionStatsImpl>>
subscriptionFutures = new HashMap<>();
+ subscriptions.forEach((name, subscription) -> {
+ subscriptionFutures.put(name,
subscription.getStatsAsync(getStatsOptions));
+ });
+ return
FutureUtil.waitForAll(subscriptionFutures.values()).thenCompose(ignore -> {
+ for (Map.Entry<String, CompletableFuture<SubscriptionStatsImpl>> e
: subscriptionFutures.entrySet()) {
+ String name = e.getKey();
+ SubscriptionStatsImpl subStats = e.getValue().join();
+ stats.msgRateOut += subStats.msgRateOut;
+ stats.msgThroughputOut += subStats.msgThroughputOut;
+ stats.bytesOutCounter += subStats.bytesOutCounter;
+ stats.msgOutCounter += subStats.msgOutCounter;
+ stats.subscriptions.put(name, subStats);
+ stats.nonContiguousDeletedMessagesRanges +=
subStats.nonContiguousDeletedMessagesRanges;
+ stats.nonContiguousDeletedMessagesRangesSerializedSize +=
+
subStats.nonContiguousDeletedMessagesRangesSerializedSize;
+ stats.delayedMessageIndexSizeInBytes +=
subStats.delayedMessageIndexSizeInBytes;
+
+ subStats.bucketDelayedIndexStats.forEach((k, v) -> {
+ TopicMetricBean topicMetricBean =
+ stats.bucketDelayedIndexStats.computeIfAbsent(k,
ignore2 -> new TopicMetricBean());
+ topicMetricBean.name = v.name;
+ topicMetricBean.labelsAndValues = v.labelsAndValues;
+ topicMetricBean.value += v.value;
+ });
- return statsFuture;
+ if (isSystemCursor(name) ||
name.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
+ stats.bytesOutInternalCounter += subStats.bytesOutCounter;
+ }
+ }
+ if (getStatsOptions.isGetEarliestTimeInBacklog() &&
stats.backlogSize != 0) {
+ CompletableFuture finalRes =
ledger.getEarliestMessagePublishTimeInBacklog()
+ .thenApply((earliestTime) -> {
+ stats.earliestMsgPublishTimeInBacklogs = earliestTime;
+ return stats;
+ });
+ // print error log.
+ finalRes.exceptionally(ex -> {
+ log.error("[{}] Failed to get earliest message publish
time in backlog", topic, ex);
+ return null;
+ });
+ return finalRes;
+ } else {
+ return CompletableFuture.completedFuture(stats);
+ }
+ });
}
private Optional<CompactorMXBean> getCompactorMXBean() {