This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 77358513290 [feat] PIP-468: record wall-clock created/sealed timestamps on segments (#25658)
77358513290 is described below
commit 7735851329088d0e9ca1553989c503cde47dfb76
Author: Matteo Merli <[email protected]>
AuthorDate: Mon May 4 02:06:08 2026 -0700
[feat] PIP-468: record wall-clock created/sealed timestamps on segments (#25658)
---
.../broker/service/scalable/DagWatchSession.java | 4 ++
.../service/scalable/ScalableTopicController.java | 20 +++++--
.../broker/service/scalable/SegmentLayout.java | 23 ++++---
.../service/scalable/DagWatchSessionTest.java | 16 ++++-
.../broker/service/scalable/SegmentLayoutTest.java | 70 ++++++++++++++++++----
.../scalable/SubscriptionCoordinatorTest.java | 12 ++--
.../apache/pulsar/common/scalable/SegmentInfo.java | 35 +++++++----
pulsar-common/src/main/proto/PulsarApi.proto | 4 ++
.../common/protocol/CommandsScalableTopicTest.java | 4 +-
9 files changed, 143 insertions(+), 45 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
index 1e7e87865a3..2cc50301486 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
@@ -166,6 +166,10 @@ public class DagWatchSession implements
ScalableTopicResources.MetadataPathListe
if (seg.sealedAtEpoch() >= 0) {
segProto.setSealedAtEpoch(seg.sealedAtEpoch());
}
+ segProto.setCreatedAtMs(seg.createdAtMs());
+ if (seg.sealedAtMs() >= 0) {
+ segProto.setSealedAtMs(seg.sealedAtMs());
+ }
}
// Add broker addresses for active segments
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 7aef354a0f8..4a7b15a1e15 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -285,8 +285,13 @@ public class ScalableTopicController {
public CompletableFuture<SegmentLayout> splitSegment(long segmentId) {
checkLeader();
+ // Single timestamp shared by the local preview and the CAS-retried
metadata update,
+ // so the children's createdAtMs and the parent's sealedAtMs always
agree even if the
+ // CAS retries due to concurrent writers.
+ final long nowMs = System.currentTimeMillis();
+
// Compute the new layout locally to derive child segment info
- SegmentLayout newLayout = currentLayout.splitSegment(segmentId);
+ SegmentLayout newLayout = currentLayout.splitSegment(segmentId, nowMs);
SegmentInfo child1 =
newLayout.getAllSegments().get(newLayout.getNextSegmentId() - 2);
SegmentInfo child2 =
newLayout.getAllSegments().get(newLayout.getNextSegmentId() - 1);
SegmentInfo parent = currentLayout.getAllSegments().get(segmentId);
@@ -309,7 +314,7 @@ public class ScalableTopicController {
// Step 4: Atomic metadata update (only after topics + cursors are
ready + parent terminated)
.thenCompose(__ -> resources.updateScalableTopicAsync(topicName, md
-> {
SegmentLayout latest = SegmentLayout.fromMetadata(md);
- SegmentLayout updated = latest.splitSegment(segmentId);
+ SegmentLayout updated = latest.splitSegment(segmentId, nowMs);
return updated.toMetadata(md.getProperties());
}))
.thenCompose(__ ->
resources.getScalableTopicMetadataAsync(topicName, true))
@@ -330,8 +335,12 @@ public class ScalableTopicController {
public CompletableFuture<SegmentLayout> mergeSegments(long segmentId1,
long segmentId2) {
checkLeader();
+ // Single timestamp shared by the local preview and the CAS-retried
metadata
+ // update — see splitSegment for the rationale.
+ final long nowMs = System.currentTimeMillis();
+
// Compute the new layout locally to derive merged segment info
- SegmentLayout newLayout = currentLayout.mergeSegments(segmentId1,
segmentId2);
+ SegmentLayout newLayout = currentLayout.mergeSegments(segmentId1,
segmentId2, nowMs);
SegmentInfo merged =
newLayout.getAllSegments().get(newLayout.getNextSegmentId() - 1);
SegmentInfo parent1 = currentLayout.getAllSegments().get(segmentId1);
SegmentInfo parent2 = currentLayout.getAllSegments().get(segmentId2);
@@ -351,7 +360,7 @@ public class ScalableTopicController {
// Step 3: Atomic metadata update (only after topic + cursors are
ready + parents terminated)
.thenCompose(__ -> resources.updateScalableTopicAsync(topicName, md
-> {
SegmentLayout latest = SegmentLayout.fromMetadata(md);
- SegmentLayout updated = latest.mergeSegments(segmentId1,
segmentId2);
+ SegmentLayout updated = latest.mergeSegments(segmentId1,
segmentId2, nowMs);
return updated.toMetadata(md.getProperties());
}))
.thenCompose(__ ->
resources.getScalableTopicMetadataAsync(topicName, true))
@@ -690,11 +699,12 @@ public class ScalableTopicController {
int rangeSize = (HashRange.MAX_HASH + 1) / numInitialSegments;
Map<Long, SegmentInfo> segments = new LinkedHashMap<>();
+ long nowMs = System.currentTimeMillis();
for (int i = 0; i < numInitialSegments; i++) {
int start = i * rangeSize;
int end = (i == numInitialSegments - 1) ? HashRange.MAX_HASH :
(start + rangeSize - 1);
HashRange range = HashRange.of(start, end);
- SegmentInfo segment = SegmentInfo.active(i, range, 0);
+ SegmentInfo segment = SegmentInfo.active(i, range, 0, nowMs);
segments.put((long) i, segment);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
index 5da17726b88..aed7dd3f8e1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentLayout.java
@@ -116,9 +116,12 @@ public class SegmentLayout {
* Produce a new layout by splitting a segment at its midpoint.
*
* @param segmentId the active segment to split
+ * @param nowMs wall-clock millis used as the parent's seal time and
the
+ * children's create time. Caller passes a single value so
+ * CAS retries and follow-up reads agree.
* @return a new SegmentLayout with the split applied
*/
- public SegmentLayout splitSegment(long segmentId) {
+ public SegmentLayout splitSegment(long segmentId, long nowMs) {
SegmentInfo segment = allSegments.get(segmentId);
if (segment == null) {
throw new IllegalArgumentException("Segment not found: " +
segmentId);
@@ -132,9 +135,11 @@ public class SegmentLayout {
long childId1 = nextSegmentId;
long childId2 = nextSegmentId + 1;
- SegmentInfo sealedParent = segment.sealed(newEpoch, List.of(childId1,
childId2));
- SegmentInfo child1 = SegmentInfo.active(childId1, splitRanges[0],
List.of(segmentId), newEpoch);
- SegmentInfo child2 = SegmentInfo.active(childId2, splitRanges[1],
List.of(segmentId), newEpoch);
+ SegmentInfo sealedParent = segment.sealed(newEpoch, nowMs,
List.of(childId1, childId2));
+ SegmentInfo child1 = SegmentInfo.active(childId1, splitRanges[0],
+ List.of(segmentId), newEpoch, nowMs);
+ SegmentInfo child2 = SegmentInfo.active(childId2, splitRanges[1],
+ List.of(segmentId), newEpoch, nowMs);
Map<Long, SegmentInfo> newSegments = new LinkedHashMap<>(allSegments);
newSegments.put(segmentId, sealedParent);
@@ -149,9 +154,11 @@ public class SegmentLayout {
*
* @param segmentId1 the first segment (must be active and adjacent to
segmentId2)
* @param segmentId2 the second segment (must be active and adjacent to
segmentId1)
+ * @param nowMs wall-clock millis used as the parents' seal time and
the
+ * merged child's create time
* @return a new SegmentLayout with the merge applied
*/
- public SegmentLayout mergeSegments(long segmentId1, long segmentId2) {
+ public SegmentLayout mergeSegments(long segmentId1, long segmentId2, long
nowMs) {
SegmentInfo seg1 = allSegments.get(segmentId1);
SegmentInfo seg2 = allSegments.get(segmentId2);
if (seg1 == null || seg2 == null) {
@@ -169,10 +176,10 @@ public class SegmentLayout {
long mergedId = nextSegmentId;
HashRange mergedRange = seg1.hashRange().merge(seg2.hashRange());
- SegmentInfo sealed1 = seg1.sealed(newEpoch, List.of(mergedId));
- SegmentInfo sealed2 = seg2.sealed(newEpoch, List.of(mergedId));
+ SegmentInfo sealed1 = seg1.sealed(newEpoch, nowMs, List.of(mergedId));
+ SegmentInfo sealed2 = seg2.sealed(newEpoch, nowMs, List.of(mergedId));
SegmentInfo merged = SegmentInfo.active(mergedId, mergedRange,
- List.of(segmentId1, segmentId2), newEpoch);
+ List.of(segmentId1, segmentId2), newEpoch, nowMs);
Map<Long, SegmentInfo> newSegments = new LinkedHashMap<>(allSegments);
newSegments.put(segmentId1, sealed1);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
index cf14eb6b00e..a14ef6d5780 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
@@ -224,6 +224,11 @@ public class DagWatchSessionTest {
assertEquals(parent.getChildIdAt(1), 3L);
assertEquals(parent.getCreatedAtEpoch(), 0L);
assertEquals(parent.getSealedAtEpoch(), 5L);
+ // wall-clock timestamps from seg() helper: createdAtMs = 1000 + id;
+ // sealed segments add 100ms to that.
+ assertEquals(parent.getCreatedAtMs(), 1_000L);
+ assertTrue(parent.hasSealedAtMs(), "sealed segment must carry
sealedAtMs");
+ assertEquals(parent.getSealedAtMs(), 1_100L);
// active children should reference parent 0
var childA = findSegment(dag, 2L);
@@ -234,6 +239,9 @@ public class DagWatchSessionTest {
// sealedAtEpoch is only written when non-negative
assertTrue(!childA.hasSealedAtEpoch() || childA.getSealedAtEpoch() ==
0,
"active segment should not have sealedAtEpoch set");
+ assertEquals(childA.getCreatedAtMs(), 1_002L);
+ assertTrue(!childA.hasSealedAtMs() || childA.getSealedAtMs() == 0,
+ "active segment should not have sealedAtMs set");
// broker addresses only for the 2 active segments
assertEquals(dag.getSegmentBrokersCount(), 2);
@@ -293,6 +301,10 @@ public class DagWatchSessionTest {
private static SegmentInfo seg(long id, int start, int end, SegmentState
state,
long[] parents, long[] children,
long createdAt, long sealedAt) {
+ // wall-clock timestamps don't matter for these wire-format tests; use
a
+ // deterministic non-zero value so the proto round-trip is observable.
+ long createdAtMs = 1_000L + id;
+ long sealedAtMs = state == SegmentState.SEALED ? createdAtMs + 100L :
-1;
return new SegmentInfo(
id,
HashRange.of(start, end),
@@ -300,7 +312,9 @@ public class DagWatchSessionTest {
toList(parents),
toList(children),
createdAt,
- sealedAt);
+ sealedAt,
+ createdAtMs,
+ sealedAtMs);
}
private static java.util.List<Long> toList(long[] arr) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
index e25066e4f1d..b936820072f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SegmentLayoutTest.java
@@ -96,7 +96,7 @@ public class SegmentLayoutTest {
ScalableTopicMetadata metadata =
ScalableTopicController.createInitialMetadata(2, Map.of());
SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
- SegmentLayout afterSplit = layout.splitSegment(0);
+ SegmentLayout afterSplit = layout.splitSegment(0, 0L);
assertEquals(afterSplit.getEpoch(), 1);
assertEquals(afterSplit.getActiveSegments().size(), 3); // 1 original
+ 2 new
@@ -124,10 +124,10 @@ public class SegmentLayoutTest {
public void testSplitNonActiveSegment() {
ScalableTopicMetadata metadata =
ScalableTopicController.createInitialMetadata(2, Map.of());
SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
- SegmentLayout afterSplit = layout.splitSegment(0);
+ SegmentLayout afterSplit = layout.splitSegment(0, 0L);
// Segment 0 is now sealed, cannot split again
- assertThrows(IllegalArgumentException.class, () ->
afterSplit.splitSegment(0));
+ assertThrows(IllegalArgumentException.class, () ->
afterSplit.splitSegment(0, 0L));
}
@Test
@@ -135,9 +135,9 @@ public class SegmentLayoutTest {
// Start with 2 segments, split seg-0, then merge the children back
ScalableTopicMetadata metadata =
ScalableTopicController.createInitialMetadata(2, Map.of());
SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
- SegmentLayout afterSplit = layout.splitSegment(0); // seg-2
[0000-3fff], seg-3 [4000-7fff]
+ SegmentLayout afterSplit = layout.splitSegment(0, 0L); // seg-2
[0000-3fff], seg-3 [4000-7fff]
- SegmentLayout afterMerge = afterSplit.mergeSegments(2, 3);
+ SegmentLayout afterMerge = afterSplit.mergeSegments(2, 3, 0L);
assertEquals(afterMerge.getEpoch(), 2);
assertEquals(afterMerge.getActiveSegments().size(), 2); // merged +
seg-1
@@ -153,20 +153,64 @@ public class SegmentLayoutTest {
assertEquals(merged.parentIds(), List.of(2L, 3L));
}
+ @Test
+ public void testSplitRecordsWallClockTimestamps() {
+ ScalableTopicMetadata metadata =
ScalableTopicController.createInitialMetadata(1, Map.of());
+ SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+
+ long splitAt = 1_700_000_000_000L;
+ SegmentLayout after = layout.splitSegment(0, splitAt);
+
+ // Parent gets sealedAtMs = splitAt; createdAtMs is preserved from
layout creation.
+ SegmentInfo parent = after.getAllSegments().get(0L);
+ assertTrue(parent.isSealed());
+ assertEquals(parent.sealedAtMs(), splitAt);
+ assertTrue(parent.createdAtMs() > 0, "createdAtMs must be set at
create time");
+
+ // Both children get createdAtMs = splitAt and sealedAtMs = -1
(active).
+ // initialMetadata(1) has nextSegmentId=1, so split-children are 1 and
2.
+ SegmentInfo child1 = after.getAllSegments().get(1L);
+ SegmentInfo child2 = after.getAllSegments().get(2L);
+ assertEquals(child1.createdAtMs(), splitAt);
+ assertEquals(child2.createdAtMs(), splitAt);
+ assertEquals(child1.sealedAtMs(), -1L);
+ assertEquals(child2.sealedAtMs(), -1L);
+ }
+
+ @Test
+ public void testMergeRecordsWallClockTimestamps() {
+ ScalableTopicMetadata metadata =
ScalableTopicController.createInitialMetadata(2, Map.of());
+ SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
+
+ long splitAt = 1_700_000_000_000L;
+ SegmentLayout afterSplit = layout.splitSegment(0, splitAt);
+ long mergeAt = splitAt + 60_000L;
+ SegmentLayout afterMerge = afterSplit.mergeSegments(2, 3, mergeAt);
+
+ // Both sealed split-children carry sealedAtMs = mergeAt.
+ assertEquals(afterMerge.getAllSegments().get(2L).sealedAtMs(),
mergeAt);
+ assertEquals(afterMerge.getAllSegments().get(3L).sealedAtMs(),
mergeAt);
+
+ // Merged child has createdAtMs = mergeAt.
+ SegmentInfo merged = afterMerge.getAllSegments().get(4L);
+ assertEquals(merged.createdAtMs(), mergeAt);
+ assertEquals(merged.sealedAtMs(), -1L);
+ }
+
@Test
public void testMergeNonAdjacentSegments() {
ScalableTopicMetadata metadata =
ScalableTopicController.createInitialMetadata(4, Map.of());
SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
// Segments 0 and 2 are not adjacent
- assertThrows(IllegalArgumentException.class, () ->
layout.mergeSegments(0, 2));
+ assertThrows(IllegalArgumentException.class, () ->
layout.mergeSegments(0, 2, 0L));
}
@Test
public void testPruneSegment() {
ScalableTopicMetadata metadata =
ScalableTopicController.createInitialMetadata(2, Map.of());
SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
- SegmentLayout afterSplit = layout.splitSegment(0);
+ SegmentLayout afterSplit = layout.splitSegment(0, 0L);
// Prune sealed segment 0
SegmentLayout afterPrune = afterSplit.pruneSegment(0);
@@ -189,7 +233,7 @@ public class SegmentLayoutTest {
public void testGetChildren() {
ScalableTopicMetadata metadata =
ScalableTopicController.createInitialMetadata(1, Map.of());
SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
- SegmentLayout afterSplit = layout.splitSegment(0);
+ SegmentLayout afterSplit = layout.splitSegment(0, 0L);
List<SegmentInfo> children = afterSplit.getChildren(0);
assertEquals(children.size(), 2);
@@ -201,7 +245,7 @@ public class SegmentLayoutTest {
public void testGetParents() {
ScalableTopicMetadata metadata =
ScalableTopicController.createInitialMetadata(1, Map.of());
SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
- SegmentLayout afterSplit = layout.splitSegment(0);
+ SegmentLayout afterSplit = layout.splitSegment(0, 0L);
List<SegmentInfo> parents = afterSplit.getParents(1);
assertEquals(parents.size(), 1);
@@ -215,7 +259,7 @@ public class SegmentLayoutTest {
public void testGetLineage() {
ScalableTopicMetadata metadata =
ScalableTopicController.createInitialMetadata(1, Map.of());
SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
- SegmentLayout afterSplit = layout.splitSegment(0);
+ SegmentLayout afterSplit = layout.splitSegment(0, 0L);
List<SegmentInfo> lineage = afterSplit.getLineage(0);
// Lineage of seg-0: itself + its two children
@@ -226,7 +270,7 @@ public class SegmentLayoutTest {
public void testToMetadata() {
ScalableTopicMetadata metadata =
ScalableTopicController.createInitialMetadata(2, Map.of("key", "value"));
SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
- SegmentLayout afterSplit = layout.splitSegment(0);
+ SegmentLayout afterSplit = layout.splitSegment(0, 0L);
ScalableTopicMetadata restored = afterSplit.toMetadata(Map.of("key",
"value"));
assertEquals(restored.getEpoch(), 1);
@@ -241,10 +285,10 @@ public class SegmentLayoutTest {
SegmentLayout layout = SegmentLayout.fromMetadata(metadata);
assertEquals(layout.getNextSegmentId(), 2);
- SegmentLayout split1 = layout.splitSegment(0);
+ SegmentLayout split1 = layout.splitSegment(0, 0L);
assertEquals(split1.getNextSegmentId(), 4);
- SegmentLayout split2 = split1.splitSegment(1);
+ SegmentLayout split2 = split1.splitSegment(1, 0L);
assertEquals(split2.getNextSegmentId(), 6);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
index d3f6e1e0bc0..3fa16caec49 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
@@ -138,7 +138,7 @@ public class SubscriptionCoordinatorTest {
public void testLayoutChangeRebalances() throws Exception {
coordinator.registerConsumer("consumer-1", 1L,
mock(TransportCnx.class)).get();
- SegmentLayout newLayout = initialLayout.splitSegment(0);
+ SegmentLayout newLayout = initialLayout.splitSegment(0, 0L);
Map<ConsumerSession, ConsumerAssignment> result =
coordinator.onLayoutChange(newLayout).get();
@@ -169,7 +169,7 @@ public class SubscriptionCoordinatorTest {
try {
orderedCoordinator.registerConsumer("consumer-1", 1L,
mock(TransportCnx.class)).get();
- SegmentLayout afterSplit = initialLayout.splitSegment(0);
+ SegmentLayout afterSplit = initialLayout.splitSegment(0, 0L);
Map<ConsumerSession, ConsumerAssignment> result =
orderedCoordinator.onLayoutChange(afterSplit).get();
@@ -214,7 +214,7 @@ public class SubscriptionCoordinatorTest {
};
// Initial 50ms, max 5s — exponential, several polls happen quickly.
SubscriptionCoordinator c = new SubscriptionCoordinator("test-sub",
- topicName, initialLayout.splitSegment(0), resources, scheduler,
+ topicName, initialLayout.splitSegment(0, 0L), resources,
scheduler,
Duration.ofMillis(200), checker, Duration.ofMillis(50),
Duration.ofSeconds(5));
try {
c.registerConsumer("consumer-1", 1L,
mock(TransportCnx.class)).get();
@@ -252,7 +252,7 @@ public class SubscriptionCoordinatorTest {
return blocking;
};
SubscriptionCoordinator c = new SubscriptionCoordinator("test-sub",
- topicName, initialLayout.splitSegment(0), resources, scheduler,
+ topicName, initialLayout.splitSegment(0, 0L), resources,
scheduler,
Duration.ofMillis(200), checker, Duration.ofMillis(20),
Duration.ofSeconds(1));
c.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
// Wait until at least one poll has started.
@@ -281,7 +281,7 @@ public class SubscriptionCoordinatorTest {
// no drain checker the active children of the (sealed) split parent
are eligible
// right away.
coordinator.restoreConsumers(java.util.List.of("consumer-1"));
- SegmentLayout afterSplit = initialLayout.splitSegment(0);
+ SegmentLayout afterSplit = initialLayout.splitSegment(0, 0L);
Map<ConsumerSession, ConsumerAssignment> result =
coordinator.onLayoutChange(afterSplit).get();
ConsumerAssignment a = findByName(result, "consumer-1");
@@ -305,7 +305,7 @@ public class SubscriptionCoordinatorTest {
@Test
public void testInstallDrainCheckerAfterRestoreEnablesOrdering() throws
Exception {
coordinator.registerConsumer("consumer-1", 1L,
mock(TransportCnx.class)).get();
- SegmentLayout afterSplit = initialLayout.splitSegment(0);
+ SegmentLayout afterSplit = initialLayout.splitSegment(0, 0L);
coordinator.onLayoutChange(afterSplit).get();
// No checker yet → all 6 segments assigned.
assertEquals(segmentIds(findByName(coordinator.currentAssignment(),
"consumer-1")).size(),
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java
index de6f6aae18e..848326dd026 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/scalable/SegmentInfo.java
@@ -27,13 +27,23 @@ import java.util.List;
* Segments are linked by parent/child edges that form a DAG representing the
split/merge history.
* Active segments are the leaves (no children); sealed segments are internal
nodes.
*
+ * <p>Two timestamps are recorded:
+ * <ul>
+ * <li>{@code createdAtEpoch}/{@code sealedAtEpoch} — DAG generation
numbers, used for
+ * layout-versioning. They are not wall-clock values.</li>
+ * <li>{@code createdAtMs}/{@code sealedAtMs} — wall-clock millis since the
unix epoch.
+ * Used for retention-based segment GC and for timestamp-based seek.</li>
+ * </ul>
+ *
* @param segmentId monotonically increasing, unique within the topic
* @param hashRange inclusive hash range [start, end]
* @param state ACTIVE or SEALED
* @param parentIds parent segment IDs in the DAG (empty for initial/root
segments)
* @param childIds child segment IDs in the DAG (empty for active leaf
segments)
- * @param createdAtEpoch epoch when this segment was created
- * @param sealedAtEpoch epoch when sealed (-1 if still active)
+ * @param createdAtEpoch DAG epoch when this segment was created
+ * @param sealedAtEpoch DAG epoch when sealed (-1 if still active)
+ * @param createdAtMs wall-clock millis at creation time
+ * @param sealedAtMs wall-clock millis at seal time (-1 if still active)
*/
public record SegmentInfo(
long segmentId,
@@ -42,7 +52,9 @@ public record SegmentInfo(
List<Long> parentIds,
List<Long> childIds,
long createdAtEpoch,
- long sealedAtEpoch
+ long sealedAtEpoch,
+ long createdAtMs,
+ long sealedAtMs
) {
public SegmentInfo {
parentIds = parentIds != null ? List.copyOf(parentIds) : List.of();
@@ -50,34 +62,35 @@ public record SegmentInfo(
}
/** Create a new active segment with no parents. */
- public static SegmentInfo active(long segmentId, HashRange hashRange, long
createdAtEpoch) {
+ public static SegmentInfo active(long segmentId, HashRange hashRange,
+ long createdAtEpoch, long createdAtMs) {
return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE,
- List.of(), List.of(), createdAtEpoch, -1);
+ List.of(), List.of(), createdAtEpoch, -1, createdAtMs, -1);
}
/** Create a new active segment with the given parent IDs. */
public static SegmentInfo active(long segmentId, HashRange hashRange,
- List<Long> parentIds, long
createdAtEpoch) {
+ List<Long> parentIds, long
createdAtEpoch, long createdAtMs) {
return new SegmentInfo(segmentId, hashRange, SegmentState.ACTIVE,
- parentIds, List.of(), createdAtEpoch, -1);
+ parentIds, List.of(), createdAtEpoch, -1, createdAtMs, -1);
}
/** Return a sealed copy of this segment with the given child IDs. */
- public SegmentInfo sealed(long sealedAtEpoch, List<Long> childIds) {
+ public SegmentInfo sealed(long sealedAtEpoch, long sealedAtMs, List<Long>
childIds) {
return new SegmentInfo(segmentId, hashRange, SegmentState.SEALED,
- parentIds, childIds, createdAtEpoch, sealedAtEpoch);
+ parentIds, childIds, createdAtEpoch, sealedAtEpoch,
createdAtMs, sealedAtMs);
}
/** Return a copy with different parent IDs. */
public SegmentInfo withParentIds(List<Long> parentIds) {
return new SegmentInfo(segmentId, hashRange, state,
- parentIds, childIds, createdAtEpoch, sealedAtEpoch);
+ parentIds, childIds, createdAtEpoch, sealedAtEpoch,
createdAtMs, sealedAtMs);
}
/** Return a copy with different child IDs. */
public SegmentInfo withChildIds(List<Long> childIds) {
return new SegmentInfo(segmentId, hashRange, state,
- parentIds, childIds, createdAtEpoch, sealedAtEpoch);
+ parentIds, childIds, createdAtEpoch, sealedAtEpoch,
createdAtMs, sealedAtMs);
}
public boolean isActive() {
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index 6d135297317..b2a698d33da 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -851,6 +851,10 @@ message SegmentInfoProto {
repeated uint64 child_ids = 6;
required uint64 created_at_epoch = 7;
optional uint64 sealed_at_epoch = 8;
+ // Wall-clock millis at create / seal time. Used for retention-based
segment GC
+ // and timestamp-based seek; epoch above is a DAG generation number, not a
clock.
+ required uint64 created_at_ms = 9;
+ optional uint64 sealed_at_ms = 10;
}
message SegmentBrokerAddress {
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
index 9afd76a98e0..c2c92676362 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTopicTest.java
@@ -87,7 +87,8 @@ public class CommandsScalableTopicTest {
.setHashStart(0x0000)
.setHashEnd(0x7FFF)
.setState(SegmentState.ACTIVE)
- .setCreatedAtEpoch(0L);
+ .setCreatedAtEpoch(0L)
+ .setCreatedAtMs(System.currentTimeMillis());
active.addChildId(2L);
active.addChildId(3L);
dag.addSegment()
@@ -96,6 +97,7 @@ public class CommandsScalableTopicTest {
.setHashEnd(0x3FFF)
.setState(SegmentState.ACTIVE)
.setCreatedAtEpoch(7L)
+ .setCreatedAtMs(System.currentTimeMillis())
.addParentId(0L);
dag.addSegmentBroker().setSegmentId(2L).setBrokerUrl("pulsar://broker-a:6650");