This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e3e1c08e513 Revert "[improve][broker] Optimization protobuf code in 
the bucket delayed tracker (#20158)"
e3e1c08e513 is described below

commit e3e1c08e513823bba2758da70f32079a3aa0d318
Author: coderzc <[email protected]>
AuthorDate: Sun Apr 23 23:53:43 2023 +0800

    Revert "[improve][broker] Optimization protobuf code in the bucket delayed 
tracker (#20158)"
    
    This reverts commit 3da39b2e1553cecbc6d6b85e8bc7844f611d5637.
---
 pulsar-broker/pom.xml                              |  2 -
 .../bucket/BookkeeperBucketSnapshotStorage.java    | 24 +++---
 .../pulsar/broker/delayed/bucket/Bucket.java       |  7 +-
 .../bucket/BucketDelayedDeliveryTracker.java       | 13 +--
 .../delayed/bucket/BucketSnapshotStorage.java      |  4 +-
 .../bucket/CombinedSegmentDelayedIndexQueue.java   | 22 ++---
 .../broker/delayed/bucket/DelayedIndexQueue.java   | 12 +--
 .../broker/delayed/bucket/ImmutableBucket.java     | 19 +++--
 .../broker/delayed/bucket/MutableBucket.java       | 29 +++----
 .../TripleLongPriorityDelayedIndexQueue.java       | 25 ++----
 .../proto/DelayedMessageIndexBucketSegment.proto   | 35 --------
 ... DelayedMessageIndexBucketSnapshotFormat.proto} | 11 ++-
 .../BookkeeperBucketSnapshotStorageTest.java       | 95 +++++++++++-----------
 .../broker/delayed/MockBucketSnapshotStorage.java  | 14 ++--
 .../delayed/bucket/DelayedIndexQueueTest.java      | 70 +++++++++-------
 15 files changed, 172 insertions(+), 210 deletions(-)

diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 25b31b4f2b7..31b335e1aea 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -566,7 +566,6 @@
           <excludes>
             <exclude>**/ResourceUsage.proto</exclude>
             <exclude>**/TransactionPendingAck.proto</exclude>
-            <exclude>**/DelayedMessageIndexBucketSegment.proto</exclude>
           </excludes>
         </configuration>
         <executions>
@@ -611,7 +610,6 @@
           <sources>
             
<source>${project.basedir}/src/main/proto/TransactionPendingAck.proto</source>
             
<source>${project.basedir}/src/main/proto/ResourceUsage.proto</source>
-            
<source>${project.basedir}/src/main/proto/DelayedMessageIndexBucketSegment.proto</source>
           </sources>
           
<targetSourcesSubDir>generated-sources/lightproto/java</targetSourcesSubDir>
           
<targetTestSourcesSubDir>generated-sources/lightproto/java</targetTestSourcesSubDir>
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 040bbbc586f..18a4c322f7b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.delayed.bucket;
 
 import com.google.protobuf.InvalidProtocolBufferException;
-import io.netty.buffer.ByteBuf;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
@@ -36,8 +36,8 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
 import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
@@ -126,8 +126,7 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
 
     private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry 
ledgerEntry) {
         try {
-            ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
-            return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer());
+            return SnapshotMetadata.parseFrom(ledgerEntry.getEntry());
         } catch (InvalidProtocolBufferException e) {
             throw new BucketSnapshotSerializationException(e);
         }
@@ -135,14 +134,15 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
 
     private List<SnapshotSegment> 
