This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 228cbfe6740887d36a09582d54da2d621e584b92
Author: frankxieke <[email protected]>
AuthorDate: Tue Aug 3 08:52:20 2021 +0800

    Add offload ledger info for admin topics stats (#11465)
    
    ### Motivation
    Currently, `bin/pulsar-admin topics stats` does not report any offload 
stats for a topic.
    We should add the last offloaded ledger id, the last successful offload 
timestamp, and the last offload failure timestamp to the topic stats.
    
    ### Modifications
    Add lastOffloadedLedgerId, lastOffloadSuccessTimestamp, 
lastOffloadFailureTimestamp for ManagedLedgerImpl and TopicStatsImpl.
    
    (cherry picked from commit f1f0add759fb051c6bd0b92be3a43081288dabde)
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   | 21 +++++++++++++++++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 26 ++++++++++++++++++++--
 .../broker/service/persistent/PersistentTopic.java |  3 +++
 .../common/policies/data/stats/TopicStatsImpl.java | 12 ++++++++++
 .../policies/data/PersistentTopicStatsTest.java    |  3 +++
 .../offload/jcloud/impl/MockManagedLedger.java     | 15 +++++++++++++
 6 files changed, 78 insertions(+), 2 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index b8293a3..74c67e9 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -393,6 +393,27 @@ public interface ManagedLedger {
      */
     long getOffloadedSize();
 
+    /**
+     * Get last offloaded ledgerId. If no offloaded yet, it returns 0.
+     *
+     * @return last offloaded ledgerId
+     */
+    long getLastOffloadedLedgerId();
+
+    /**
+     * Get last successful offloaded timestamp. If no successful offload, it 
returns 0.
+     *
+     * @return last successful offloaded timestamp
+     */
+    long getLastOffloadedSuccessTimestamp();
+
+    /**
+     * Get last failed offloaded timestamp. If no failed offload, it returns 0.
+     *
+     * @return last failed offloaded timestamp
+     */
+    long getLastOffloadedFailureTimestamp();
+
     void asyncTerminate(TerminateCallback callback, Object ctx);
 
     /**
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3dcef1a..fdf8a95 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -207,6 +207,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     private long lastLedgerCreationFailureTimestamp = 0;
     private long lastLedgerCreationInitiationTimestamp = 0;
 
+    private long lastOffloadLedgerId = 0;
+    private long lastOffloadSuccessTimestamp = 0;
+    private long lastOffloadFailureTimestamp = 0;
+
     private static final Random random = new 
Random(System.currentTimeMillis());
     private long maximumRolloverTimeMs;
     protected final Supplier<Boolean> mlOwnershipChecker;
@@ -2873,8 +2877,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                     })
                 .whenComplete((ignore, exception) -> {
                         if (exception != null) {
-                            log.warn("[{}] Exception occurred during offload", 
name, exception);
-
+                            lastOffloadFailureTimestamp = 
System.currentTimeMillis();
+                            log.warn("[{}] Exception occurred for ledgerId {} 
timestamp {} during offload", name, ledgerId, lastOffloadFailureTimestamp, 
exception);
                             PositionImpl newFirstUnoffloaded = 
PositionImpl.get(ledgerId, 0);
                             if 
(newFirstUnoffloaded.compareTo(firstUnoffloaded) > 0) {
                                 newFirstUnoffloaded = firstUnoffloaded;
@@ -2891,6 +2895,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                                         newFirstUnoffloaded,
                                         errorToReport);
                         } else {
+                            lastOffloadSuccessTimestamp = 
System.currentTimeMillis();
+                            log.info("[{}] offload for ledgerId {} timestamp 
{} succeed", name, ledgerId, lastOffloadSuccessTimestamp);
+                            lastOffloadLedgerId = ledgerId;
                             invalidateReadHandle(ledgerId);
                             offloadLoop(promise, ledgersToOffload, 
firstUnoffloaded, firstError);
                         }
@@ -3725,6 +3732,21 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     }
 
     @Override
+    public long getLastOffloadedLedgerId() {
+        return lastOffloadLedgerId;
+    }
+
+    @Override
+    public long getLastOffloadedSuccessTimestamp() {
+        return lastOffloadSuccessTimestamp;
+    }
+
+    @Override
+    public long getLastOffloadedFailureTimestamp() {
+        return lastOffloadFailureTimestamp;
+    }
+
+    @Override
     public Map<String, String> getProperties() {
         return propertiesMap;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2359384..40ed2a1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1806,6 +1806,9 @@ public class PersistentTopic extends AbstractTopic
         stats.deduplicationStatus = 
messageDeduplication.getStatus().toString();
         stats.topicEpoch = topicEpoch.orElse(null);
         stats.offloadedStorageSize = ledger.getOffloadedSize();
+        stats.lastOffloadLedgerId = ledger.getLastOffloadedLedgerId();
+        stats.lastOffloadSuccessTimeStamp = 
ledger.getLastOffloadedSuccessTimestamp();
+        stats.lastOffloadFailureTimeStamp = 
ledger.getLastOffloadedFailureTimestamp();
         return stats;
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index ec97d6e..7ad8e83 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -80,6 +80,15 @@ public class TopicStatsImpl implements TopicStats {
     /** Space used to store the offloaded messages for the topic/. */
     public long offloadedStorageSize;
 
+    /** record last successful offloaded ledgerId. If no offload ledger, the 
value should be 0 */
+    public long lastOffloadLedgerId;
+
+    /** record last successful offloaded timestamp. If no successful offload, 
the value should be 0 */
+    public long lastOffloadSuccessTimeStamp;
+
+    /** record last failed offloaded timestamp. If no failed offload, the 
value should be 0 */
+    public long lastOffloadFailureTimeStamp;
+
     /** List of connected publishers on this topic w/ their stats. */
     @Getter(AccessLevel.NONE)
     public List<PublisherStatsImpl> publishers;
@@ -145,6 +154,9 @@ public class TopicStatsImpl implements TopicStats {
         this.nonContiguousDeletedMessagesRanges = 0;
         this.nonContiguousDeletedMessagesRangesSerializedSize = 0;
         this.offloadedStorageSize = 0;
+        this.lastOffloadLedgerId = 0;
+        this.lastOffloadFailureTimeStamp = 0;
+        this.lastOffloadSuccessTimeStamp = 0;
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of 
these stats and add it to the current
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
index 6b9e1b4..fa67fb0 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
@@ -53,6 +53,9 @@ public class PersistentTopicStatsTest {
         assertEquals(topicStats.msgThroughputOut, 1.0);
         assertEquals(topicStats.averageMsgSize, 1.0);
         assertEquals(topicStats.offloadedStorageSize, 1);
+        assertEquals(topicStats.lastOffloadLedgerId, 0);
+        assertEquals(topicStats.lastOffloadSuccessTimeStamp, 0);
+        assertEquals(topicStats.lastOffloadFailureTimeStamp, 0);
         assertEquals(topicStats.storageSize, 1);
         assertEquals(topicStats.publishers.size(), 1);
         assertEquals(topicStats.subscriptions.size(), 1);
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
index 11b08c3d..767190c 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -190,6 +190,21 @@ public class MockManagedLedger implements ManagedLedger {
     }
 
     @Override
+    public long getLastOffloadedLedgerId() {
+        return 0;
+    }
+
+    @Override
+    public long getLastOffloadedSuccessTimestamp() {
+        return 0;
+    }
+
+    @Override
+    public long getLastOffloadedFailureTimestamp() {
+        return 0;
+    }
+
+    @Override
     public void asyncTerminate(AsyncCallbacks.TerminateCallback callback, 
Object ctx) {
 
     }

Reply via email to