This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 77358513290 [feat] PIP-468: record wall-clock created/sealed timestamps on segments (#25658)
77358513290 is described below

commit 7735851329088d0e9ca1553989c503cde47dfb76
Author: Matteo Merli <[email protected]>
AuthorDate: Mon May 4 02:06:08 2026 -0700

    [feat] PIP-468: record wall-clock created/sealed timestamps on segments (#25658)
---
 .../broker/service/scalable/DagWatchSession.java   |  4 ++
 .../service/scalable/ScalableTopicController.java  | 20 +++++--
 .../broker/service/scalable/SegmentLayout.java     | 23 ++++---
 .../service/scalable/DagWatchSessionTest.java      | 16 ++++-
 .../broker/service/scalable/SegmentLayoutTest.java | 70 ++++++++++++++++++----
 .../scalable/SubscriptionCoordinatorTest.java      | 12 ++--
 .../apache/pulsar/common/scalable/SegmentInfo.java | 35 +++++++----
 pulsar-common/src/main/proto/PulsarApi.proto       |  4 ++
 .../common/protocol/CommandsScalableTopicTest.java |  4 +-
 9 files changed, 143 insertions(+), 45 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
index 1e7e87865a3..2cc50301486 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
@@ -166,6 +166,10 @@ public class DagWatchSession implements 
ScalableTopicResources.MetadataPathListe
             if (seg.sealedAtEpoch() >= 0) {
                 segProto.setSealedAtEpoch(seg.sealedAtEpoch());
             }
+            segProto.setCreatedAtMs(seg.createdAtMs());
+            if (seg.sealedAtMs() >= 0) {
+                segProto.setSealedAtMs(seg.sealedAtMs());
+            }
         }
 
         // Add broker addresses for active segments
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 7aef354a0f8..4a7b15a1e15 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -285,8 +285,13 @@ public class ScalableTopicController {
     public CompletableFuture<SegmentLayout> splitSegment(long segmentId) {
         checkLeader();
 
+        // Single timestamp shared by the local preview and the CAS-retried 
metadata update,
+        // so the children's createdAtMs and the parent's sealedAtMs always 
agree even if the
+        // CAS retries due to concurrent writers.
+        final long nowMs = System.currentTimeMillis();
+
         // Compute the new layout locally to derive child segment info
-        SegmentLayout newLayout = currentLayout.splitSegment(segmentId);
+        SegmentLayout newLayout = currentLayout.splitSegment(segmentId, nowMs);
         SegmentInfo child1 = 
newLayout.getAllSegments().get(newLayout.getNextSegmentId() - 2);
         SegmentInfo child2 = 
newLayout.getAllSegments().get(newLayout.getNextSegmentId() - 1);
         SegmentInfo parent = currentLayout.getAllSegments().get(segmentId);
@@ -309,7 +314,7 @@ public class ScalableTopicController {
           // Step 4: Atomic metadata update (only after topics + cursors are 
ready + parent terminated)
           .thenCompose(__ -> resources.updateScalableTopicAsync(topicName, md 
-> {
               SegmentLayout latest = SegmentLayout.fromMetadata(md);
-              SegmentLayout updated = latest.splitSegment(segmentId);
+              SegmentLayout updated = latest.splitSegment(segmentId, nowMs);
               return updated.toMetadata(md.getProperties());
           }))
           .thenCompose(__ -> 
resources.getScalableTopicMetadataAsync(topicName, true))
@@ -330,8 +335,12 @@ public class ScalableTopicController {
     public CompletableFuture<SegmentLayout> mergeSegments(long segmentId1, 
long segmentId2) {
         checkLeader();
 
+        // Single timestamp shared by the local preview and the CAS-retried 
metadata
+        // update — see splitSegment for the rationale.
+        final long nowMs = System.currentTimeMillis();
+
         // Compute the new layout locally to derive merged segment info
-        SegmentLayout newLayout = currentLayout.mergeSegments(segmentId1, 
segmentId2);
+        SegmentLayout newLayout = currentLayout.mergeSegments(segmentId1, 
segmentId2, nowMs);
         SegmentInfo merged = 
newLayout.getAllSegments().get(newLayout.getNextSegmentId() - 1);
         SegmentInfo parent1 = currentLayout.getAllSegments().get(segmentId1);
         SegmentInfo parent2 = currentLayout.getAllSegments().get(segmentId2);
@@ -351,7 +360,7 @@ public class ScalableTopicController {
           // Step 3: Atomic metadata update (only after topic + cursors are 
ready + parents terminated)
           .thenCompose(__ -> resources.updateScalableTopicAsync(topicName, md 
-> {
               SegmentLayout latest = SegmentLayout.fromMetadata(md);
-              SegmentLayout updated = latest.mergeSegments(segmentId1, 
segmentId2);
+              SegmentLayout updated = latest.mergeSegments(segmentId1, 
segmentId2, nowMs);
               return updated.toMetadata(md.getProperties());
           }))
           .thenCompose(__ -> 
resources.getScalableTopicMetadataAsync(topicName, true))
@@ -690,11 +699,12 @@ public class ScalableTopicController {
         int rangeSize = (HashRange.MAX_HASH + 1) / numInitialSegments;
         Map<Long, SegmentInfo> segments = new LinkedHashMap<>();
 
+        long nowMs = System.currentTimeMillis();
         for (int i = 0; i < numInitialSegments; i++) {
             int start = i * rangeSize;
             int end = (i == numInitialSegments - 1) ? HashRange.MAX_HASH : 
(start + rangeSize - 1);
             HashRange range = HashRange.of(start, end);
-            SegmentInfo segment = SegmentInfo.active(i, range, 0);
+            SegmentInfo segment = SegmentInfo.active(i, range, 0, nowMs);
             segments.put((long) i, segment);
         }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
index 5da17726b88..aed7dd3f8e1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
@@ -116,9 +116,12 @@ public class SegmentLayout {
      * Produce a new layout by splitting a segment at its midpoint.
      *
      * @param segmentId the active segment to split
+     * @param nowMs     wall-clock millis used as the parent's seal time and 
the
+     *                  children's create time. Caller passes a single value so
+     *                  CAS retries and follow-up reads agree.
      * @return a new SegmentLayout with the split applied
      */
-    public SegmentLayout splitSegment(long segmentId) {
+    public SegmentLayout splitSegment(long segmentId, long nowMs) {
         SegmentInfo segment = allSegments.get(segmentId);
         if (segment == null) {
             throw new IllegalArgumentException("Segment not found: " + 
segmentId);
@@ -132,9 +135,11 @@ public class SegmentLayout {
         long childId1 = nextSegmentId;
         long childId2 = nextSegmentId + 1;
 
-        SegmentInfo sealedParent = segment.sealed(newEpoch, List.of(childId1, 
childId2));
-        SegmentInfo child1 = SegmentInfo.active(childId1, splitRanges[0], 
List.of(segmentId), newEpoch);
-        SegmentInfo child2 = SegmentInfo.active(childId2, splitRanges[1], 
List.of(segmentId), newEpoch);
+        SegmentInfo sealedParent = segment.sealed(newEpoch, nowMs, 
List.of(childId1, childId2));
+        SegmentInfo child1 = SegmentInfo.active(childId1, splitRanges[0],
+                List.of(segmentId), newEpoch, nowMs);
+        SegmentInfo child2 = SegmentInfo.active(childId2, splitRanges[1],
+                List.of(segmentId), newEpoch, nowMs);
 
         Map<Long, SegmentInfo> newSegments = new LinkedHashMap<>(allSegments);
         newSegments.put(segmentId, sealedParent);
@@ -149,9 +154,11 @@ public class SegmentLayout {
      *
      * @param segmentId1 the first segment (must be active and adjacent to 
segmentId2)
      * @param segmentId2 the second segment (must be active and adjacent to 
segmentId1)
+     * @param nowMs      wall-clock millis used as the parents' seal time and 
the
+     *                   merged child's create time
      * @return a new SegmentLayout with the merge applied
      */
-    public SegmentLayout mergeSegments(long segmentId1, long segmentId2) {
+    public SegmentLayout mergeSegments(long segmentId1, long segmentId2, long 
nowMs) {
         SegmentInfo seg1 = allSegments.get(segmentId1);
         SegmentInfo seg2 = allSegments.get(segmentId2);
         if (seg1 == null || seg2 == null) {
@@ -169,10 +176,10 @@ public class SegmentLayout {
         long mergedId = nextSegmentId;
         HashRange mergedRange = seg1.hashRange().merge(seg2.hashRange());
 
-        SegmentInfo sealed1 = seg1.sealed(newEpoch, List.of(mergedId));
-        SegmentInfo sealed2 = seg2.sealed(newEpoch, List.of(mergedId));
+        SegmentInfo sealed1 = seg1.sealed(newEpoch, nowMs, List.of(mergedId));
+        SegmentInfo sealed2 = seg2.sealed(newEpoch, nowMs, List.of(mergedId));
         SegmentInfo merged = SegmentInfo.active(mergedId, mergedRange,
-                List.of(segmentId1, segmentId2), newEpoch);
+                List.of(segmentId1, segmentId2), newEpoch, nowMs);
 
         Map<Long, SegmentInfo> newSegments = new LinkedHashMap<>(allSegments);
         newSegments.put(segmentId1, sealed1);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
index cf14eb6b00e..a14ef6d5780 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
@@ -224,6 +224,11 @@ public class DagWatchSessionTest {
         assertEquals(parent.getChildIdAt(1), 3L);
         assertEquals(parent.getCreatedAtEpoch(), 0L);
         assertEquals(parent.getSealedAtEpoch(), 5L);
+        // wall-clock timestamps from seg() helper: createdAtMs = 1000 + id;
+        // sealed segments add 100ms to that.
+        assertEquals(parent.getCreatedAtMs(), 1_000L);
+        assertTrue(parent.hasSealedAtMs(), "sealed segment must carry 
sealedAtMs");
+        assertEquals(parent.getSealedAtMs(), 1_100L);
 
         // active children should reference parent 0
         var childA = findSegment(dag, 2L);
@@ -234,6 +239,9 @@ public class DagWatchSessionTest {
         // sealedAtEpoch is only written when non-negative
         assertTrue(!childA.hasSealedAtEpoch() || childA.getSealedAtEpoch() == 
0,
                 "active segment should not have sealedAtEpoch set");
+        assertEquals(childA.getCreatedAtMs(), 1_002L);
+        assertTrue(!childA.hasSealedAtMs() || childA.getSealedAtMs() == 0,
+                "active segment should not have sealedAtMs set");
 
         // broker addresses only for the 2 active segments
         assertEquals(dag.getSegmentBrokersCount(), 2);
@@ -293,6 +301,10 @@ public class DagWatchSessionTest {
     private static SegmentInfo seg(long id, int start, int end, SegmentState 
state,
                                    long[] parents, long[] children,
                                    long createdAt, long sealedAt) {
+        // wall-clock timestamps don't matter for these wire-format tests; use 
a
+        // deterministic non-zero value so the proto round-trip is observable.
+        long createdAtMs = 1_000L + id;
+        long sealedAtMs = state == SegmentState.SEALED ? createdAtMs + 100L : 
-1;
         return new SegmentInfo(
                 id,
                 HashRange.of(start, end),
@@ -300,7 +312,9 @@ public class DagWatchSessionTest {
                 toList(parents),
                 toList(children),
                 createdAt,
-                sealedAt);
+                sealedAt,
+                createdAtMs,
+                sealedAtMs);
     }
 
     private static java.util.List<Long> toList(long[] arr) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
index e25066e4f1d..b936820072f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
@@ -96,7 +96,7 @@ public class SegmentLayoutTest {
         ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(2, Map.of());
         SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
 
-        SegmentLayout afterSplit = layout.splitSegment(0);
+        SegmentLayout afterSplit = layout.splitSegment(0, 0L);
 
         assertEquals(afterSplit.getEpoch(), 1);
         assertEquals(afterSplit.getActiveSegments().size(), 3); // 1 original 
+ 2 new
@@ -124,10 +124,10 @@ public class SegmentLayoutTest {
     public void testSplitNonActiveSegment() {
         ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(2, Map.of());
         SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
-        SegmentLayout afterSplit = layout.splitSegment(0);
+        SegmentLayout afterSplit = layout.splitSegment(0, 0L);
 
         // Segment 0 is now sealed, cannot split again
-        assertThrows(IllegalArgumentException.class, () -> 
afterSplit.splitSegment(0));
+        assertThrows(IllegalArgumentException.class, () -> 
afterSplit.splitSegment(0, 0L));
     }
 
     @Test
@@ -135,9 +135,9 @@ public class SegmentLayoutTest {
         // Start with 2 segments, split seg-0, then merge the children back
         ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(2, Map.of());
         SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
-        SegmentLayout afterSplit = layout.splitSegment(0); // seg-2 
[0000-3fff], seg-3 [4000-7fff]
+        SegmentLayout afterSplit = layout.splitSegment(0, 0L); // seg-2 
[0000-3fff], seg-3 [4000-7fff]
 
-        SegmentLayout afterMerge = afterSplit.mergeSegments(2, 3);
+        SegmentLayout afterMerge = afterSplit.mergeSegments(2, 3, 0L);
 
         assertEquals(afterMerge.getEpoch(), 2);
         assertEquals(afterMerge.getActiveSegments().size(), 2); // merged + 
seg-1
@@ -153,20 +153,64 @@ public class SegmentLayoutTest {
         assertEquals(merged.parentIds(), List.of(2L, 3L));
     }
 
+    @Test
+    public void testSplitRecordsWallClockTimestamps() {
+        ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(1, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+
+        long splitAt = 1_700_000_000_000L;
+        SegmentLayout after = layout.splitSegment(0, splitAt);
+
+        // Parent gets sealedAtMs = splitAt; createdAtMs is preserved from 
layout creation.
+        SegmentInfo parent = after.getAllSegments().get(0L);
+        assertTrue(parent.isSealed());
+        assertEquals(parent.sealedAtMs(), splitAt);
+        assertTrue(parent.createdAtMs() > 0, "createdAtMs must be set at 
create time");
+
+        // Both children get createdAtMs = splitAt and sealedAtMs = -1 
(active).
+        // initialMetadata(1) has nextSegmentId=1, so split-children are 1 and 
2.
+        SegmentInfo child1 = after.getAllSegments().get(1L);
+        SegmentInfo child2 = after.getAllSegments().get(2L);
+        assertEquals(child1.createdAtMs(), splitAt);
+        assertEquals(child2.createdAtMs(), splitAt);
+        assertEquals(child1.sealedAtMs(), -1L);
+        assertEquals(child2.sealedAtMs(), -1L);
+    }
+
+    @Test
+    public void testMergeRecordsWallClockTimestamps() {
+        ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(2, Map.of());
+        SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+
+        long splitAt = 1_700_000_000_000L;
+        SegmentLayout afterSplit = layout.splitSegment(0, splitAt);
+        long mergeAt = splitAt + 60_000L;
+        SegmentLayout afterMerge = afterSplit.mergeSegments(2, 3, mergeAt);
+
+        // Both sealed split-children carry sealedAtMs = mergeAt.
+        assertEquals(afterMerge.getAllSegments().get(2L).sealedAtMs(), 
mergeAt);
+        assertEquals(afterMerge.getAllSegments().get(3L).sealedAtMs(), 
mergeAt);
+
+        // Merged child has createdAtMs = mergeAt.
+        SegmentInfo merged = afterMerge.getAllSegments().get(4L);
+        assertEquals(merged.createdAtMs(), mergeAt);
+        assertEquals(merged.sealedAtMs(), -1L);
+    }
+
     @Test
     public void testMergeNonAdjacentSegments() {
         ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(4, Map.of());
         SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
 
         // Segments 0 and 2 are not adjacent
-        assertThrows(IllegalArgumentException.class, () -> 
layout.mergeSegments(0, 2));
+        assertThrows(IllegalArgumentException.class, () -> 
layout.mergeSegments(0, 2, 0L));
     }
 
     @Test
     public void testPruneSegment() {
         ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(2, Map.of());
         SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
-        SegmentLayout afterSplit = layout.splitSegment(0);
+        SegmentLayout afterSplit = layout.splitSegment(0, 0L);
 
         // Prune sealed segment 0
         SegmentLayout afterPrune = afterSplit.pruneSegment(0);
@@ -189,7 +233,7 @@ public class SegmentLayoutTest {
     public void testGetChildren() {
         ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(1, Map.of());
         SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
-        SegmentLayout afterSplit = layout.splitSegment(0);
+        SegmentLayout afterSplit = layout.splitSegment(0, 0L);
 
         List<SegmentInfo> children = afterSplit.getChildren(0);
         assertEquals(children.size(), 2);
@@ -201,7 +245,7 @@ public class SegmentLayoutTest {
     public void testGetParents() {
         ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(1, Map.of());
         SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
-        SegmentLayout afterSplit = layout.splitSegment(0);
+        SegmentLayout afterSplit = layout.splitSegment(0, 0L);
 
         List<SegmentInfo> parents = afterSplit.getParents(1);
         assertEquals(parents.size(), 1);
@@ -215,7 +259,7 @@ public class SegmentLayoutTest {
     public void testGetLineage() {
         ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(1, Map.of());
         SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
-        SegmentLayout afterSplit = layout.splitSegment(0);
+        SegmentLayout afterSplit = layout.splitSegment(0, 0L);
 
         List<SegmentInfo> lineage = afterSplit.getLineage(0);
         // Lineage of seg-0: itself + its two children
@@ -226,7 +270,7 @@ public class SegmentLayoutTest {
     public void testToMetadata() {
         ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(2, Map.of("key", "value"));
         SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
-        SegmentLayout afterSplit = layout.splitSegment(0);
+        SegmentLayout afterSplit = layout.splitSegment(0, 0L);
 
         ScalableTopicMetadata restored = afterSplit.toMetadata(Map.of("key", 
"value"));
         assertEquals(restored.getEpoch(), 1);
@@ -241,10 +285,10 @@ public class SegmentLayoutTest {
         SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
         assertEquals(layout.getNextSegmentId(), 2);
 
-        SegmentLayout split1 = layout.splitSegment(0);
+        SegmentLayout split1 = layout.splitSegment(0, 0L);
         assertEquals(split1.getNextSegmentId(), 4);
 
-        SegmentLayout split2 = split1.splitSegment(1);
+        SegmentLayout split2 = split1.splitSegment(1, 0L);
         assertEquals(split2.getNextSegmentId(), 6);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
index d3f6e1e0bc0..3fa16caec49 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
@@ -138,7 +138,7 @@ public class SubscriptionCoordinatorTest {
     public void testLayoutChangeRebalances() throws Exception {
         coordinator.registerConsumer("consumer-1", 1L, 
mock(TransportCnx.class)).get();
 
-        SegmentLayout newLayout = initialLayout.splitSegment(0);
+        SegmentLayout newLayout = initialLayout.splitSegment(0, 0L);
         Map<ConsumerSession, ConsumerAssignment> result =
                 coordinator.onLayoutChange(newLayout).get();
 
@@ -169,7 +169,7 @@ public class SubscriptionCoordinatorTest {
         try {
             orderedCoordinator.registerConsumer("consumer-1", 1L, 
mock(TransportCnx.class)).get();
 
-            SegmentLayout afterSplit = initialLayout.splitSegment(0);
+            SegmentLayout afterSplit = initialLayout.splitSegment(0, 0L);
             Map<ConsumerSession, ConsumerAssignment> result =
                     orderedCoordinator.onLayoutChange(afterSplit).get();
 
@@ -214,7 +214,7 @@ public class SubscriptionCoordinatorTest {
         };
         // Initial 50ms, max 5s — exponential, several polls happen quickly.
         SubscriptionCoordinator c = new SubscriptionCoordinator("test-sub",
-                topicName, initialLayout.splitSegment(0), resources, scheduler,
+                topicName, initialLayout.splitSegment(0, 0L), resources, 
scheduler,
                 Duration.ofMillis(200), checker, Duration.ofMillis(50), 
Duration.ofSeconds(5));
         try {
             c.registerConsumer("consumer-1", 1L, 
mock(TransportCnx.class)).get();
@@ -252,7 +252,7 @@ public class SubscriptionCoordinatorTest {
             return blocking;
         };
         SubscriptionCoordinator c = new SubscriptionCoordinator("test-sub",
-                topicName, initialLayout.splitSegment(0), resources, scheduler,
+                topicName, initialLayout.splitSegment(0, 0L), resources, 
scheduler,
                 Duration.ofMillis(200), checker, Duration.ofMillis(20), 
Duration.ofSeconds(1));
         c.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
         // Wait until at least one poll has started.
@@ -281,7 +281,7 @@ public class SubscriptionCoordinatorTest {
         // no drain checker the active children of the (sealed) split parent 
are eligible
         // right away.
         coordinator.restoreConsumers(java.util.List.of("consumer-1"));
-        SegmentLayout afterSplit = initialLayout.splitSegment(0);
+        SegmentLayout afterSplit = initialLayout.splitSegment(0, 0L);
         Map<ConsumerSession, ConsumerAssignment> result =
                 coordinator.onLayoutChange(afterSplit).get();
         ConsumerAssignment a = findByName(result, "consumer-1");
@@ -305,7 +305,7 @@ public class SubscriptionCoordinatorTest {
     @Test
     public void testInstallDrainCheckerAfterRestoreEnablesOrdering() throws 
Exception {
         coordinator.registerConsumer("consumer-1", 1L, 
mock(TransportCnx.class)).get();
-        SegmentLayout afterSplit = initialLayout.splitSegment(0);
+        SegmentLayout afterSplit = initialLayout.splitSegment(0, 0L);
         coordinator.onLayoutChange(afterSplit).get();
         // No checker yet → all 6 segments assigned.
         assertEquals(segmentIds(findByName(coordinator.currentAssignment(), 
"consumer-1")).size(),
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java
index de6f6aae18e..848326dd026 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java
@@ -27,13 +27,23 @@ import java.util.List;
  * Segments are linked by parent/child edges that form a DAG representing the 
split/merge history.
  * Active segments are the leaves (no children); sealed segments are internal 
nodes.
  *
+ * <p>Two timestamps are recorded:
+ * <ul>
+ *   <li>{@code createdAtEpoch}/{@code sealedAtEpoch} — DAG generation 
numbers, used for
+ *       layout-versioning. They are not wall-clock values.</li>
+ *   <li>{@code createdAtMs}/{@code sealedAtMs} — wall-clock millis since the 
unix epoch.
+ *       Used for retention-based segment GC and for timestamp-based seek.</li>
+ * </ul>
+ *
  * @param segmentId      monotonically increasing, unique within the topic
  * @param hashRange      inclusive hash range [start, end]
  * @param state          ACTIVE or SEALED
  * @param parentIds      parent segment IDs in the DAG (empty for initial/root 
segments)
  * @param childIds       child segment IDs in the DAG (empty for active leaf 
segments)
- * @param createdAtEpoch epoch when this segment was created
- * @param sealedAtEpoch  epoch when sealed (-1 if still active)
+ * @param createdAtEpoch DAG epoch when this segment was created
+ * @param sealedAtEpoch  DAG epoch when sealed (-1 if still active)
+ * @param createdAtMs    wall-clock millis at creation time
+ * @param sealedAtMs     wall-clock millis at seal time (-1 if still active)
  */
 public record SegmentInfo(
         long segmentId,
@@ -42,7 +52,9 @@ public record SegmentInfo(
         List<Long> parentIds,
         List<Long> childIds,
         long createdAtEpoch,
-        long sealedAtEpoch
+        long sealedAtEpoch,
+        long createdAtMs,
+        long sealedAtMs
 ) {
     public SegmentInfo {
         parentIds = parentIds != null ? List.copyOf(parentIds) : List.of();
@@ -50,34 +62,35 @@ public record SegmentInfo(
     }
 
     /** Create a new active segment with no parents. */
-    public static SegmentInfo active(long segmentId, HashRange hashRange, long 
createdAtEpoch) {
+    public static SegmentInfo active(long segmentId, HashRange hashRange,
+                                     long createdAtEpoch, long createdAtMs) {
         return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE,
-                List.of(), List.of(), createdAtEpoch, -1);
+                List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1);
     }
 
     /** Create a new active segment with the given parent IDs. */
     public static SegmentInfo active(long segmentId, HashRange hashRange,
-                                     List<Long> parentIds, long 
createdAtEpoch) {
+                                     List<Long> parentIds, long 
createdAtEpoch, long createdAtMs) {
         return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE,
-                parentIds, List.of(), createdAtEpoch, -1);
+                parentIds, List.of(), createdAtEpoch, -1, createdAtMs, -1);
     }
 
     /** Return a sealed copy of this segment with the given child IDs. */
-    public SegmentInfo sealed(long sealedAtEpoch, List<Long> childIds) {
+    public SegmentInfo sealed(long sealedAtEpoch, long sealedAtMs, List<Long> 
childIds) {
         return new SegmentInfo(segmentId, hashRange, SegmentState.SEALED,
-                parentIds, childIds, createdAtEpoch, sealedAtEpoch);
+                parentIds, childIds, createdAtEpoch, sealedAtEpoch, 
createdAtMs, sealedAtMs);
     }
 
     /** Return a copy with different parent IDs. */
     public SegmentInfo withParentIds(List<Long> parentIds) {
         return new SegmentInfo(segmentId, hashRange, state,
-                parentIds, childIds, createdAtEpoch, sealedAtEpoch);
+                parentIds, childIds, createdAtEpoch, sealedAtEpoch, 
createdAtMs, sealedAtMs);
     }
 
     /** Return a copy with different child IDs. */
     public SegmentInfo withChildIds(List<Long> childIds) {
         return new SegmentInfo(segmentId, hashRange, state,
-                parentIds, childIds, createdAtEpoch, sealedAtEpoch);
+                parentIds, childIds, createdAtEpoch, sealedAtEpoch, 
createdAtMs, sealedAtMs);
     }
 
     public boolean isActive() {
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 6d135297317..b2a698d33da 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -851,6 +851,10 @@ message SegmentInfoProto {
     repeated uint64 child_ids        = 6;
     required uint64 created_at_epoch = 7;
     optional uint64 sealed_at_epoch  = 8;
+    // Wall-clock millis at create / seal time. Used for retention-based 
segment GC
+    // and timestamp-based seek; epoch above is a DAG generation number, not a 
clock.
+    required uint64 created_at_ms    = 9;
+    optional uint64 sealed_at_ms     = 10;
 }
 
 message SegmentBrokerAddress {
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
index 9afd76a98e0..c2c92676362 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
@@ -87,7 +87,8 @@ public class CommandsScalableTopicTest {
                 .setHashStart(0x0000)
                 .setHashEnd(0x7FFF)
                 .setState(SegmentState.ACTIVE)
-                .setCreatedAtEpoch(0L);
+                .setCreatedAtEpoch(0L)
+                .setCreatedAtMs(System.currentTimeMillis());
         active.addChildId(2L);
         active.addChildId(3L);
         dag.addSegment()
@@ -96,6 +97,7 @@ public class CommandsScalableTopicTest {
                 .setHashEnd(0x3FFF)
                 .setState(SegmentState.ACTIVE)
                 .setCreatedAtEpoch(7L)
+                .setCreatedAtMs(System.currentTimeMillis())
                 .addParentId(0L);
         
dag.addSegmentBroker().setSegmentId(2L).setBrokerUrl("pulsar://broker-a:6650");
 

Reply via email to