This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new c27b25c242c [improve][broker][pip-431] PIP-431: Add Creation and Last 
Publish Timestamps to Topic Stats (#24471)
c27b25c242c is described below

commit c27b25c242c027d03813754b9c256a601b744013
Author: Penghui Li <[email protected]>
AuthorDate: Mon Jul 7 19:28:56 2025 -0700

    [improve][broker][pip-431] PIP-431: Add Creation and Last Publish 
Timestamps to Topic Stats (#24471)
    
    (cherry picked from commit de9c7286247c16553fe69e7c99bc7b3cf2830336)
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   | 18 +++++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +++
 .../broker/service/persistent/PersistentTopic.java | 89 +++++++++++++++++++++-
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 77 +++++++++++++++++++
 .../pulsar/common/policies/data/TopicStats.java    | 19 +++++
 .../common/policies/data/stats/TopicStatsImpl.java | 33 ++++++++
 6 files changed, 243 insertions(+), 3 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index acaa21fcda0..16b31b979cf 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -752,4 +752,22 @@ public interface ManagedLedger {
     }
 
     Position getFirstPosition();
+
+    /**
+     * Get the timestamp in milliseconds of the last successful add entry 
operation.
+     *
+     * @return the last add entry time in milliseconds
+     */
+    default long getLastAddEntryTime() {
+        return 0;
+    }
+
+    /**
+     * Get the creation timestamp of the managed ledger metadata, or 0 if not 
available.
+     *
+     * @return the creation timestamp in milliseconds, or 0 if not available
+     */
+    default long getMetadataCreationTimestamp() {
+        return 0;
+    }
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index cff90b38c4d..6a7844713d5 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -4944,4 +4944,14 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         return ManagedLedgerImplUtils
                 .asyncGetLastValidPosition(this, predicate, startPosition);
     }
+
+    @Override
+    public long getLastAddEntryTime() {
+        return lastAddEntryTimeMs;
+    }
+
+    @Override
+    public long getMetadataCreationTimestamp() {
+        return ledgersStat != null ? ledgersStat.getCreationTimestamp() : 0;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7a8c047b244..b79b2973f2e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2880,6 +2880,25 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             ? null
             : oldestPositionInfo.getCursorName();
 
+        // Set the last publish timestamp using a hybrid approach:
+        // 1. First try ledger.getLastAddEntryTime() if available
+        // 2. If needed, read the last message to get actual publish time
+        long ledgerLastAddTime = ledger.getLastAddEntryTime();
+        CompletableFuture<Long> lastPublishTimeFuture;
+
+        if (ledgerLastAddTime > 0) {
+            // Use ledger's last add time as a good approximation
+            stats.lastPublishTimeStamp = ledgerLastAddTime;
+            lastPublishTimeFuture = 
CompletableFuture.completedFuture(ledgerLastAddTime);
+        } else {
+            // Fallback to reading the last message to get actual publish time
+            stats.lastPublishTimeStamp = 0; // Will be updated below if we can 
read the message
+            lastPublishTimeFuture = getLastMessagePublishTime();
+        }
+
+        // Set the topic creation timestamp - get it directly since it's 
synchronous
+        stats.topicCreationTimeStamp = getTopicCreationTimeStamp();
+
         stats.compaction.reset();
         mxBean.flatMap(bean -> 
bean.getCompactionRecordForTopic(topic)).map(compactionRecord -> {
             stats.compaction.lastCompactionRemovedEventCount = 
compactionRecord.getLastCompactionRemovedEventCount();
@@ -2894,7 +2913,14 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         subscriptions.forEach((name, subscription) -> {
             subscriptionFutures.put(name, 
subscription.getStatsAsync(getStatsOptions));
         });
-        return 
FutureUtil.waitForAll(subscriptionFutures.values()).thenCompose(ignore -> {
+
+        // Combine all async operations: last publish time and subscription 
stats
+        CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(
+            lastPublishTimeFuture.thenAccept(time -> 
stats.lastPublishTimeStamp = time)
+        );
+
+        return combinedFutures.thenCompose(ignore ->
+            
FutureUtil.waitForAll(subscriptionFutures.values()).thenCompose(ignore2 -> {
             for (Map.Entry<String, CompletableFuture<SubscriptionStatsImpl>> e 
: subscriptionFutures.entrySet()) {
                 String name = e.getKey();
                 SubscriptionStatsImpl subStats = e.getValue().join();
@@ -2910,7 +2936,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
                 subStats.bucketDelayedIndexStats.forEach((k, v) -> {
                     TopicMetricBean topicMetricBean =
-                            stats.bucketDelayedIndexStats.computeIfAbsent(k, 
ignore2 -> new TopicMetricBean());
+                            stats.bucketDelayedIndexStats.computeIfAbsent(k, 
key -> new TopicMetricBean());
                     topicMetricBean.name = v.name;
                     topicMetricBean.labelsAndValues = v.labelsAndValues;
                     topicMetricBean.value += v.value;
@@ -2935,7 +2961,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             } else {
                 return CompletableFuture.completedFuture(stats);
             }
-        });
+        }));
     }
 
     private Optional<CompactorMXBean> getCompactorMXBean() {
@@ -4770,4 +4796,61 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return PERSISTENT_TOPIC_ATTRIBUTES_FIELD_UPDATER.updateAndGet(this,
                 old -> old != null ? old : new 
PersistentTopicAttributes(TopicName.get(topic)));
     }
+
+    /**
+     * Get the topic creation timestamp from the managed ledger metadata.
+     * This method retrieves the creation timestamp directly.
+     *
+     * @return the topic creation timestamp in milliseconds, or 0 if not 
available
+     */
+    public long getTopicCreationTimeStamp() {
+        // Get the creation timestamp from the managed ledger metadata
+        return ledger.getMetadataCreationTimestamp();
+    }
+
+    /**
+     * Get the publish time of the last message by reading the last entry.
+     * This is used as a fallback when ledger.getLastAddEntryTime() is not 
available.
+     *
+     * @return a CompletableFuture that completes with the last message 
publish time
+     */
+    private CompletableFuture<Long> getLastMessagePublishTime() {
+        CompletableFuture<Long> future = new CompletableFuture<>();
+
+        try {
+            Position lastPosition = ledger.getLastConfirmedEntry();
+            if (lastPosition == null) {
+                future.complete(0L);
+                return future;
+            }
+
+            ledger.asyncReadEntry(lastPosition, new 
AsyncCallbacks.ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    try {
+                        ByteBuf metadataAndPayload = entry.getDataBuffer();
+                        MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(metadataAndPayload);
+                        long publishTime = msgMetadata.getPublishTime();
+                        future.complete(publishTime);
+                    } catch (Exception e) {
+                        log.warn("[{}] Failed to parse message metadata for 
last publish time", topic, e);
+                        future.complete(0L);
+                    } finally {
+                        entry.release();
+                    }
+                }
+
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+                    log.warn("[{}] Failed to read last entry for publish 
time", topic, exception);
+                    future.complete(0L);
+                }
+            }, null);
+        } catch (Exception e) {
+            log.warn("[{}] Failed to get last position for publish time", 
topic, e);
+            future.complete(0L);
+        }
+
+        return future;
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index bad7e2d691b..b7612523c80 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3901,4 +3901,81 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         producer.close();
         admin.topics().deletePartitionedTopicAsync(topic, false).get();
     }
+
+    @Test
+    public void testTopicCreationAndLastPublishTimestamps() throws Exception {
+        final String topicName = "timestamp-test-topic";
+        final String partitionedTopicName = "timestamp-test-partitioned-topic";
+        final String topic = "persistent://" + defaultNamespace + "/" + 
topicName;
+        final String partitionedTopic = "persistent://" + defaultNamespace + 
"/" + partitionedTopicName;
+
+        @Cleanup
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+        
+        @Cleanup
+        Producer<byte[]> producer = client.newProducer()
+            .topic(topic)
+            .enableBatching(false)
+            .create();
+
+        admin.topicPolicies().setRetention(topic,
+                new RetentionPolicies(-1, 10));
+
+        TopicStats initialStats = admin.topics().getStats(topic);
+        assertTrue(initialStats.getTopicCreationTimeStamp() > 0);
+        assertEquals(initialStats.getLastPublishTimeStamp(), 0L);
+        producer.send("test-message".getBytes(StandardCharsets.UTF_8));
+        TopicStats statsAfterPublish = admin.topics().getStats(topic);
+        assertTrue(statsAfterPublish.getLastPublishTimeStamp() > 0);
+        assertEquals(statsAfterPublish.getTopicCreationTimeStamp(), 
initialStats.getTopicCreationTimeStamp());
+
+        admin.topics().createPartitionedTopic(partitionedTopic, 3);
+        admin.topicPolicies().setRetention(partitionedTopic,
+                new RetentionPolicies(-1, 10));
+        
+        @Cleanup
+        Producer<byte[]> partitionedProducer = client.newProducer()
+            .topic(partitionedTopic)
+            .enableBatching(false)
+            .create();
+
+        TopicStats partitionedStats = 
admin.topics().getPartitionedStats(partitionedTopic, true);
+        assertTrue(partitionedStats.getTopicCreationTimeStamp() > 0);
+        
partitionedProducer.send("test-partitioned-message".getBytes(StandardCharsets.UTF_8));
+        TopicStats partitionedStatsAfterPublish = 
admin.topics().getPartitionedStats(partitionedTopic, true);
+        assertTrue(partitionedStatsAfterPublish.getLastPublishTimeStamp() > 0);
+
+        for (int i = 0; i < 3; i++) {
+            String partitionTopic = partitionedTopic + "-partition-" + i;
+            TopicStats partitionStats = 
admin.topics().getStats(partitionTopic);
+            assertTrue(partitionStats.getTopicCreationTimeStamp() > 0);
+        }
+
+        TopicStats stats1 = admin.topics().getStats(topic);
+        TopicStats stats2 = admin.topics().getStats(topic);
+        assertEquals(stats1.getTopicCreationTimeStamp(), 
stats2.getTopicCreationTimeStamp());
+        assertEquals(stats1.getLastPublishTimeStamp(), 
stats2.getLastPublishTimeStamp());
+
+        admin.topics().unload(topic);
+        admin.topics().unload(partitionedTopic);
+
+        TopicStats statsAfterReload = admin.topics().getStats(topic);
+        TopicStats partitionedStatsAfterReload = 
admin.topics().getPartitionedStats(partitionedTopic, true);
+
+        assertTrue(statsAfterReload.getTopicCreationTimeStamp() > 0);
+        assertTrue(statsAfterReload.getLastPublishTimeStamp() > 0);
+        assertTrue(partitionedStatsAfterReload.getTopicCreationTimeStamp() > 
0);
+        assertTrue(partitionedStatsAfterReload.getLastPublishTimeStamp() > 0);
+
+        admin.topics().truncate(topic);
+        admin.topics().truncate(partitionedTopic);
+
+        statsAfterReload = admin.topics().getStats(topic);
+        partitionedStatsAfterReload = 
admin.topics().getPartitionedStats(partitionedTopic, true);
+        assertTrue(statsAfterReload.getTopicCreationTimeStamp() > 0);
+        assertEquals(statsAfterReload.getLastPublishTimeStamp(), 0);
+        assertTrue(partitionedStatsAfterReload.getTopicCreationTimeStamp() > 
0);
+        assertEquals(partitionedStatsAfterReload.getLastPublishTimeStamp(), 0);
+    }
+
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index ac50763b7e0..64f10f002ba 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -121,4 +121,23 @@ public interface TopicStats {
     String getOwnerBroker();
 
     long getDelayedMessageIndexSizeInBytes();
+
+    /**
+     * Get the topic creation timestamp in epoch milliseconds.
+     * This value represents when the topic was first durably created in the 
metadata store.
+     * This value is immutable for the lifetime of the topic.
+     *
+     * @return the topic creation timestamp in epoch milliseconds, or 0 if not 
available
+     */
+    long getTopicCreationTimeStamp();
+
+    /**
+     * Get the last publish timestamp in epoch milliseconds.
+     * This value represents the publish_time field of the last message 
successfully persisted by the broker
+     * for this topic.
+     * If no message has ever been published to the topic, this field will 
return 0.
+     *
+     * @return the last publish timestamp in epoch milliseconds, or 0 if no 
messages have been published
+     */
+    long getLastPublishTimeStamp();
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 022fffd3a7e..523ec5ac87d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -180,6 +180,12 @@ public class TopicStatsImpl implements TopicStats {
     /** The broker that owns this topic. */
     public String ownerBroker;
 
+    /** The topic creation timestamp in epoch milliseconds. */
+    public long topicCreationTimeStamp;
+
+    /** The last publish timestamp in epoch milliseconds. */
+    public long lastPublishTimeStamp;
+
     public List<? extends PublisherStats> getPublishers() {
         return Stream.concat(publishers.stream().sorted(
                                 
Comparator.comparing(PublisherStatsImpl::getProducerName, 
nullsLast(naturalOrder()))),
@@ -256,6 +262,8 @@ public class TopicStatsImpl implements TopicStats {
         this.backlogQuotaLimitTime = 0;
         this.oldestBacklogMessageAgeSeconds = -1;
         this.oldestBacklogMessageSubscriptionName = null;
+        this.topicCreationTimeStamp = 0;
+        this.lastPublishTimeStamp = 0;
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of 
these stats and add it to the current
@@ -292,6 +300,16 @@ public class TopicStatsImpl implements TopicStats {
             this.oldestBacklogMessageSubscriptionName = 
stats.oldestBacklogMessageSubscriptionName;
         }
 
+        // Handle topicCreationTimeStamp - use the earliest (minimum) value
+        if (this.topicCreationTimeStamp != 0 && stats.topicCreationTimeStamp 
!= 0) {
+            this.topicCreationTimeStamp = 
Math.min(this.topicCreationTimeStamp, stats.topicCreationTimeStamp);
+        } else {
+            this.topicCreationTimeStamp = 
Math.max(this.topicCreationTimeStamp, stats.topicCreationTimeStamp);
+        }
+
+        // Handle lastPublishTimeStamp - use the latest (maximum) value
+        this.lastPublishTimeStamp = Math.max(this.lastPublishTimeStamp, 
stats.lastPublishTimeStamp);
+
         stats.bucketDelayedIndexStats.forEach((k, v) -> {
             TopicMetricBean topicMetricBean =
                     this.bucketDelayedIndexStats.computeIfAbsent(k, __ -> new 
TopicMetricBean());
@@ -355,4 +373,19 @@ public class TopicStatsImpl implements TopicStats {
         }
         return this;
     }
+
+    @Override
+    public long getTopicCreationTimeStamp() {
+        return topicCreationTimeStamp;
+    }
+
+    @Override
+    public long getLastPublishTimeStamp() {
+        return lastPublishTimeStamp;
+    }
+
+    @Override
+    public long getDelayedMessageIndexSizeInBytes() {
+        return delayedMessageIndexSizeInBytes;
+    }
 }

Reply via email to