parseSnapshotSegmentEntries(Enumeration<LedgerEntry> entryEnumeration) {
         List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
-        while (entryEnumeration.hasMoreElements()) {
-            LedgerEntry ledgerEntry = entryEnumeration.nextElement();
-            SnapshotSegment snapshotSegment = new SnapshotSegment();
-            ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
-            snapshotSegment.parseFrom(entryBuffer, 
entryBuffer.readableBytes());
-            snapshotMetadataList.add(snapshotSegment);
+        try {
+            while (entryEnumeration.hasMoreElements()) {
+                LedgerEntry ledgerEntry = entryEnumeration.nextElement();
+                
snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
+            }
+            return snapshotMetadataList;
+        } catch (IOException e) {
+            throw new BucketSnapshotSerializationException(e);
         }
-        return snapshotMetadataList;
     }
 
     @NotNull
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
index a1693b1553d..4d7d3aa512b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
@@ -31,8 +31,7 @@ import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.roaringbitmap.RoaringBitmap;
@@ -133,8 +132,8 @@ abstract class Bucket {
     }
 
     CompletableFuture<Long> asyncSaveBucketSnapshot(
-            ImmutableBucket bucket, SnapshotMetadata snapshotMetadata,
-            List<SnapshotSegment> bucketSnapshotSegments) {
+            ImmutableBucket bucket, 
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata,
+            List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> 
bucketSnapshotSegments) {
         final String bucketKey = bucket.bucketKey();
         final String cursorName = Codec.decode(cursor.getName());
         final String topicName = dispatcherName.substring(0, 
dispatcherName.lastIndexOf(" / " + cursorName));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index c90064c9137..b17387e276e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -53,8 +53,8 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -286,7 +286,8 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                     // Put indexes back into the shared queue and downgrade to 
memory mode
                     synchronized (BucketDelayedDeliveryTracker.this) {
                         
immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> {
-                            for (SnapshotSegment snapshotSegment : 
snapshotSegments) {
+                            for 
(DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment :
+                                    snapshotSegments) {
                                 for (DelayedIndex delayedIndex : 
snapshotSegment.getIndexesList()) {
                                     
sharedBucketPriorityQueue.add(delayedIndex.getTimestamp(),
                                             delayedIndex.getLedgerId(), 
delayedIndex.getEntryId());
@@ -449,7 +450,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 return FutureUtil.failedFuture(new RuntimeException("Can't 
merge buckets due to bucket create failed"));
             }
 
-            List<CompletableFuture<List<SnapshotSegment>>> getRemainFutures =
+            
List<CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>>
 getRemainFutures =
                     
buckets.stream().map(ImmutableBucket::getRemainSnapshotSegment).toList();
 
             return FutureUtil.waitForAll(getRemainFutures)
@@ -600,11 +601,11 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                             bucket.asyncDeleteBucketSnapshot(stats);
                             return;
                         }
-                        DelayedIndex
+                        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
                                 lastDelayedIndex = 
indexList.get(indexList.size() - 1);
                         
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
                                 lastDelayedIndex.getEntryId(), bucket);
-                        for (DelayedIndex index : indexList) {
+                        for 
(DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) {
                             
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
                                     index.getEntryId());
                         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
index 7464ef9cd3f..51c89bed47a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.broker.delayed.bucket;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
 
 public interface BucketSnapshotStorage {
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
index 006938e9ed2..5655a268782 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
@@ -24,8 +24,8 @@ import java.util.Objects;
 import java.util.PriorityQueue;
 import javax.annotation.concurrent.NotThreadSafe;
 import lombok.AllArgsConstructor;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
 
 @NotThreadSafe
 class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
@@ -40,8 +40,8 @@ class CombinedSegmentDelayedIndexQueue implements 
DelayedIndexQueue {
     }
 
     private static final Comparator<Node> COMPARATOR_NODE = (node1, node2) -> 
DelayedIndexQueue.COMPARATOR.compare(
-            
node1.segmentList.get(node1.segmentListCursor).getIndexeAt(node1.segmentCursor),
-            
node2.segmentList.get(node2.segmentListCursor).getIndexeAt(node2.segmentCursor));
+            
node1.segmentList.get(node1.segmentListCursor).getIndexes(node1.segmentCursor),
+            
node2.segmentList.get(node2.segmentListCursor).getIndexes(node2.segmentCursor));
 
     private final PriorityQueue<Node> kpq;
 
@@ -77,7 +77,7 @@ class CombinedSegmentDelayedIndexQueue implements 
DelayedIndexQueue {
         Objects.requireNonNull(node);
 
         SnapshotSegment snapshotSegment = 
node.segmentList.get(node.segmentListCursor);
-        DelayedIndex delayedIndex = 
snapshotSegment.getIndexeAt(node.segmentCursor);
+        DelayedIndex delayedIndex = 
snapshotSegment.getIndexes(node.segmentCursor);
         if (!needAdvanceCursor) {
             return delayedIndex;
         }
@@ -104,16 +104,4 @@ class CombinedSegmentDelayedIndexQueue implements 
DelayedIndexQueue {
 
         return delayedIndex;
     }
-
-    @Override
-    public void popToObject(DelayedIndex delayedIndex) {
-        DelayedIndex value = getValue(true);
-        delayedIndex.copyFrom(value);
-    }
-
-    @Override
-    public long peekTimestamp() {
-        DelayedIndex value = getValue(false);
-        return value.getTimestamp();
-    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
index f1209a3137a..dee476c376e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
@@ -20,10 +20,10 @@ package org.apache.pulsar.broker.delayed.bucket;
 
 import java.util.Comparator;
 import java.util.Objects;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
 
 interface DelayedIndexQueue {
-    Comparator<DelayedIndex> COMPARATOR = (o1, o2) ->  {
+    Comparator<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> 
COMPARATOR = (o1, o2) ->  {
         if (!Objects.equals(o1.getTimestamp(), o2.getTimestamp())) {
             return Long.compare(o1.getTimestamp(), o2.getTimestamp());
         } else if (!Objects.equals(o1.getLedgerId(), o2.getLedgerId())) {
@@ -35,11 +35,7 @@ interface DelayedIndexQueue {
 
     boolean isEmpty();
 
-    DelayedIndex peek();
+    DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek();
 
-    DelayedIndex pop();
-
-    void popToObject(DelayedIndex delayedIndex);
-
-    long peekTimestamp();
+    DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop();
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 0932f51f350..57de5c84fcd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -32,9 +32,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.mutable.MutableLong;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.roaringbitmap.InvalidRoaringFormat;
 import org.roaringbitmap.RoaringBitmap;
@@ -43,7 +43,7 @@ import org.roaringbitmap.RoaringBitmap;
 class ImmutableBucket extends Bucket {
 
     @Setter
-    private List<SnapshotSegment> snapshotSegments;
+    private List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> 
snapshotSegments;
 
     boolean merging = false;
 
@@ -55,7 +55,7 @@ class ImmutableBucket extends Bucket {
         super(dispatcherName, cursor, sequencer, storage, startLedgerId, 
endLedgerId);
     }
 
-    public Optional<List<SnapshotSegment>> getSnapshotSegments() {
+    public 
Optional<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> 
getSnapshotSegments() {
         return Optional.ofNullable(snapshotSegments);
     }
 
@@ -84,7 +84,7 @@ class ImmutableBucket extends Bucket {
                         }
                     }), BucketSnapshotPersistenceException.class, 
MaxRetryTimes)
                     .thenApply(snapshotMetadata -> {
-                        List<SnapshotSegmentMetadata> metadataList =
+                        
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata> 
metadataList =
                                 snapshotMetadata.getMetadataListList();
 
                         // Skip all already reach schedule time snapshot 
segments
@@ -125,9 +125,10 @@ class ImmutableBucket extends Bucket {
                             return Collections.emptyList();
                         }
 
-                        SnapshotSegment snapshotSegment =
+                        
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
                                 bucketSnapshotSegments.get(0);
-                        List<DelayedIndex> indexList = 
snapshotSegment.getIndexesList();
+                        
List<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> indexList =
+                                snapshotSegment.getIndexesList();
                         this.setCurrentSegmentEntryId(nextSegmentEntryId);
                         if (isRecover) {
                             this.asyncUpdateSnapshotLength();
@@ -170,7 +171,7 @@ class ImmutableBucket extends Bucket {
         setNumberBucketDelayedMessages(numberMessages.getValue());
     }
 
-    CompletableFuture<List<SnapshotSegment>> getRemainSnapshotSegment() {
+    
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
 getRemainSnapshotSegment() {
         int nextSegmentEntryId = currentSegmentEntryId + 1;
         if (nextSegmentEntryId > lastSegmentEntryId) {
             return CompletableFuture.completedFuture(Collections.emptyList());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index b7e9e68f1bd..f404d5d02c1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -30,10 +30,10 @@ import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
 import org.roaringbitmap.RoaringBitmap;
@@ -75,16 +75,14 @@ class MutableBucket extends Bucket implements AutoCloseable 
{
         List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
         List<SnapshotSegmentMetadata> segmentMetadataList = new ArrayList<>();
         Map<Long, RoaringBitmap> bitMap = new HashMap<>();
-        SnapshotSegment snapshotSegment = new SnapshotSegment();
+        SnapshotSegment.Builder snapshotSegmentBuilder = 
SnapshotSegment.newBuilder();
         SnapshotSegmentMetadata.Builder segmentMetadataBuilder = 
SnapshotSegmentMetadata.newBuilder();
 
         List<Long> firstScheduleTimestamps = new ArrayList<>();
         long currentTimestampUpperLimit = 0;
         long currentFirstTimestamp = 0L;
         while (!delayedIndexQueue.isEmpty()) {
-            DelayedIndex delayedIndex = snapshotSegment.addIndexe();
-            delayedIndexQueue.popToObject(delayedIndex);
-
+            DelayedIndex delayedIndex = delayedIndexQueue.peek();
             long timestamp = delayedIndex.getTimestamp();
             if (currentTimestampUpperLimit == 0) {
                 currentFirstTimestamp = timestamp;
@@ -102,13 +100,16 @@ class MutableBucket extends Bucket implements 
AutoCloseable {
                 sharedQueue.add(timestamp, ledgerId, entryId);
             }
 
+            delayedIndexQueue.pop();
             numMessages++;
 
             bitMap.computeIfAbsent(ledgerId, k -> new 
RoaringBitmap()).add(entryId, entryId + 1);
 
-            if (delayedIndexQueue.isEmpty() || 
delayedIndexQueue.peekTimestamp() > currentTimestampUpperLimit
+            snapshotSegmentBuilder.addIndexes(delayedIndex);
+
+            if (delayedIndexQueue.isEmpty() || 
delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit
                     || (maxIndexesPerBucketSnapshotSegment != -1
-                    && snapshotSegment.getIndexesCount() >= 
maxIndexesPerBucketSnapshotSegment)) {
+                    && snapshotSegmentBuilder.getIndexesCount() >= 
maxIndexesPerBucketSnapshotSegment)) {
                 segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
                 
segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp);
                 currentTimestampUpperLimit = 0;
@@ -128,8 +129,8 @@ class MutableBucket extends Bucket implements AutoCloseable 
{
                 segmentMetadataList.add(segmentMetadataBuilder.build());
                 segmentMetadataBuilder.clear();
 
-                bucketSnapshotSegments.add(snapshotSegment);
-                snapshotSegment = new SnapshotSegment();
+                bucketSnapshotSegments.add(snapshotSegmentBuilder.build());
+                snapshotSegmentBuilder.clear();
             }
         }
 
@@ -152,8 +153,8 @@ class MutableBucket extends Bucket implements AutoCloseable 
{
 
         // Add the first snapshot segment last message to 
snapshotSegmentLastMessageTable
         checkArgument(!bucketSnapshotSegments.isEmpty());
-        SnapshotSegment firstSnapshotSegment = bucketSnapshotSegments.get(0);
-        DelayedIndex lastDelayedIndex = 
firstSnapshotSegment.getIndexeAt(firstSnapshotSegment.getIndexesCount() - 1);
+        SnapshotSegment snapshotSegment = bucketSnapshotSegments.get(0);
+        DelayedIndex lastDelayedIndex = 
snapshotSegment.getIndexes(snapshotSegment.getIndexesCount() - 1);
         Pair<ImmutableBucket, DelayedIndex> result = Pair.of(bucket, 
lastDelayedIndex);
 
         CompletableFuture<Long> future = asyncSaveBucketSnapshot(bucket,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
index 4faee3b17f1..b8d54bd78b4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.delayed.bucket;
 
 import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
 import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
 
 @NotThreadSafe
@@ -41,28 +41,17 @@ class TripleLongPriorityDelayedIndexQueue implements 
DelayedIndexQueue {
     }
 
     @Override
-    public DelayedIndex peek() {
-        DelayedIndex delayedIndex = new 
DelayedIndex().setTimestamp(queue.peekN1())
-                .setLedgerId(queue.peekN2()).setEntryId(queue.peekN3());
+    public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek() {
+        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
+                
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setTimestamp(queue.peekN1())
+                        
.setLedgerId(queue.peekN2()).setEntryId(queue.peekN3()).build();
         return delayedIndex;
     }
 
     @Override
-    public DelayedIndex pop() {
-        DelayedIndex peek = peek();
+    public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop() {
+        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek = peek();
         queue.pop();
         return peek;
     }
-
-    @Override
-    public void popToObject(DelayedIndex delayedIndex) {
-        delayedIndex.setTimestamp(queue.peekN1())
-                .setLedgerId(queue.peekN2()).setEntryId(queue.peekN3());
-        queue.pop();
-    }
-
-    @Override
-    public long peekTimestamp() {
-        return queue.peekN1();
-    }
 }
diff --git 
a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto 
b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto
deleted file mode 100644
index 633d6a8f161..00000000000
--- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-syntax = "proto2";
-
-package pulsar.delay;
-option java_package = "org.apache.pulsar.broker.delayed.proto";
-option optimize_for = SPEED;
-option java_multiple_files = true;
-
-message DelayedIndex {
-    required uint64 timestamp = 1;
-    required uint64 ledger_id = 2;
-    required uint64 entry_id = 3;
-}
-
-message SnapshotSegment {
-    repeated DelayedIndex indexes = 1;
-    map<uint64, bytes> delayed_index_bit_map = 2;
-}
diff --git 
a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketMetadata.proto 
b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
similarity index 85%
rename from pulsar-broker/src/main/proto/DelayedMessageIndexBucketMetadata.proto
rename to 
pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
index 01b770c567d..6996b860c52 100644
--- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketMetadata.proto
+++ b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
@@ -21,7 +21,12 @@ syntax = "proto2";
 package pulsar.delay;
 option java_package = "org.apache.pulsar.broker.delayed.proto";
 option optimize_for = SPEED;
-option java_multiple_files = true;
+
+message DelayedIndex {
+    required uint64 timestamp = 1;
+    required uint64 ledger_id = 2;
+    required uint64 entry_id = 3;
+}
 
 message SnapshotSegmentMetadata {
     map<uint64, bytes> delayed_index_bit_map = 1;
@@ -29,6 +34,10 @@ message SnapshotSegmentMetadata {
     required uint64 min_schedule_timestamp = 3;
 }
 
+message SnapshotSegment {
+    repeated DelayedIndex indexes = 1;
+}
+
 message SnapshotMetadata {
     repeated SnapshotSegmentMetadata metadata_list = 1;
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
index d26f38fa2bc..7cb6b8d5865 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
@@ -29,10 +29,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -63,8 +60,9 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
 
     @Test
     public void testCreateSnapshot() throws ExecutionException, 
InterruptedException {
-        SnapshotMetadata snapshotMetadata = 
SnapshotMetadata.newBuilder().build();
-        List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata 
snapshotMetadata =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder().build();
+        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> 
bucketSnapshotSegments = new ArrayList<>();
         CompletableFuture<Long> future =
                 bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
                         bucketSnapshotSegments, UUID.randomUUID().toString(), 
TOPIC_NAME, CURSOR_NAME);
@@ -74,23 +72,24 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
 
     @Test
     public void testGetSnapshot() throws ExecutionException, 
InterruptedException {
-        SnapshotSegmentMetadata segmentMetadata =
-                SnapshotSegmentMetadata.newBuilder()
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata 
segmentMetadata =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
                         .setMinScheduleTimestamp(System.currentTimeMillis())
                         .setMaxScheduleTimestamp(System.currentTimeMillis())
                         .putDelayedIndexBitMap(100L, ByteString.copyFrom(new 
byte[1])).build();
 
-        SnapshotMetadata snapshotMetadata =
-                SnapshotMetadata.newBuilder()
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata 
snapshotMetadata =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
                         .addMetadataList(segmentMetadata)
                         .build();
-        List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> 
bucketSnapshotSegments = new ArrayList<>();
 
         long timeMillis = System.currentTimeMillis();
-        DelayedIndex delayedIndex = new 
DelayedIndex().setLedgerId(100L).setEntryId(10L)
-                        .setTimestamp(timeMillis);
-        SnapshotSegment snapshotSegment = new SnapshotSegment();
-        snapshotSegment.addIndexe().copyFrom(delayedIndex);
+        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
+                
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
+                        .setTimestamp(timeMillis).build();
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment 
snapshotSegment =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
         bucketSnapshotSegments.add(snapshotSegment);
         bucketSnapshotSegments.add(snapshotSegment);
 
@@ -100,13 +99,13 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
         Long bucketId = future.get();
         Assert.assertNotNull(bucketId);
 
-        CompletableFuture<List<SnapshotSegment>> bucketSnapshotSegment =
+        
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
 bucketSnapshotSegment =
                 bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3);
 
-        List<SnapshotSegment> snapshotSegments = bucketSnapshotSegment.get();
+        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> 
snapshotSegments = bucketSnapshotSegment.get();
         Assert.assertEquals(2, snapshotSegments.size());
-        for (SnapshotSegment segment : snapshotSegments) {
-            for (DelayedIndex index : segment.getIndexesList()) {
+        for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment segment : 
snapshotSegments) {
+            for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : 
segment.getIndexesList()) {
                 Assert.assertEquals(100L, index.getLedgerId());
                 Assert.assertEquals(10L, index.getEntryId());
                 Assert.assertEquals(timeMillis, index.getTimestamp());
@@ -122,17 +121,17 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
         map.put(100L, ByteString.copyFrom("test1", StandardCharsets.UTF_8));
         map.put(200L, ByteString.copyFrom("test2", StandardCharsets.UTF_8));
 
-        SnapshotSegmentMetadata segmentMetadata =
-                SnapshotSegmentMetadata.newBuilder()
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata 
segmentMetadata =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
                         .setMaxScheduleTimestamp(timeMillis)
                         .setMinScheduleTimestamp(timeMillis)
                         .putAllDelayedIndexBitMap(map).build();
 
-        SnapshotMetadata snapshotMetadata =
-                SnapshotMetadata.newBuilder()
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata 
snapshotMetadata =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
                         .addMetadataList(segmentMetadata)
                         .build();
-        List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> 
bucketSnapshotSegments = new ArrayList<>();
 
         CompletableFuture<Long> future =
                 bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
@@ -140,10 +139,10 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
         Long bucketId = future.get();
         Assert.assertNotNull(bucketId);
 
-        SnapshotMetadata bucketSnapshotMetadata =
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata 
bucketSnapshotMetadata =
                 
bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId).get();
 
-        SnapshotSegmentMetadata metadata =
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata 
metadata =
                 bucketSnapshotMetadata.getMetadataList(0);
 
         Assert.assertEquals(timeMillis, metadata.getMaxScheduleTimestamp());
@@ -153,9 +152,9 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
 
     @Test
     public void testDeleteSnapshot() throws ExecutionException, 
InterruptedException {
-        SnapshotMetadata snapshotMetadata =
-                SnapshotMetadata.newBuilder().build();
-        List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata 
snapshotMetadata =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder().build();
+        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> 
bucketSnapshotSegments = new ArrayList<>();
         CompletableFuture<Long> future =
                 bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
                         bucketSnapshotSegments, UUID.randomUUID().toString(), 
TOPIC_NAME, CURSOR_NAME);
@@ -174,22 +173,24 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
 
     @Test
     public void testGetBucketSnapshotLength() throws ExecutionException, 
InterruptedException {
-        SnapshotSegmentMetadata segmentMetadata =
-                SnapshotSegmentMetadata.newBuilder()
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata 
segmentMetadata =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
                         .setMinScheduleTimestamp(System.currentTimeMillis())
                         .setMaxScheduleTimestamp(System.currentTimeMillis())
                         .putDelayedIndexBitMap(100L, ByteString.copyFrom(new 
byte[1])).build();
 
-        SnapshotMetadata snapshotMetadata =
-                SnapshotMetadata.newBuilder()
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata 
snapshotMetadata =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
                         .addMetadataList(segmentMetadata)
                         .build();
-        List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> 
bucketSnapshotSegments = new ArrayList<>();
 
         long timeMillis = System.currentTimeMillis();
-        DelayedIndex delayedIndex = new 
DelayedIndex().setLedgerId(100L).setEntryId(10L).setTimestamp(timeMillis);
-        SnapshotSegment snapshotSegment = new SnapshotSegment();
-        snapshotSegment.addIndexe().copyFrom(delayedIndex);
+        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
+                
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
+                        .setTimestamp(timeMillis).build();
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment 
snapshotSegment =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
         bucketSnapshotSegments.add(snapshotSegment);
         bucketSnapshotSegments.add(snapshotSegment);
 
@@ -206,22 +207,24 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
 
     @Test
     public void testConcurrencyGet() throws ExecutionException, 
InterruptedException {
-        SnapshotSegmentMetadata segmentMetadata =
-                SnapshotSegmentMetadata.newBuilder()
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata 
segmentMetadata =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
                         .setMinScheduleTimestamp(System.currentTimeMillis())
                         .setMaxScheduleTimestamp(System.currentTimeMillis())
                         .putDelayedIndexBitMap(100L, ByteString.copyFrom(new 
byte[1])).build();
 
-        SnapshotMetadata snapshotMetadata =
-                SnapshotMetadata.newBuilder()
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata 
snapshotMetadata =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
                         .addMetadataList(segmentMetadata)
                         .build();
-        List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> 
bucketSnapshotSegments = new ArrayList<>();
 
         long timeMillis = System.currentTimeMillis();
-        DelayedIndex delayedIndex = new 
DelayedIndex().setLedgerId(100L).setEntryId(10L).setTimestamp(timeMillis);
-        SnapshotSegment snapshotSegment = new SnapshotSegment();
-        snapshotSegment.addIndexe().copyFrom(delayedIndex);
+        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
+                
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
+                        .setTimestamp(timeMillis).build();
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment 
snapshotSegment =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
         bucketSnapshotSegments.add(snapshotSegment);
         bucketSnapshotSegments.add(snapshotSegment);
 
@@ -234,7 +237,7 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         for (int i = 0; i < 100; i++) {
             CompletableFuture<Void> future0 = CompletableFuture.runAsync(() -> 
{
-                List<SnapshotSegment> list =
+                List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> 
list =
                         
bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3).join();
                 Assert.assertTrue(list.size() > 0);
             });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
index dc1c0e09ca2..9e924bdeda3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
@@ -36,8 +36,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
-import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
 import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
@@ -139,9 +139,13 @@ public class MockBucketSnapshotStorage implements 
BucketSnapshotStorage {
             long lastEntryId = Math.min(lastSegmentEntryId, 
this.bucketSnapshots.get(bucketId).size());
             for (int i = (int) firstSegmentEntryId; i <= lastEntryId ; i++) {
                 ByteBuf byteBuf = this.bucketSnapshots.get(bucketId).get(i);
-                SnapshotSegment snapshotSegment = new SnapshotSegment();
-                snapshotSegment.parseFrom(byteBuf, byteBuf.readableBytes());
-                snapshotSegments.add(snapshotSegment);
+                SnapshotSegment snapshotSegment;
+                try {
+                    snapshotSegment = 
SnapshotSegment.parseFrom(byteBuf.nioBuffer());
+                    snapshotSegments.add(snapshotSegment);
+                } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                }
             }
             return snapshotSegments;
         }, executorService);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
index 5dc3dcc7cb9..8f87f0d49a2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
@@ -23,8 +23,8 @@ import java.util.ArrayList;
 import java.util.List;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
 import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -35,19 +35,27 @@ public class DelayedIndexQueueTest {
     @Test
     public void testCompare() {
         DelayedIndex delayedIndex =
-                new 
DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(1L);
+                
DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L)
+                        .build();
         DelayedIndex delayedIndex2 =
-                new 
DelayedIndex().setTimestamp(2).setLedgerId(2L).setEntryId(2L);
+                
DelayedIndex.newBuilder().setTimestamp(2).setLedgerId(2L).setEntryId(2L)
+                        .build();
         Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
 
         delayedIndex =
-                new 
DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(1L);
+                
DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L)
+                        .build();
         delayedIndex2 =
-                new 
DelayedIndex().setTimestamp(1).setLedgerId(2L).setEntryId(2L);
+                
DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(2L).setEntryId(2L)
+                        .build();
         Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
 
-        delayedIndex = new 
DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(1L);
-        delayedIndex2 = new 
DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(2L);
+        delayedIndex =
+                
DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L)
+                        .build();
+        delayedIndex2 =
+                
DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(2L)
+                        .build();
         Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
     }
 
@@ -55,19 +63,21 @@ public class DelayedIndexQueueTest {
     public void testCombinedSegmentDelayedIndexQueue() {
         List<DelayedIndex> listA = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
-            DelayedIndex delayedIndex = new 
DelayedIndex().setTimestamp(i).setLedgerId(1L).setEntryId(1L);
+            DelayedIndex delayedIndex =
+                    
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(1L).setEntryId(1L)
+                            .build();
             listA.add(delayedIndex);
         }
-        SnapshotSegment snapshotSegmentA1 = new SnapshotSegment();
-        snapshotSegmentA1.addAllIndexes(listA);
+        SnapshotSegment snapshotSegmentA1 = 
SnapshotSegment.newBuilder().addAllIndexes(listA).build();
 
         List<DelayedIndex> listA2 = new ArrayList<>();
         for (int i = 10; i < 20; i++) {
-            DelayedIndex delayedIndex = new 
DelayedIndex().setTimestamp(i).setLedgerId(1L).setEntryId(1L);
+            DelayedIndex delayedIndex =
+                    
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(1L).setEntryId(1L)
+                            .build();
             listA2.add(delayedIndex);
         }
-        SnapshotSegment snapshotSegmentA2 = new SnapshotSegment();
-        snapshotSegmentA2.addAllIndexes(listA2);
+        SnapshotSegment snapshotSegmentA2 = 
SnapshotSegment.newBuilder().addAllIndexes(listA2).build();
 
         List<SnapshotSegment> segmentListA = new ArrayList<>();
         segmentListA.add(snapshotSegmentA1);
@@ -75,32 +85,36 @@ public class DelayedIndexQueueTest {
 
         List<DelayedIndex> listB = new ArrayList<>();
         for (int i = 0; i < 9; i++) {
-            DelayedIndex delayedIndex = new 
DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(1L);
+            DelayedIndex delayedIndex =
+                    
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(1L)
+                            .build();
 
-            DelayedIndex delayedIndex2 = new 
DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(2L);
+            DelayedIndex delayedIndex2 =
+                    
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(2L)
+                            .build();
             listB.add(delayedIndex);
             listB.add(delayedIndex2);
         }
 
-        SnapshotSegment snapshotSegmentB = new SnapshotSegment();
-        snapshotSegmentB.addAllIndexes(listB);
+        SnapshotSegment snapshotSegmentB = 
SnapshotSegment.newBuilder().addAllIndexes(listB).build();
         List<SnapshotSegment> segmentListB = new ArrayList<>();
         segmentListB.add(snapshotSegmentB);
-        segmentListB.add(new SnapshotSegment());
+        segmentListB.add(SnapshotSegment.newBuilder().build());
 
         List<DelayedIndex> listC = new ArrayList<>();
         for (int i = 10; i < 30; i+=2) {
             DelayedIndex delayedIndex =
-                    new 
DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(1L);
+                    
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(1L)
+                            .build();
 
             DelayedIndex delayedIndex2 =
-                    new 
DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(2L);
+                    
DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(2L)
+                            .build();
             listC.add(delayedIndex);
             listC.add(delayedIndex2);
         }
 
-        SnapshotSegment snapshotSegmentC = new SnapshotSegment();
-        snapshotSegmentC.addAllIndexes(listC);
+        SnapshotSegment snapshotSegmentC = 
SnapshotSegment.newBuilder().addAllIndexes(listC).build();
         List<SnapshotSegment> segmentListC = new ArrayList<>();
         segmentListC.add(snapshotSegmentC);
 
@@ -109,14 +123,11 @@ public class DelayedIndexQueueTest {
 
         int count = 0;
         while (!delayedIndexQueue.isEmpty()) {
-            DelayedIndex pop = new DelayedIndex();
-            delayedIndexQueue.popToObject(pop);
+            DelayedIndex pop = delayedIndexQueue.pop();
             log.info("{} , {}, {}", pop.getTimestamp(), pop.getLedgerId(), 
pop.getEntryId());
             count++;
             if (!delayedIndexQueue.isEmpty()) {
                 DelayedIndex peek = delayedIndexQueue.peek();
-                long timestamp = delayedIndexQueue.peekTimestamp();
-                Assert.assertEquals(timestamp, peek.getTimestamp());
                 Assert.assertTrue(COMPARATOR.compare(peek, pop) >= 0);
             }
         }
@@ -136,13 +147,10 @@ public class DelayedIndexQueueTest {
 
         int count = 0;
         while (!delayedIndexQueue.isEmpty()) {
-            DelayedIndex pop = new DelayedIndex();
-            delayedIndexQueue.popToObject(pop);
+            DelayedIndex pop = delayedIndexQueue.pop();
             count++;
             if (!delayedIndexQueue.isEmpty()) {
                 DelayedIndex peek = delayedIndexQueue.peek();
-                long timestamp = delayedIndexQueue.peekTimestamp();
-                Assert.assertEquals(timestamp, peek.getTimestamp());
                 Assert.assertTrue(COMPARATOR.compare(peek, pop) >= 0);
             }
         }

Reply via email to