This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new c27b25c242c [improve][broker][pip-431] PIP-431: Add Creation and Last Publish Timestamps to Topic Stats (#24471)
c27b25c242c is described below
commit c27b25c242c027d03813754b9c256a601b744013
Author: Penghui Li <[email protected]>
AuthorDate: Mon Jul 7 19:28:56 2025 -0700
[improve][broker][pip-431] PIP-431: Add Creation and Last Publish Timestamps to Topic Stats (#24471)
(cherry picked from commit de9c7286247c16553fe69e7c99bc7b3cf2830336)
---
.../apache/bookkeeper/mledger/ManagedLedger.java | 18 +++++
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 +++
.../broker/service/persistent/PersistentTopic.java | 89 +++++++++++++++++++++-
.../apache/pulsar/broker/admin/AdminApi2Test.java | 77 +++++++++++++++++++
.../pulsar/common/policies/data/TopicStats.java | 19 +++++
.../common/policies/data/stats/TopicStatsImpl.java | 33 ++++++++
6 files changed, 243 insertions(+), 3 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index acaa21fcda0..16b31b979cf 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -752,4 +752,22 @@ public interface ManagedLedger {
}
Position getFirstPosition();
+
+ /**
+ * Get the timestamp in milliseconds of the last successful add entry operation.
+ *
+ * @return the last add entry time in milliseconds
+ */
+ default long getLastAddEntryTime() {
+ return 0;
+ }
+
+ /**
+ * Get the creation timestamp of the managed ledger metadata, or 0 if not available.
+ *
+ * @return the creation timestamp in milliseconds, or 0 if not available
+ */
+ default long getMetadataCreationTimestamp() {
+ return 0;
+ }
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index cff90b38c4d..6a7844713d5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -4944,4 +4944,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
return ManagedLedgerImplUtils
.asyncGetLastValidPosition(this, predicate, startPosition);
}
+
+ @Override
+ public long getLastAddEntryTime() {
+ return lastAddEntryTimeMs;
+ }
+
+ @Override
+ public long getMetadataCreationTimestamp() {
+ return ledgersStat != null ? ledgersStat.getCreationTimestamp() : 0;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7a8c047b244..b79b2973f2e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2880,6 +2880,25 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
? null
: oldestPositionInfo.getCursorName();
+ // Set the last publish timestamp using a hybrid approach:
+ // 1. First try ledger.getLastAddEntryTime() if available
+ // 2. If needed, read the last message to get actual publish time
+ long ledgerLastAddTime = ledger.getLastAddEntryTime();
+ CompletableFuture<Long> lastPublishTimeFuture;
+
+ if (ledgerLastAddTime > 0) {
+ // Use ledger's last add time as a good approximation
+ stats.lastPublishTimeStamp = ledgerLastAddTime;
+ lastPublishTimeFuture =
CompletableFuture.completedFuture(ledgerLastAddTime);
+ } else {
+ // Fallback to reading the last message to get actual publish time
+ stats.lastPublishTimeStamp = 0; // Will be updated below if we can
read the message
+ lastPublishTimeFuture = getLastMessagePublishTime();
+ }
+
+ // Set the topic creation timestamp - get it directly since it's synchronous
+ stats.topicCreationTimeStamp = getTopicCreationTimeStamp();
+
stats.compaction.reset();
mxBean.flatMap(bean ->
bean.getCompactionRecordForTopic(topic)).map(compactionRecord -> {
stats.compaction.lastCompactionRemovedEventCount =
compactionRecord.getLastCompactionRemovedEventCount();
@@ -2894,7 +2913,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
subscriptions.forEach((name, subscription) -> {
subscriptionFutures.put(name,
subscription.getStatsAsync(getStatsOptions));
});
- return
FutureUtil.waitForAll(subscriptionFutures.values()).thenCompose(ignore -> {
+
+ // Combine all async operations: last publish time and subscription stats
+ CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(
+ lastPublishTimeFuture.thenAccept(time ->
stats.lastPublishTimeStamp = time)
+ );
+
+ return combinedFutures.thenCompose(ignore ->
+
FutureUtil.waitForAll(subscriptionFutures.values()).thenCompose(ignore2 -> {
for (Map.Entry<String, CompletableFuture<SubscriptionStatsImpl>> e
: subscriptionFutures.entrySet()) {
String name = e.getKey();
SubscriptionStatsImpl subStats = e.getValue().join();
@@ -2910,7 +2936,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
subStats.bucketDelayedIndexStats.forEach((k, v) -> {
TopicMetricBean topicMetricBean =
- stats.bucketDelayedIndexStats.computeIfAbsent(k,
ignore2 -> new TopicMetricBean());
+ stats.bucketDelayedIndexStats.computeIfAbsent(k,
key -> new TopicMetricBean());
topicMetricBean.name = v.name;
topicMetricBean.labelsAndValues = v.labelsAndValues;
topicMetricBean.value += v.value;
@@ -2935,7 +2961,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
} else {
return CompletableFuture.completedFuture(stats);
}
- });
+ }));
}
private Optional<CompactorMXBean> getCompactorMXBean() {
@@ -4770,4 +4796,61 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return PERSISTENT_TOPIC_ATTRIBUTES_FIELD_UPDATER.updateAndGet(this,
old -> old != null ? old : new
PersistentTopicAttributes(TopicName.get(topic)));
}
+
+ /**
+ * Get the topic creation timestamp from the managed ledger metadata.
+ * This method retrieves the creation timestamp directly.
+ *
+ * @return the topic creation timestamp in milliseconds, or 0 if not available
+ */
+ public long getTopicCreationTimeStamp() {
+ // Get the creation timestamp from the managed ledger metadata
+ return ledger.getMetadataCreationTimestamp();
+ }
+
+ /**
+ * Get the publish time of the last message by reading the last entry.
+ * This is used as a fallback when ledger.getLastAddEntryTime() is not available.
+ *
+ * @return a CompletableFuture that completes with the last message publish time
+ */
+ private CompletableFuture<Long> getLastMessagePublishTime() {
+ CompletableFuture<Long> future = new CompletableFuture<>();
+
+ try {
+ Position lastPosition = ledger.getLastConfirmedEntry();
+ if (lastPosition == null) {
+ future.complete(0L);
+ return future;
+ }
+
+ ledger.asyncReadEntry(lastPosition, new
AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ try {
+ ByteBuf metadataAndPayload = entry.getDataBuffer();
+ MessageMetadata msgMetadata =
Commands.parseMessageMetadata(metadataAndPayload);
+ long publishTime = msgMetadata.getPublishTime();
+ future.complete(publishTime);
+ } catch (Exception e) {
+ log.warn("[{}] Failed to parse message metadata for
last publish time", topic, e);
+ future.complete(0L);
+ } finally {
+ entry.release();
+ }
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
+ log.warn("[{}] Failed to read last entry for publish
time", topic, exception);
+ future.complete(0L);
+ }
+ }, null);
+ } catch (Exception e) {
+ log.warn("[{}] Failed to get last position for publish time",
topic, e);
+ future.complete(0L);
+ }
+
+ return future;
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index bad7e2d691b..b7612523c80 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3901,4 +3901,81 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
producer.close();
admin.topics().deletePartitionedTopicAsync(topic, false).get();
}
+
+ @Test
+ public void testTopicCreationAndLastPublishTimestamps() throws Exception {
+ final String topicName = "timestamp-test-topic";
+ final String partitionedTopicName = "timestamp-test-partitioned-topic";
+ final String topic = "persistent://" + defaultNamespace + "/" +
topicName;
+ final String partitionedTopic = "persistent://" + defaultNamespace +
"/" + partitionedTopicName;
+
+ @Cleanup
+ PulsarClient client =
PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+
+ @Cleanup
+ Producer<byte[]> producer = client.newProducer()
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ admin.topicPolicies().setRetention(topic,
+ new RetentionPolicies(-1, 10));
+
+ TopicStats initialStats = admin.topics().getStats(topic);
+ assertTrue(initialStats.getTopicCreationTimeStamp() > 0);
+ assertEquals(initialStats.getLastPublishTimeStamp(), 0L);
+ producer.send("test-message".getBytes(StandardCharsets.UTF_8));
+ TopicStats statsAfterPublish = admin.topics().getStats(topic);
+ assertTrue(statsAfterPublish.getLastPublishTimeStamp() > 0);
+ assertEquals(statsAfterPublish.getTopicCreationTimeStamp(),
initialStats.getTopicCreationTimeStamp());
+
+ admin.topics().createPartitionedTopic(partitionedTopic, 3);
+ admin.topicPolicies().setRetention(partitionedTopic,
+ new RetentionPolicies(-1, 10));
+
+ @Cleanup
+ Producer<byte[]> partitionedProducer = client.newProducer()
+ .topic(partitionedTopic)
+ .enableBatching(false)
+ .create();
+
+ TopicStats partitionedStats =
admin.topics().getPartitionedStats(partitionedTopic, true);
+ assertTrue(partitionedStats.getTopicCreationTimeStamp() > 0);
+
partitionedProducer.send("test-partitioned-message".getBytes(StandardCharsets.UTF_8));
+ TopicStats partitionedStatsAfterPublish =
admin.topics().getPartitionedStats(partitionedTopic, true);
+ assertTrue(partitionedStatsAfterPublish.getLastPublishTimeStamp() > 0);
+
+ for (int i = 0; i < 3; i++) {
+ String partitionTopic = partitionedTopic + "-partition-" + i;
+ TopicStats partitionStats =
admin.topics().getStats(partitionTopic);
+ assertTrue(partitionStats.getTopicCreationTimeStamp() > 0);
+ }
+
+ TopicStats stats1 = admin.topics().getStats(topic);
+ TopicStats stats2 = admin.topics().getStats(topic);
+ assertEquals(stats1.getTopicCreationTimeStamp(),
stats2.getTopicCreationTimeStamp());
+ assertEquals(stats1.getLastPublishTimeStamp(),
stats2.getLastPublishTimeStamp());
+
+ admin.topics().unload(topic);
+ admin.topics().unload(partitionedTopic);
+
+ TopicStats statsAfterReload = admin.topics().getStats(topic);
+ TopicStats partitionedStatsAfterReload =
admin.topics().getPartitionedStats(partitionedTopic, true);
+
+ assertTrue(statsAfterReload.getTopicCreationTimeStamp() > 0);
+ assertTrue(statsAfterReload.getLastPublishTimeStamp() > 0);
+ assertTrue(partitionedStatsAfterReload.getTopicCreationTimeStamp() >
0);
+ assertTrue(partitionedStatsAfterReload.getLastPublishTimeStamp() > 0);
+
+ admin.topics().truncate(topic);
+ admin.topics().truncate(partitionedTopic);
+
+ statsAfterReload = admin.topics().getStats(topic);
+ partitionedStatsAfterReload =
admin.topics().getPartitionedStats(partitionedTopic, true);
+ assertTrue(statsAfterReload.getTopicCreationTimeStamp() > 0);
+ assertEquals(statsAfterReload.getLastPublishTimeStamp(), 0);
+ assertTrue(partitionedStatsAfterReload.getTopicCreationTimeStamp() >
0);
+ assertEquals(partitionedStatsAfterReload.getLastPublishTimeStamp(), 0);
+ }
+
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index ac50763b7e0..64f10f002ba 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -121,4 +121,23 @@ public interface TopicStats {
String getOwnerBroker();
long getDelayedMessageIndexSizeInBytes();
+
+ /**
+ * Get the topic creation timestamp in epoch milliseconds.
+ * This value represents when the topic was first durably created in the metadata store.
+ * This value is immutable for the lifetime of the topic.
+ *
+ * @return the topic creation timestamp in epoch milliseconds, or 0 if not available
+ */
+ long getTopicCreationTimeStamp();
+
+ /**
+ * Get the last publish timestamp in epoch milliseconds.
+ * This value represents the publish_time field of the last message successfully persisted by the broker
+ * for this topic.
+ * If no message has ever been published to the topic, this field will return 0.
+ *
+ * @return the last publish timestamp in epoch milliseconds, or 0 if no messages have been published
+ */
+ long getLastPublishTimeStamp();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 022fffd3a7e..523ec5ac87d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -180,6 +180,12 @@ public class TopicStatsImpl implements TopicStats {
/** The broker that owns this topic. */
public String ownerBroker;
+ /** The topic creation timestamp in epoch milliseconds. */
+ public long topicCreationTimeStamp;
+
+ /** The last publish timestamp in epoch milliseconds. */
+ public long lastPublishTimeStamp;
+
public List<? extends PublisherStats> getPublishers() {
return Stream.concat(publishers.stream().sorted(
Comparator.comparing(PublisherStatsImpl::getProducerName,
nullsLast(naturalOrder()))),
@@ -256,6 +262,8 @@ public class TopicStatsImpl implements TopicStats {
this.backlogQuotaLimitTime = 0;
this.oldestBacklogMessageAgeSeconds = -1;
this.oldestBacklogMessageSubscriptionName = null;
+ this.topicCreationTimeStamp = 0;
+ this.lastPublishTimeStamp = 0;
}
// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
@@ -292,6 +300,16 @@ public class TopicStatsImpl implements TopicStats {
this.oldestBacklogMessageSubscriptionName =
stats.oldestBacklogMessageSubscriptionName;
}
+ // Handle topicCreationTimeStamp - use the earliest (minimum) value
+ if (this.topicCreationTimeStamp != 0 && stats.topicCreationTimeStamp
!= 0) {
+ this.topicCreationTimeStamp =
Math.min(this.topicCreationTimeStamp, stats.topicCreationTimeStamp);
+ } else {
+ this.topicCreationTimeStamp =
Math.max(this.topicCreationTimeStamp, stats.topicCreationTimeStamp);
+ }
+
+ // Handle lastPublishTimeStamp - use the latest (maximum) value
+ this.lastPublishTimeStamp = Math.max(this.lastPublishTimeStamp,
stats.lastPublishTimeStamp);
+
stats.bucketDelayedIndexStats.forEach((k, v) -> {
TopicMetricBean topicMetricBean =
this.bucketDelayedIndexStats.computeIfAbsent(k, __ -> new
TopicMetricBean());
@@ -355,4 +373,19 @@ public class TopicStatsImpl implements TopicStats {
}
return this;
}
+
+ @Override
+ public long getTopicCreationTimeStamp() {
+ return topicCreationTimeStamp;
+ }
+
+ @Override
+ public long getLastPublishTimeStamp() {
+ return lastPublishTimeStamp;
+ }
+
+ @Override
+ public long getDelayedMessageIndexSizeInBytes() {
+ return delayedMessageIndexSizeInBytes;
+ }
}