This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 228cbfe6740887d36a09582d54da2d621e584b92 Author: frankxieke <[email protected]> AuthorDate: Tue Aug 3 08:52:20 2021 +0800 Add offload ledger info for admin topics stats (#11465) ### Motivation Currently, we don't have offload stats when getting the topic stats via `bin/pulsar-admin topics stats`. We should add metrics in topic stats on last offloaded ledger id, last successful offloaded timestamp, and last offload failure timestamp. ### Modifications Add lastOffloadedLedgerId, lastOffloadSuccessTimestamp, lastOffloadFailureTimestamp for ManagedLedgerImpl and TopicStatsImpl. (cherry picked from commit f1f0add759fb051c6bd0b92be3a43081288dabde) --- .../apache/bookkeeper/mledger/ManagedLedger.java | 21 +++++++++++++++++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 26 ++++++++++++++++++++-- .../broker/service/persistent/PersistentTopic.java | 3 +++ .../common/policies/data/stats/TopicStatsImpl.java | 12 ++++++++++ .../policies/data/PersistentTopicStatsTest.java | 3 +++ .../offload/jcloud/impl/MockManagedLedger.java | 15 +++++++++++++ 6 files changed, 78 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index b8293a3..74c67e9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -393,6 +393,27 @@ public interface ManagedLedger { */ long getOffloadedSize(); + /** + * Get last offloaded ledgerId. If nothing has been offloaded yet, it returns 0. + * + * @return last offloaded ledgerId + */ + long getLastOffloadedLedgerId(); + + /** + * Get last successful offloaded timestamp. If no successful offload, it returns 0. + * + * @return last successful offloaded timestamp + */ + long getLastOffloadedSuccessTimestamp(); + + /** + * Get last failed offloaded timestamp. If no failed offload, it returns 0. 
+ * + * @return last failed offloaded timestamp + */ + long getLastOffloadedFailureTimestamp(); + void asyncTerminate(TerminateCallback callback, Object ctx); /** diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 3dcef1a..fdf8a95 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -207,6 +207,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private long lastLedgerCreationFailureTimestamp = 0; private long lastLedgerCreationInitiationTimestamp = 0; + private long lastOffloadLedgerId = 0; + private long lastOffloadSuccessTimestamp = 0; + private long lastOffloadFailureTimestamp = 0; + private static final Random random = new Random(System.currentTimeMillis()); private long maximumRolloverTimeMs; protected final Supplier<Boolean> mlOwnershipChecker; @@ -2873,8 +2877,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { }) .whenComplete((ignore, exception) -> { if (exception != null) { - log.warn("[{}] Exception occurred during offload", name, exception); - + lastOffloadFailureTimestamp = System.currentTimeMillis(); + log.warn("[{}] Exception occurred for ledgerId {} timestamp {} during offload", name, ledgerId, lastOffloadFailureTimestamp, exception); PositionImpl newFirstUnoffloaded = PositionImpl.get(ledgerId, 0); if (newFirstUnoffloaded.compareTo(firstUnoffloaded) > 0) { newFirstUnoffloaded = firstUnoffloaded; @@ -2891,6 +2895,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { newFirstUnoffloaded, errorToReport); } else { + lastOffloadSuccessTimestamp = System.currentTimeMillis(); + log.info("[{}] offload for ledgerId {} timestamp {} succeed", name, ledgerId, lastOffloadSuccessTimestamp); + 
lastOffloadLedgerId = ledgerId; invalidateReadHandle(ledgerId); offloadLoop(promise, ledgersToOffload, firstUnoffloaded, firstError); } @@ -3725,6 +3732,21 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } @Override + public long getLastOffloadedLedgerId() { + return lastOffloadLedgerId; + } + + @Override + public long getLastOffloadedSuccessTimestamp() { + return lastOffloadSuccessTimestamp; + } + + @Override + public long getLastOffloadedFailureTimestamp() { + return lastOffloadFailureTimestamp; + } + + @Override public Map<String, String> getProperties() { return propertiesMap; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2359384..40ed2a1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1806,6 +1806,9 @@ public class PersistentTopic extends AbstractTopic stats.deduplicationStatus = messageDeduplication.getStatus().toString(); stats.topicEpoch = topicEpoch.orElse(null); stats.offloadedStorageSize = ledger.getOffloadedSize(); + stats.lastOffloadLedgerId = ledger.getLastOffloadedLedgerId(); + stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp(); + stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp(); return stats; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java index ec97d6e..7ad8e83 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java @@ -80,6 +80,15 @@ public class 
TopicStatsImpl implements TopicStats { /** Space used to store the offloaded messages for the topic/. */ public long offloadedStorageSize; + /** record last successful offloaded ledgerId. If no offload ledger, the value should be 0 */ + public long lastOffloadLedgerId; + + /** record last successful offloaded timestamp. If no successful offload, the value should be 0 */ + public long lastOffloadSuccessTimeStamp; + + /** record last failed offloaded timestamp. If no failed offload, the value should be 0 */ + public long lastOffloadFailureTimeStamp; + /** List of connected publishers on this topic w/ their stats. */ @Getter(AccessLevel.NONE) public List<PublisherStatsImpl> publishers; @@ -145,6 +154,9 @@ public class TopicStatsImpl implements TopicStats { this.nonContiguousDeletedMessagesRanges = 0; this.nonContiguousDeletedMessagesRangesSerializedSize = 0; this.offloadedStorageSize = 0; + this.lastOffloadLedgerId = 0; + this.lastOffloadFailureTimeStamp = 0; + this.lastOffloadSuccessTimeStamp = 0; } // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java index 6b9e1b4..fa67fb0 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java @@ -53,6 +53,9 @@ public class PersistentTopicStatsTest { assertEquals(topicStats.msgThroughputOut, 1.0); assertEquals(topicStats.averageMsgSize, 1.0); assertEquals(topicStats.offloadedStorageSize, 1); + assertEquals(topicStats.lastOffloadLedgerId, 0); + assertEquals(topicStats.lastOffloadSuccessTimeStamp, 0); + assertEquals(topicStats.lastOffloadFailureTimeStamp, 0); assertEquals(topicStats.storageSize, 1); 
assertEquals(topicStats.publishers.size(), 1); assertEquals(topicStats.subscriptions.size(), 1); diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java index 11b08c3d..767190c 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java @@ -190,6 +190,21 @@ public class MockManagedLedger implements ManagedLedger { } @Override + public long getLastOffloadedLedgerId() { + return 0; + } + + @Override + public long getLastOffloadedSuccessTimestamp() { + return 0; + } + + @Override + public long getLastOffloadedFailureTimestamp() { + return 0; + } + + @Override public void asyncTerminate(AsyncCallbacks.TerminateCallback callback, Object ctx) { }
