This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e3e1c08e513 Revert "[improve][broker] Optimization protobuf code in the bucket delayed tracker (#20158)"
e3e1c08e513 is described below
commit e3e1c08e513823bba2758da70f32079a3aa0d318
Author: coderzc <[email protected]>
AuthorDate: Sun Apr 23 23:53:43 2023 +0800
Revert "[improve][broker] Optimization protobuf code in the bucket delayed tracker (#20158)"
This reverts commit 3da39b2e1553cecbc6d6b85e8bc7844f611d5637.
---
pulsar-broker/pom.xml | 2 -
.../bucket/BookkeeperBucketSnapshotStorage.java | 24 +++---
.../pulsar/broker/delayed/bucket/Bucket.java | 7 +-
.../bucket/BucketDelayedDeliveryTracker.java | 13 +--
.../delayed/bucket/BucketSnapshotStorage.java | 4 +-
.../bucket/CombinedSegmentDelayedIndexQueue.java | 22 ++---
.../broker/delayed/bucket/DelayedIndexQueue.java | 12 +--
.../broker/delayed/bucket/ImmutableBucket.java | 19 +++--
.../broker/delayed/bucket/MutableBucket.java | 29 +++----
.../TripleLongPriorityDelayedIndexQueue.java | 25 ++----
.../proto/DelayedMessageIndexBucketSegment.proto | 35 --------
... DelayedMessageIndexBucketSnapshotFormat.proto} | 11 ++-
.../BookkeeperBucketSnapshotStorageTest.java | 95 +++++++++++-----------
.../broker/delayed/MockBucketSnapshotStorage.java | 14 ++--
.../delayed/bucket/DelayedIndexQueueTest.java | 70 +++++++++-------
15 files changed, 172 insertions(+), 210 deletions(-)
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 25b31b4f2b7..31b335e1aea 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -566,7 +566,6 @@
<excludes>
<exclude>**/ResourceUsage.proto</exclude>
<exclude>**/TransactionPendingAck.proto</exclude>
- <exclude>**/DelayedMessageIndexBucketSegment.proto</exclude>
</excludes>
</configuration>
<executions>
@@ -611,7 +610,6 @@
<sources>
<source>${project.basedir}/src/main/proto/TransactionPendingAck.proto</source>
<source>${project.basedir}/src/main/proto/ResourceUsage.proto</source>
- <source>${project.basedir}/src/main/proto/DelayedMessageIndexBucketSegment.proto</source>
</sources>
<targetSourcesSubDir>generated-sources/lightproto/java</targetSourcesSubDir>
<targetTestSourcesSubDir>generated-sources/lightproto/java</targetTestSourcesSubDir>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 040bbbc586f..18a4c322f7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.broker.delayed.bucket;
import com.google.protobuf.InvalidProtocolBufferException;
-import io.netty.buffer.ByteBuf;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
@@ -36,8 +36,8 @@ import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
import org.apache.pulsar.common.util.FutureUtil;
@Slf4j
@@ -126,8 +126,7 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry
ledgerEntry) {
try {
- ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
- return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer());
+ return SnapshotMetadata.parseFrom(ledgerEntry.getEntry());
} catch (InvalidProtocolBufferException e) {
throw new BucketSnapshotSerializationException(e);
}
@@ -135,14 +134,15 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
private List<SnapshotSegment>
parseSnapshotSegmentEntries(Enumeration<LedgerEntry> entryEnumeration) {
List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
- while (entryEnumeration.hasMoreElements()) {
- LedgerEntry ledgerEntry = entryEnumeration.nextElement();
- SnapshotSegment snapshotSegment = new SnapshotSegment();
- ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
- snapshotSegment.parseFrom(entryBuffer,
entryBuffer.readableBytes());
- snapshotMetadataList.add(snapshotSegment);
+ try {
+ while (entryEnumeration.hasMoreElements()) {
+ LedgerEntry ledgerEntry = entryEnumeration.nextElement();
+ snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
+ }
+ return snapshotMetadataList;
+ } catch (IOException e) {
+ throw new BucketSnapshotSerializationException(e);
}
- return snapshotMetadataList;
}
@NotNull
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
index a1693b1553d..4d7d3aa512b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
@@ -31,8 +31,7 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.roaringbitmap.RoaringBitmap;
@@ -133,8 +132,8 @@ abstract class Bucket {
}
CompletableFuture<Long> asyncSaveBucketSnapshot(
- ImmutableBucket bucket, SnapshotMetadata snapshotMetadata,
- List<SnapshotSegment> bucketSnapshotSegments) {
+ ImmutableBucket bucket,
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata,
+ List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>
bucketSnapshotSegments) {
final String bucketKey = bucket.bucketKey();
final String cursorName = Codec.decode(cursor.getName());
final String topicName = dispatcherName.substring(0,
dispatcherName.lastIndexOf(" / " + cursorName));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index c90064c9137..b17387e276e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -53,8 +53,8 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.util.FutureUtil;
@@ -286,7 +286,8 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
// Put indexes back into the shared queue and downgrade to
memory mode
synchronized (BucketDelayedDeliveryTracker.this) {
immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> {
- for (SnapshotSegment snapshotSegment :
snapshotSegments) {
+ for
(DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment :
+ snapshotSegments) {
for (DelayedIndex delayedIndex :
snapshotSegment.getIndexesList()) {
sharedBucketPriorityQueue.add(delayedIndex.getTimestamp(),
delayedIndex.getLedgerId(),
delayedIndex.getEntryId());
@@ -449,7 +450,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
return FutureUtil.failedFuture(new RuntimeException("Can't
merge buckets due to bucket create failed"));
}
- List<CompletableFuture<List<SnapshotSegment>>> getRemainFutures =
+ List<CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>> getRemainFutures =
buckets.stream().map(ImmutableBucket::getRemainSnapshotSegment).toList();
return FutureUtil.waitForAll(getRemainFutures)
@@ -600,11 +601,11 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
bucket.asyncDeleteBucketSnapshot(stats);
return;
}
- DelayedIndex
+ DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
lastDelayedIndex =
indexList.get(indexList.size() - 1);
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getEntryId(), bucket);
- for (DelayedIndex index : indexList) {
+ for
(DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) {
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
index 7464ef9cd3f..51c89bed47a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.broker.delayed.bucket;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
public interface BucketSnapshotStorage {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
index 006938e9ed2..5655a268782 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
@@ -24,8 +24,8 @@ import java.util.Objects;
import java.util.PriorityQueue;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.AllArgsConstructor;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
@NotThreadSafe
class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
@@ -40,8 +40,8 @@ class CombinedSegmentDelayedIndexQueue implements
DelayedIndexQueue {
}
private static final Comparator<Node> COMPARATOR_NODE = (node1, node2) ->
DelayedIndexQueue.COMPARATOR.compare(
- node1.segmentList.get(node1.segmentListCursor).getIndexeAt(node1.segmentCursor),
- node2.segmentList.get(node2.segmentListCursor).getIndexeAt(node2.segmentCursor));
+ node1.segmentList.get(node1.segmentListCursor).getIndexes(node1.segmentCursor),
+ node2.segmentList.get(node2.segmentListCursor).getIndexes(node2.segmentCursor));
private final PriorityQueue<Node> kpq;
@@ -77,7 +77,7 @@ class CombinedSegmentDelayedIndexQueue implements
DelayedIndexQueue {
Objects.requireNonNull(node);
SnapshotSegment snapshotSegment =
node.segmentList.get(node.segmentListCursor);
- DelayedIndex delayedIndex =
snapshotSegment.getIndexeAt(node.segmentCursor);
+ DelayedIndex delayedIndex =
snapshotSegment.getIndexes(node.segmentCursor);
if (!needAdvanceCursor) {
return delayedIndex;
}
@@ -104,16 +104,4 @@ class CombinedSegmentDelayedIndexQueue implements
DelayedIndexQueue {
return delayedIndex;
}
-
- @Override
- public void popToObject(DelayedIndex delayedIndex) {
- DelayedIndex value = getValue(true);
- delayedIndex.copyFrom(value);
- }
-
- @Override
- public long peekTimestamp() {
- DelayedIndex value = getValue(false);
- return value.getTimestamp();
- }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
index f1209a3137a..dee476c376e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
@@ -20,10 +20,10 @@ package org.apache.pulsar.broker.delayed.bucket;
import java.util.Comparator;
import java.util.Objects;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
interface DelayedIndexQueue {
- Comparator<DelayedIndex> COMPARATOR = (o1, o2) -> {
+ Comparator<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex>
COMPARATOR = (o1, o2) -> {
if (!Objects.equals(o1.getTimestamp(), o2.getTimestamp())) {
return Long.compare(o1.getTimestamp(), o2.getTimestamp());
} else if (!Objects.equals(o1.getLedgerId(), o2.getLedgerId())) {
@@ -35,11 +35,7 @@ interface DelayedIndexQueue {
boolean isEmpty();
- DelayedIndex peek();
+ DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek();
- DelayedIndex pop();
-
- void popToObject(DelayedIndex delayedIndex);
-
- long peekTimestamp();
+ DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 0932f51f350..57de5c84fcd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -32,9 +32,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableLong;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.roaringbitmap.InvalidRoaringFormat;
import org.roaringbitmap.RoaringBitmap;
@@ -43,7 +43,7 @@ import org.roaringbitmap.RoaringBitmap;
class ImmutableBucket extends Bucket {
@Setter
- private List<SnapshotSegment> snapshotSegments;
+ private List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>
snapshotSegments;
boolean merging = false;
@@ -55,7 +55,7 @@ class ImmutableBucket extends Bucket {
super(dispatcherName, cursor, sequencer, storage, startLedgerId,
endLedgerId);
}
- public Optional<List<SnapshotSegment>> getSnapshotSegments() {
+ public
Optional<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
getSnapshotSegments() {
return Optional.ofNullable(snapshotSegments);
}
@@ -84,7 +84,7 @@ class ImmutableBucket extends Bucket {
}
}), BucketSnapshotPersistenceException.class,
MaxRetryTimes)
.thenApply(snapshotMetadata -> {
- List<SnapshotSegmentMetadata> metadataList =
+ List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata> metadataList =
snapshotMetadata.getMetadataListList();
// Skip all already reach schedule time snapshot
segments
@@ -125,9 +125,10 @@ class ImmutableBucket extends Bucket {
return Collections.emptyList();
}
- SnapshotSegment snapshotSegment =
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
bucketSnapshotSegments.get(0);
- List<DelayedIndex> indexList = snapshotSegment.getIndexesList();
+ List<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> indexList =
+ snapshotSegment.getIndexesList();
this.setCurrentSegmentEntryId(nextSegmentEntryId);
if (isRecover) {
this.asyncUpdateSnapshotLength();
@@ -170,7 +171,7 @@ class ImmutableBucket extends Bucket {
setNumberBucketDelayedMessages(numberMessages.getValue());
}
- CompletableFuture<List<SnapshotSegment>> getRemainSnapshotSegment() {
+ CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getRemainSnapshotSegment() {
int nextSegmentEntryId = currentSegmentEntryId + 1;
if (nextSegmentEntryId > lastSegmentEntryId) {
return CompletableFuture.completedFuture(Collections.emptyList());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index b7e9e68f1bd..f404d5d02c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -30,10 +30,10 @@ import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.roaringbitmap.RoaringBitmap;
@@ -75,16 +75,14 @@ class MutableBucket extends Bucket implements AutoCloseable
{
List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
List<SnapshotSegmentMetadata> segmentMetadataList = new ArrayList<>();
Map<Long, RoaringBitmap> bitMap = new HashMap<>();
- SnapshotSegment snapshotSegment = new SnapshotSegment();
+ SnapshotSegment.Builder snapshotSegmentBuilder =
SnapshotSegment.newBuilder();
SnapshotSegmentMetadata.Builder segmentMetadataBuilder =
SnapshotSegmentMetadata.newBuilder();
List<Long> firstScheduleTimestamps = new ArrayList<>();
long currentTimestampUpperLimit = 0;
long currentFirstTimestamp = 0L;
while (!delayedIndexQueue.isEmpty()) {
- DelayedIndex delayedIndex = snapshotSegment.addIndexe();
- delayedIndexQueue.popToObject(delayedIndex);
-
+ DelayedIndex delayedIndex = delayedIndexQueue.peek();
long timestamp = delayedIndex.getTimestamp();
if (currentTimestampUpperLimit == 0) {
currentFirstTimestamp = timestamp;
@@ -102,13 +100,16 @@ class MutableBucket extends Bucket implements
AutoCloseable {
sharedQueue.add(timestamp, ledgerId, entryId);
}
+ delayedIndexQueue.pop();
numMessages++;
bitMap.computeIfAbsent(ledgerId, k -> new
RoaringBitmap()).add(entryId, entryId + 1);
- if (delayedIndexQueue.isEmpty() ||
delayedIndexQueue.peekTimestamp() > currentTimestampUpperLimit
+ snapshotSegmentBuilder.addIndexes(delayedIndex);
+
+ if (delayedIndexQueue.isEmpty() ||
delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit
|| (maxIndexesPerBucketSnapshotSegment != -1
- && snapshotSegment.getIndexesCount() >=
maxIndexesPerBucketSnapshotSegment)) {
+ && snapshotSegmentBuilder.getIndexesCount() >=
maxIndexesPerBucketSnapshotSegment)) {
segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp);
currentTimestampUpperLimit = 0;
@@ -128,8 +129,8 @@ class MutableBucket extends Bucket implements AutoCloseable
{
segmentMetadataList.add(segmentMetadataBuilder.build());
segmentMetadataBuilder.clear();
- bucketSnapshotSegments.add(snapshotSegment);
- snapshotSegment = new SnapshotSegment();
+ bucketSnapshotSegments.add(snapshotSegmentBuilder.build());
+ snapshotSegmentBuilder.clear();
}
}
@@ -152,8 +153,8 @@ class MutableBucket extends Bucket implements AutoCloseable
{
// Add the first snapshot segment last message to
snapshotSegmentLastMessageTable
checkArgument(!bucketSnapshotSegments.isEmpty());
- SnapshotSegment firstSnapshotSegment = bucketSnapshotSegments.get(0);
- DelayedIndex lastDelayedIndex =
firstSnapshotSegment.getIndexeAt(firstSnapshotSegment.getIndexesCount() - 1);
+ SnapshotSegment snapshotSegment = bucketSnapshotSegments.get(0);
+ DelayedIndex lastDelayedIndex =
snapshotSegment.getIndexes(snapshotSegment.getIndexesCount() - 1);
Pair<ImmutableBucket, DelayedIndex> result = Pair.of(bucket,
lastDelayedIndex);
CompletableFuture<Long> future = asyncSaveBucketSnapshot(bucket,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
index 4faee3b17f1..b8d54bd78b4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.broker.delayed.bucket;
import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
@NotThreadSafe
@@ -41,28 +41,17 @@ class TripleLongPriorityDelayedIndexQueue implements
DelayedIndexQueue {
}
@Override
- public DelayedIndex peek() {
- DelayedIndex delayedIndex = new
DelayedIndex().setTimestamp(queue.peekN1())
- .setLedgerId(queue.peekN2()).setEntryId(queue.peekN3());
+ public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek() {
+ DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
+ DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setTimestamp(queue.peekN1())
+ .setLedgerId(queue.peekN2()).setEntryId(queue.peekN3()).build();
return delayedIndex;
}
@Override
- public DelayedIndex pop() {
- DelayedIndex peek = peek();
+ public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop() {
+ DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek = peek();
queue.pop();
return peek;
}
-
- @Override
- public void popToObject(DelayedIndex delayedIndex) {
- delayedIndex.setTimestamp(queue.peekN1())
- .setLedgerId(queue.peekN2()).setEntryId(queue.peekN3());
- queue.pop();
- }
-
- @Override
- public long peekTimestamp() {
- return queue.peekN1();
- }
}
diff --git a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto
deleted file mode 100644
index 633d6a8f161..00000000000
--- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-syntax = "proto2";
-
-package pulsar.delay;
-option java_package = "org.apache.pulsar.broker.delayed.proto";
-option optimize_for = SPEED;
-option java_multiple_files = true;
-
-message DelayedIndex {
- required uint64 timestamp = 1;
- required uint64 ledger_id = 2;
- required uint64 entry_id = 3;
-}
-
-message SnapshotSegment {
- repeated DelayedIndex indexes = 1;
- map<uint64, bytes> delayed_index_bit_map = 2;
-}
diff --git a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketMetadata.proto b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
similarity index 85%
rename from pulsar-broker/src/main/proto/DelayedMessageIndexBucketMetadata.proto
rename to pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
index 01b770c567d..6996b860c52 100644
--- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketMetadata.proto
+++ b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
@@ -21,7 +21,12 @@ syntax = "proto2";
package pulsar.delay;
option java_package = "org.apache.pulsar.broker.delayed.proto";
option optimize_for = SPEED;
-option java_multiple_files = true;
+
+message DelayedIndex {
+ required uint64 timestamp = 1;
+ required uint64 ledger_id = 2;
+ required uint64 entry_id = 3;
+}
message SnapshotSegmentMetadata {
map<uint64, bytes> delayed_index_bit_map = 1;
@@ -29,6 +34,10 @@ message SnapshotSegmentMetadata {
required uint64 min_schedule_timestamp = 3;
}
+message SnapshotSegment {
+ repeated DelayedIndex indexes = 1;
+}
+
message SnapshotMetadata {
repeated SnapshotSegmentMetadata metadata_list = 1;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
index d26f38fa2bc..7cb6b8d5865 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
@@ -29,10 +29,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -63,8 +60,9 @@ public class BookkeeperBucketSnapshotStorageTest extends
MockedPulsarServiceBase
@Test
public void testCreateSnapshot() throws ExecutionException,
InterruptedException {
- SnapshotMetadata snapshotMetadata =
SnapshotMetadata.newBuilder().build();
- List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata
snapshotMetadata =
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder().build();
+ List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>
bucketSnapshotSegments = new ArrayList<>();
CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
bucketSnapshotSegments, UUID.randomUUID().toString(),
TOPIC_NAME, CURSOR_NAME);
@@ -74,23 +72,24 @@ public class BookkeeperBucketSnapshotStorageTest extends
MockedPulsarServiceBase
@Test
public void testGetSnapshot() throws ExecutionException,
InterruptedException {
- SnapshotSegmentMetadata segmentMetadata =
- SnapshotSegmentMetadata.newBuilder()
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata
segmentMetadata =
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
.setMinScheduleTimestamp(System.currentTimeMillis())
.setMaxScheduleTimestamp(System.currentTimeMillis())
.putDelayedIndexBitMap(100L, ByteString.copyFrom(new
byte[1])).build();
- SnapshotMetadata snapshotMetadata =
- SnapshotMetadata.newBuilder()
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata
snapshotMetadata =
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
.addMetadataList(segmentMetadata)
.build();
- List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+ List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>
bucketSnapshotSegments = new ArrayList<>();
long timeMillis = System.currentTimeMillis();
- DelayedIndex delayedIndex = new
DelayedIndex().setLedgerId(100L).setEntryId(10L)
- .setTimestamp(timeMillis);
- SnapshotSegment snapshotSegment = new SnapshotSegment();
- snapshotSegment.addIndexe().copyFrom(delayedIndex);
+ DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
+ DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
+ .setTimestamp(timeMillis).build();
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment
snapshotSegment =
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
bucketSnapshotSegments.add(snapshotSegment);
bucketSnapshotSegments.add(snapshotSegment);
@@ -100,13 +99,13 @@ public class BookkeeperBucketSnapshotStorageTest extends
MockedPulsarServiceBase
Long bucketId = future.get();
Assert.assertNotNull(bucketId);
- CompletableFuture<List<SnapshotSegment>> bucketSnapshotSegment =
+ CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> bucketSnapshotSegment =
bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3);
- List<SnapshotSegment> snapshotSegments = bucketSnapshotSegment.get();
+ List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>
snapshotSegments = bucketSnapshotSegment.get();
Assert.assertEquals(2, snapshotSegments.size());
- for (SnapshotSegment segment : snapshotSegments) {
- for (DelayedIndex index : segment.getIndexesList()) {
+ for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment segment :
snapshotSegments) {
+ for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index :
segment.getIndexesList()) {
Assert.assertEquals(100L, index.getLedgerId());
Assert.assertEquals(10L, index.getEntryId());
Assert.assertEquals(timeMillis, index.getTimestamp());
@@ -122,17 +121,17 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
map.put(100L, ByteString.copyFrom("test1", StandardCharsets.UTF_8));
map.put(200L, ByteString.copyFrom("test2", StandardCharsets.UTF_8));
- SnapshotSegmentMetadata segmentMetadata =
- SnapshotSegmentMetadata.newBuilder()
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata
segmentMetadata =
+
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
.setMaxScheduleTimestamp(timeMillis)
.setMinScheduleTimestamp(timeMillis)
.putAllDelayedIndexBitMap(map).build();
- SnapshotMetadata snapshotMetadata =
- SnapshotMetadata.newBuilder()
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata
snapshotMetadata =
+
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
.addMetadataList(segmentMetadata)
.build();
- List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+ List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>
bucketSnapshotSegments = new ArrayList<>();
CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
@@ -140,10 +139,10 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
Long bucketId = future.get();
Assert.assertNotNull(bucketId);
- SnapshotMetadata bucketSnapshotMetadata =
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata
bucketSnapshotMetadata =
bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId).get();
- SnapshotSegmentMetadata metadata =
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata
metadata =
bucketSnapshotMetadata.getMetadataList(0);
Assert.assertEquals(timeMillis, metadata.getMaxScheduleTimestamp());
@@ -153,9 +152,9 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
@Test
public void testDeleteSnapshot() throws ExecutionException,
InterruptedException {
- SnapshotMetadata snapshotMetadata =
- SnapshotMetadata.newBuilder().build();
- List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata
snapshotMetadata =
+
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder().build();
+ List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>
bucketSnapshotSegments = new ArrayList<>();
CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
bucketSnapshotSegments, UUID.randomUUID().toString(),
TOPIC_NAME, CURSOR_NAME);
@@ -174,22 +173,24 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
@Test
public void testGetBucketSnapshotLength() throws ExecutionException,
InterruptedException {
- SnapshotSegmentMetadata segmentMetadata =
- SnapshotSegmentMetadata.newBuilder()
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata
segmentMetadata =
+
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
.setMinScheduleTimestamp(System.currentTimeMillis())
.setMaxScheduleTimestamp(System.currentTimeMillis())
.putDelayedIndexBitMap(100L, ByteString.copyFrom(new
byte[1])).build();
- SnapshotMetadata snapshotMetadata =
- SnapshotMetadata.newBuilder()
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata
snapshotMetadata =
+
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
.addMetadataList(segmentMetadata)
.build();
- List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+ List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>
bucketSnapshotSegments = new ArrayList<>();
long timeMillis = System.currentTimeMillis();
- DelayedIndex delayedIndex = new
DelayedIndex().setLedgerId(100L).setEntryId(10L).setTimestamp(timeMillis);
- SnapshotSegment snapshotSegment = new SnapshotSegment();
- snapshotSegment.addIndexe().copyFrom(delayedIndex);
+ DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
+
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
+ .setTimestamp(timeMillis).build();
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment
snapshotSegment =
+
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
bucketSnapshotSegments.add(snapshotSegment);
bucketSnapshotSegments.add(snapshotSegment);
@@ -206,22 +207,24 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
@Test
public void testConcurrencyGet() throws ExecutionException,
InterruptedException {
- SnapshotSegmentMetadata segmentMetadata =
- SnapshotSegmentMetadata.newBuilder()
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata
segmentMetadata =
+
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
.setMinScheduleTimestamp(System.currentTimeMillis())
.setMaxScheduleTimestamp(System.currentTimeMillis())
.putDelayedIndexBitMap(100L, ByteString.copyFrom(new
byte[1])).build();
- SnapshotMetadata snapshotMetadata =
- SnapshotMetadata.newBuilder()
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata
snapshotMetadata =
+
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
.addMetadataList(segmentMetadata)
.build();
- List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+ List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>
bucketSnapshotSegments = new ArrayList<>();
long timeMillis = System.currentTimeMillis();
- DelayedIndex delayedIndex = new
DelayedIndex().setLedgerId(100L).setEntryId(10L).setTimestamp(timeMillis);
- SnapshotSegment snapshotSegment = new SnapshotSegment();
- snapshotSegment.addIndexe().copyFrom(delayedIndex);
+ DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
+
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
+ .setTimestamp(timeMillis).build();
+ DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment
snapshotSegment =
+
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
bucketSnapshotSegments.add(snapshotSegment);
bucketSnapshotSegments.add(snapshotSegment);
@@ -234,7 +237,7 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
CompletableFuture<Void> future0 = CompletableFuture.runAsync(() ->
{
- List<SnapshotSegment> list =
+ List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>
list =
bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3).join();
Assert.assertTrue(list.size() > 0);
});
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
index dc1c0e09ca2..9e924bdeda3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
@@ -36,8 +36,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
-import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
import org.apache.pulsar.common.util.FutureUtil;
@Slf4j
@@ -139,9 +139,13 @@ public class MockBucketSnapshotStorage implements BucketSnapshotStorage {
long lastEntryId = Math.min(lastSegmentEntryId,
this.bucketSnapshots.get(bucketId).size());
for (int i = (int) firstSegmentEntryId; i <= lastEntryId ; i++) {
ByteBuf byteBuf = this.bucketSnapshots.get(bucketId).get(i);
- SnapshotSegment snapshotSegment = new SnapshotSegment();
- snapshotSegment.parseFrom(byteBuf, byteBuf.readableBytes());
- snapshotSegments.add(snapshotSegment);
+ SnapshotSegment snapshotSegment;
+ try {
+ snapshotSegment =
SnapshotSegment.parseFrom(byteBuf.nioBuffer());
+ snapshotSegments.add(snapshotSegment);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
}
return snapshotSegments;
}, executorService);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
index 5dc3dcc7cb9..8f87f0d49a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
@@ -23,8 +23,8 @@ import java.util.ArrayList;
import java.util.List;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -35,19 +35,27 @@ public class DelayedIndexQueueTest {
@Test
public void testCompare() {
DelayedIndex delayedIndex =
- new
DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(1L);
+
DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L)
+ .build();
DelayedIndex delayedIndex2 =
- new
DelayedIndex().setTimestamp(2).setLedgerId(2L).setEntryId(2L);
+
DelayedIndex.newBuilder().setTimestamp(2).setLedgerId(2L).setEntryId(2L)
+ .build();
Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
delayedIndex =
- new
DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(1L);
+
DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L)
+ .build();
delayedIndex2 =
- new
DelayedIndex().setTimestamp(1).setLedgerId(2L).setEntryId(2L);
+
DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(2L).setEntryId(2L)
+ .build();
Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
- delayedIndex = new
DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(1L);
- delayedIndex2 = new
DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(2L);
+ delayedIndex =
+
DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L)
+ .build();
+ delayedIndex2 =
+
DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(2L)
+ .build();
Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
}
@@ -55,19 +63,21 @@ public class DelayedIndexQueueTest {
public void testCombinedSegmentDelayedIndexQueue() {
List<DelayedIndex> listA = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- DelayedIndex delayedIndex = new
DelayedIndex().setTimestamp(i).setLedgerId(1L).setEntryId(1L);
+ DelayedIndex delayedIndex =
+
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(1L).setEntryId(1L)
+ .build();
listA.add(delayedIndex);
}
- SnapshotSegment snapshotSegmentA1 = new SnapshotSegment();
- snapshotSegmentA1.addAllIndexes(listA);
+ SnapshotSegment snapshotSegmentA1 =
SnapshotSegment.newBuilder().addAllIndexes(listA).build();
List<DelayedIndex> listA2 = new ArrayList<>();
for (int i = 10; i < 20; i++) {
- DelayedIndex delayedIndex = new
DelayedIndex().setTimestamp(i).setLedgerId(1L).setEntryId(1L);
+ DelayedIndex delayedIndex =
+
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(1L).setEntryId(1L)
+ .build();
listA2.add(delayedIndex);
}
- SnapshotSegment snapshotSegmentA2 = new SnapshotSegment();
- snapshotSegmentA2.addAllIndexes(listA2);
+ SnapshotSegment snapshotSegmentA2 =
SnapshotSegment.newBuilder().addAllIndexes(listA2).build();
List<SnapshotSegment> segmentListA = new ArrayList<>();
segmentListA.add(snapshotSegmentA1);
@@ -75,32 +85,36 @@ public class DelayedIndexQueueTest {
List<DelayedIndex> listB = new ArrayList<>();
for (int i = 0; i < 9; i++) {
- DelayedIndex delayedIndex = new
DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(1L);
+ DelayedIndex delayedIndex =
+
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(1L)
+ .build();
- DelayedIndex delayedIndex2 = new
DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(2L);
+ DelayedIndex delayedIndex2 =
+
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(2L)
+ .build();
listB.add(delayedIndex);
listB.add(delayedIndex2);
}
- SnapshotSegment snapshotSegmentB = new SnapshotSegment();
- snapshotSegmentB.addAllIndexes(listB);
+ SnapshotSegment snapshotSegmentB =
SnapshotSegment.newBuilder().addAllIndexes(listB).build();
List<SnapshotSegment> segmentListB = new ArrayList<>();
segmentListB.add(snapshotSegmentB);
- segmentListB.add(new SnapshotSegment());
+ segmentListB.add(SnapshotSegment.newBuilder().build());
List<DelayedIndex> listC = new ArrayList<>();
for (int i = 10; i < 30; i+=2) {
DelayedIndex delayedIndex =
- new
DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(1L);
+
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(1L)
+ .build();
DelayedIndex delayedIndex2 =
- new
DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(2L);
+
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(2L)
+ .build();
listC.add(delayedIndex);
listC.add(delayedIndex2);
}
- SnapshotSegment snapshotSegmentC = new SnapshotSegment();
- snapshotSegmentC.addAllIndexes(listC);
+ SnapshotSegment snapshotSegmentC =
SnapshotSegment.newBuilder().addAllIndexes(listC).build();
List<SnapshotSegment> segmentListC = new ArrayList<>();
segmentListC.add(snapshotSegmentC);
@@ -109,14 +123,11 @@ public class DelayedIndexQueueTest {
int count = 0;
while (!delayedIndexQueue.isEmpty()) {
- DelayedIndex pop = new DelayedIndex();
- delayedIndexQueue.popToObject(pop);
+ DelayedIndex pop = delayedIndexQueue.pop();
log.info("{} , {}, {}", pop.getTimestamp(), pop.getLedgerId(),
pop.getEntryId());
count++;
if (!delayedIndexQueue.isEmpty()) {
DelayedIndex peek = delayedIndexQueue.peek();
- long timestamp = delayedIndexQueue.peekTimestamp();
- Assert.assertEquals(timestamp, peek.getTimestamp());
Assert.assertTrue(COMPARATOR.compare(peek, pop) >= 0);
}
}
@@ -136,13 +147,10 @@ public class DelayedIndexQueueTest {
int count = 0;
while (!delayedIndexQueue.isEmpty()) {
- DelayedIndex pop = new DelayedIndex();
- delayedIndexQueue.popToObject(pop);
+ DelayedIndex pop = delayedIndexQueue.pop();
count++;
if (!delayedIndexQueue.isEmpty()) {
DelayedIndex peek = delayedIndexQueue.peek();
- long timestamp = delayedIndexQueue.peekTimestamp();
- Assert.assertEquals(timestamp, peek.getTimestamp());
Assert.assertTrue(COMPARATOR.compare(peek, pop) >= 0);
}
}