This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d85a5e2  [Feature] Introduce continuous offset for pulsar (#9039)
d85a5e2 is described below

commit d85a5e23c408eb3243c434909588324455aa94a5
Author: Aloys <[email protected]>
AuthorDate: Thu Dec 24 20:52:40 2020 +0800

    [Feature] Introduce continuous offset for pulsar (#9039)
    
    Fixes #9038
    ### Motivation
    
    As described in 
[PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata).
    One of the use case for Broker entry metadata is  providing continuous 
message sequence-Id for messages in one topic-partition which is useful for 
Protocol Hanlder like KOP.
    
    This PR enable Pulsar to support continuous offset for message based on 
Broker entry metadata.
    
    ### Modifications
    
    Introduce a new field for broker entry metadta named `offset`;
    Introduce a new interceptor type `ManagedLedgerInterceptor` which intercept 
entry in `ManagedLedger`;
    Each partition will be assigned a `ManagedLedgerInterceptor` when 
`ManagedLedger` created;
    Each Entry will be intercept for adding a  monotone increasing offset in 
Broker entry metadata and the offet is added by batchSize of entry;
    Support find position by a given offset.
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   |  74 +++++++
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |   9 +
 .../bookkeeper/mledger/ManagedLedgerException.java |   6 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 150 +++++++++++++-
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java |  43 +++-
 .../bookkeeper/mledger/impl/OpFindNewest.java      |  28 ++-
 .../interceptor/ManagedLedgerInterceptor.java      |  60 ++++++
 .../intercept/ManagedLedgerInterceptorImpl.java    | 113 +++++++++++
 .../pulsar/broker/service/BrokerService.java       |  17 ++
 .../org/apache/pulsar/broker/service/Producer.java |   5 +
 .../org/apache/pulsar/broker/service/Topic.java    |   4 +
 .../broker/service/persistent/PersistentTopic.java |  26 +--
 .../intercept/MangedLedgerInterceptorImplTest.java | 220 +++++++++++++++++++++
 .../service/PersistentMessageFinderTest.java       |   9 +-
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  57 ++++++
 .../AppendBrokerTimestampMetadataInterceptor.java  |   8 +
 ...or.java => AppendIndexMetadataInterceptor.java} |  33 +++-
 .../intercept/BrokerEntryMetadataInterceptor.java  |   2 +
 .../apache/pulsar/common/protocol/Commands.java    |  32 +++
 pulsar-common/src/main/proto/PulsarApi.proto       |   1 +
 .../pulsar/common/protocol/CommandUtilsTests.java  |  16 +-
 21 files changed, 870 insertions(+), 43 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 4b06b5e..6274f97 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -32,6 +32,7 @@ import 
org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
+import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 
 /**
@@ -75,6 +76,18 @@ public interface ManagedLedger {
     Position addEntry(byte[] data) throws InterruptedException, 
ManagedLedgerException;
 
     /**
+     * Append a new entry to the end of a managed ledger.
+     *
+     * @param data
+     *            data entry to be persisted
+     * @param numberOfMessages
+     *            numberOfMessages of entry
+     * @return the Position at which the entry has been inserted
+     * @throws ManagedLedgerException
+     */
+    Position addEntry(byte[] data, int numberOfMessages) throws 
InterruptedException, ManagedLedgerException;
+
+    /**
      * Append a new entry asynchronously.
      *
      * @see #addEntry(byte[])
@@ -103,6 +116,22 @@ public interface ManagedLedger {
     Position addEntry(byte[] data, int offset, int length) throws 
InterruptedException, ManagedLedgerException;
 
     /**
+     * Append a new entry to the end of a managed ledger.
+     *
+     * @param data
+     *            data entry to be persisted
+     * @param numberOfMessages
+     *            numberOfMessages of entry
+     * @param offset
+     *            offset in the data array
+     * @param length
+     *            number of bytes
+     * @return the Position at which the entry has been inserted
+     * @throws ManagedLedgerException
+     */
+    Position addEntry(byte[] data, int numberOfMessages, int offset, int 
length) throws InterruptedException, ManagedLedgerException;
+
+    /**
      * Append a new entry asynchronously.
      *
      * @see #addEntry(byte[])
@@ -123,6 +152,26 @@ public interface ManagedLedger {
      * Append a new entry asynchronously.
      *
      * @see #addEntry(byte[])
+     * @param data
+     *            data entry to be persisted
+     * @param numberOfMessages
+     *            numberOfMessages of entry
+     * @param offset
+     *            offset in the data array
+     * @param length
+     *            number of bytes
+     * @param callback
+     *            callback object
+     * @param ctx
+     *            opaque context
+     */
+    void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int 
length, AddEntryCallback callback, Object ctx);
+
+
+    /**
+     * Append a new entry asynchronously.
+     *
+     * @see #addEntry(byte[])
      * @param buffer
      *            buffer with the data entry
      * @param callback
@@ -133,6 +182,21 @@ public interface ManagedLedger {
     void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx);
 
     /**
+     * Append a new entry asynchronously.
+     *
+     * @see #addEntry(byte[])
+     * @param buffer
+     *            buffer with the data entry
+     * @param numberOfMessages
+     *            numberOfMessages for data entry
+     * @param callback
+     *            callback object
+     * @param ctx
+     *            opaque context
+     */
+    void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback 
callback, Object ctx);
+
+    /**
      * Open a ManagedCursor in this ManagedLedger.
      *
      * <p/>If the cursors doesn't exist, a new one will be created and its 
position will be at the end of the
@@ -520,4 +584,14 @@ public interface ManagedLedger {
      * Roll current ledger if it is full
      */
     void rollCurrentLedgerIfFull();
+
+    /**
+     * Find position by sequenceId.
+     * */
+    CompletableFuture<Position> 
asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
+
+    /**
+     * Get the ManagedLedgerInterceptor for ManagedLedger.
+     * */
+    ManagedLedgerInterceptor getManagedLedgerInterceptor();
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index b1f2512..7f982b1 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -33,6 +33,7 @@ import 
org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 
+import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
 import 
org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
 
 /**
@@ -75,6 +76,7 @@ public class ManagedLedgerConfig {
     private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
     private int newEntriesCheckDelayInMillis = 10;
     private Clock clock = Clock.systemUTC();
+    private ManagedLedgerInterceptor managedLedgerInterceptor;
 
     public boolean isCreateIfMissing() {
         return createIfMissing;
@@ -637,4 +639,11 @@ public class ManagedLedgerConfig {
         this.newEntriesCheckDelayInMillis = newEntriesCheckDelayInMillis;
     }
 
+    public ManagedLedgerInterceptor getManagedLedgerInterceptor() {
+        return managedLedgerInterceptor;
+    }
+
+    public void setManagedLedgerInterceptor(ManagedLedgerInterceptor 
managedLedgerInterceptor) {
+        this.managedLedgerInterceptor = managedLedgerInterceptor;
+    }
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 14202cb..0f56b99 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -157,6 +157,12 @@ public class ManagedLedgerException extends Exception {
         }
     }
 
+    public static class ManagedLedgerInterceptException extends 
ManagedLedgerException {
+        public ManagedLedgerInterceptException(String msg) {
+            super(msg);
+        }
+    }
+
     @Override
     public synchronized Throwable fillInStackTrace() {
         // Disable stack traces to be filled in
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ba50d2b..562a331 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -64,6 +64,8 @@ import 
java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+
+import io.netty.util.ReferenceCountUtil;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -77,6 +79,7 @@ import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.common.util.Retries;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -98,6 +101,7 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorNotFoundExcept
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
+import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerInterceptException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
@@ -108,6 +112,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
+import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
 import org.apache.bookkeeper.mledger.offload.OffloadUtils;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
@@ -195,6 +200,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
     volatile PositionImpl lastConfirmedEntry;
 
+    private ManagedLedgerInterceptor managedLedgerInterceptor;
+
     protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
     protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
 
@@ -283,6 +290,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() 
* (1 + random.nextDouble() * 5 / 100.0));
         this.mlOwnershipChecker = mlOwnershipChecker;
         this.propertiesMap = Maps.newHashMap();
+        if (config.getManagedLedgerInterceptor() != null) {
+            this.managedLedgerInterceptor = 
config.getManagedLedgerInterceptor();
+        }
     }
 
     synchronized void initialize(final ManagedLedgerInitializeLedgerCallback 
callback, final Object ctx) {
@@ -310,6 +320,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                         propertiesMap.put(property.getKey(), 
property.getValue());
                     }
                 }
+                if (managedLedgerInterceptor != null) {
+                    
managedLedgerInterceptor.onManagedLedgerPropertiesInitialize(propertiesMap);
+                }
 
                 // Last ledger stat may be zeroed, we must update it
                 if (ledgers.size() > 0) {
@@ -325,6 +338,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                                         .setEntries(lh.getLastAddConfirmed() + 
1).setSize(lh.getLength())
                                         .setTimestamp(clock.millis()).build();
                                 ledgers.put(id, info);
+                                if (managedLedgerInterceptor != null) {
+                                    
managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh);
+                                }
                                 initializeBookKeeper(callback);
                             } else if (isNoSuchLedgerExistsException(rc)) {
                                 log.warn("[{}] Ledger not found: {}", name, 
ledgers.lastKey());
@@ -551,6 +567,11 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     }
 
     @Override
+    public Position addEntry(byte[] data, int numberOfMessages) throws 
InterruptedException, ManagedLedgerException {
+        return addEntry(data, numberOfMessages, 0, data.length);
+    }
+
+    @Override
     public Position addEntry(byte[] data, int offset, int length) throws 
InterruptedException, ManagedLedgerException {
         final CountDownLatch counter = new CountDownLatch(1);
         // Result list will contain the status exception and the resulting
@@ -586,6 +607,41 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     }
 
     @Override
+    public Position addEntry(byte[] data, int numberOfMessages, int offset, 
int length) throws InterruptedException, ManagedLedgerException {
+        final CountDownLatch counter = new CountDownLatch(1);
+        // Result list will contain the status exception and the resulting
+        // position
+        class Result {
+            ManagedLedgerException status = null;
+            Position position = null;
+        }
+        final Result result = new Result();
+
+        asyncAddEntry(data, numberOfMessages, offset, length, new 
AddEntryCallback() {
+            @Override
+            public void addComplete(Position position, Object ctx) {
+                result.position = position;
+                counter.countDown();
+            }
+
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+                result.status = exception;
+                counter.countDown();
+            }
+        }, null);
+
+        counter.await();
+
+        if (result.status != null) {
+            log.error("[{}] Error adding entry", name, result.status);
+            throw result.status;
+        }
+
+        return result.position;
+    }
+
+    @Override
     public void asyncAddEntry(final byte[] data, final AddEntryCallback 
callback, final Object ctx) {
         asyncAddEntry(data, 0, data.length, callback, ctx);
     }
@@ -598,6 +654,13 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     }
 
     @Override
+    public void asyncAddEntry(final byte[] data, int numberOfMessages, int 
offset, int length, final AddEntryCallback callback,
+                              final Object ctx) {
+        ByteBuf buffer = Unpooled.wrappedBuffer(data, offset, length);
+        asyncAddEntry(buffer, numberOfMessages, callback, ctx);
+    }
+
+    @Override
     public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, 
Object ctx) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] asyncAddEntry size={} state={}", name, 
buffer.readableBytes(), state);
@@ -609,6 +672,18 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         executor.executeOrdered(name, safeRun(() -> 
internalAsyncAddEntry(addOperation)));
     }
 
+    @Override
+    public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, 
AddEntryCallback callback, Object ctx) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] asyncAddEntry size={} state={}", name, 
buffer.readableBytes(), state);
+        }
+
+        OpAddEntry addOperation = OpAddEntry.create(this, buffer, 
numberOfMessages, callback, ctx);
+
+        // Jump to specific thread to avoid contention from writers writing 
from different threads
+        executor.executeOrdered(name, safeRun(() -> 
internalAsyncAddEntry(addOperation)));
+    }
+
     private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
         pendingAddEntries.add(addOperation);
         final State state = STATE_UPDATER.get(this);
@@ -672,8 +747,27 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 addOperation.setCloseWhenDone(true);
                 STATE_UPDATER.set(this, State.ClosingLedger);
             }
+            // interceptor entry before add to bookie
+            if (beforeAddEntry(addOperation)) {
+                addOperation.initiate();
+            }
+        }
+    }
 
-            addOperation.initiate();
+    private boolean beforeAddEntry(OpAddEntry addOperation) {
+        // if no interceptor, just return true to make sure addOperation will 
be initiate()
+        if (managedLedgerInterceptor == null) {
+            return true;
+        }
+        try {
+            managedLedgerInterceptor.beforeAddEntry(addOperation, 
addOperation.getNumberOfMessages());
+            return true;
+        } catch (Exception e) {
+            addOperation.failed(
+                    new ManagedLedgerInterceptException("Interceptor managed 
ledger before add to bookie failed."));
+            ReferenceCountUtil.release(addOperation.data);
+            log.error("[{}] Failed to intercept adding an entry to bookie.", 
name, e);
+            return false;
         }
     }
 
@@ -1357,10 +1451,12 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                 // If op is used by another ledger handle, we need to close it 
and create a new one
                 if (existsOp.ledger != null) {
                     existsOp.close();
-                    existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, 
existsOp.callback, existsOp.ctx);
+                    existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, 
existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
                 }
                 existsOp.setLedger(currentLedger);
-                pendingAddEntries.add(existsOp);
+                if (beforeAddEntry(existsOp)) {
+                    pendingAddEntries.add(existsOp);
+                }
             }
         } while (existsOp != null && --pendingSize > 0);
 
@@ -1470,6 +1566,51 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
     }
 
+    @Override
+    public CompletableFuture<Position> 
asyncFindPosition(com.google.common.base.Predicate<Entry> predicate) {
+
+        CompletableFuture<Position> future = new CompletableFuture();
+        Long firstLedgerId = ledgers.firstKey();
+        final PositionImpl startPosition = firstLedgerId == null ? null : new 
PositionImpl(firstLedgerId, 0);
+        if (startPosition == null) {
+            future.complete(null);
+            return future;
+        }
+        AsyncCallbacks.FindEntryCallback findEntryCallback = new 
AsyncCallbacks.FindEntryCallback() {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                final Position finalPosition;
+                if (position == null) {
+                    finalPosition = startPosition;
+                    if (finalPosition == null) {
+                        log.warn("[{}] Unable to find position for predicate 
{}.", name, predicate);
+                        future.complete(null);
+                        return;
+                    }
+                    log.info("[{}] Unable to find position for predicate {}. 
Use the first position {} instead.", name, predicate, startPosition);
+                } else {
+                    finalPosition = getNextValidPosition((PositionImpl) 
position);
+                }
+                future.complete((PositionImpl) finalPosition);
+            }
+
+            @Override
+            public void findEntryFailed(ManagedLedgerException exception, 
Optional<Position> failedReadPosition, Object ctx) {
+                log.warn("[{}] Unable to find position for predicate {}.", 
name, predicate);
+                future.complete(null);
+            }
+        };
+        long max = getNumberOfEntries() - 1;
+        OpFindNewest op = new OpFindNewest(this, startPosition, predicate, 
max, findEntryCallback, null);
+        op.find();
+        return future;
+    }
+
+    @Override
+    public ManagedLedgerInterceptor getManagedLedgerInterceptor() {
+        return managedLedgerInterceptor;
+    }
+
     void clearPendingAddEntries(ManagedLedgerException e) {
         while (!pendingAddEntries.isEmpty()) {
             OpAddEntry op = pendingAddEntries.poll();
@@ -3085,6 +3226,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             
mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId())
                     .setEntryId(lastConfirmedEntry.getEntryId()));
         }
+        if (managedLedgerInterceptor != null) {
+            managedLedgerInterceptor.onUpdateManagedLedgerInfo(propertiesMap);
+        }
         for (Map.Entry<String, String> property : propertiesMap.entrySet()) {
             mlInfo.addProperties(MLDataFormats.KeyValue.newBuilder()
                     .setKey(property.getKey()).setValue(property.getValue()));
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 08d188f..fa5228c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -44,10 +44,11 @@ import org.slf4j.LoggerFactory;
  * Handles the life-cycle of an addEntry() operation.
  *
  */
-class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
+public class OpAddEntry extends SafeRunnable implements AddCallback, 
CloseCallback {
     protected ManagedLedgerImpl ml;
     LedgerHandle ledger;
     private long entryId;
+    private int numberOfMessages;
 
     @SuppressWarnings("unused")
     private static final AtomicReferenceFieldUpdater<OpAddEntry, 
AddEntryCallback> callbackUpdater =
@@ -95,6 +96,27 @@ class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallback {
         return op;
     }
 
+    public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, int 
numberOfMessages, AddEntryCallback callback, Object ctx) {
+        OpAddEntry op = RECYCLER.get();
+        op.ml = ml;
+        op.ledger = null;
+        op.numberOfMessages = numberOfMessages;
+        op.data = data.retain();
+        op.dataLength = data.readableBytes();
+        op.callback = callback;
+        op.ctx = ctx;
+        op.addOpCount = 
ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
+        op.closeWhenDone = false;
+        op.entryId = -1;
+        op.startTime = System.nanoTime();
+        op.state = State.OPEN;
+        ml.mbean.addAddEntrySample(op.dataLength);
+        if (log.isDebugEnabled()) {
+            log.debug("Created new OpAddEntry {}", op);
+        }
+        return op;
+    }
+
     public void setLedger(LedgerHandle ledger) {
         this.ledger = ledger;
     }
@@ -272,7 +294,23 @@ class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallback {
     public State getState() {
         return state;
     }
-    
+
+    public ByteBuf getData() {
+        return data;
+    }
+
+    public int getNumberOfMessages() {
+        return numberOfMessages;
+    }
+
+    public void setNumberOfMessages(int numberOfMessages) {
+        this.numberOfMessages = numberOfMessages;
+    }
+
+    public void setData(ByteBuf data) {
+        this.data = data;
+    }
+
     private final Handle<OpAddEntry> recyclerHandle;
 
     private OpAddEntry(Handle<OpAddEntry> recyclerHandle) {
@@ -290,6 +328,7 @@ class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallback {
         ml = null;
         ledger = null;
         data = null;
+        numberOfMessages = 0;
         dataLength = -1;
         callback = null;
         ctx = null;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
index 861d247..cbecedd 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
@@ -31,6 +31,7 @@ import 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
 
 class OpFindNewest implements ReadEntryCallback {
     private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
     private final PositionImpl startPosition;
     private final FindEntryCallback callback;
     private final Predicate<Entry> condition;
@@ -49,6 +50,23 @@ class OpFindNewest implements ReadEntryCallback {
     public OpFindNewest(ManagedCursorImpl cursor, PositionImpl startPosition, 
Predicate<Entry> condition,
             long numberOfEntries, FindEntryCallback callback, Object ctx) {
         this.cursor = cursor;
+        this.ledger = cursor.ledger;
+        this.startPosition = startPosition;
+        this.callback = callback;
+        this.condition = condition;
+        this.ctx = ctx;
+
+        this.min = 0;
+        this.max = numberOfEntries;
+
+        this.searchPosition = startPosition;
+        this.state = State.checkFirst;
+    }
+
+    public OpFindNewest(ManagedLedgerImpl ledger, PositionImpl startPosition, 
Predicate<Entry> condition,
+                        long numberOfEntries, FindEntryCallback callback, 
Object ctx) {
+        this.cursor = null;
+        this.ledger = ledger;
         this.startPosition = startPosition;
         this.callback = callback;
         this.condition = condition;
@@ -77,7 +95,7 @@ class OpFindNewest implements ReadEntryCallback {
 
                 // check last entry
                 state = State.checkLast;
-                searchPosition = 
cursor.ledger.getPositionAfterN(searchPosition, max, 
PositionBound.startExcluded);
+                searchPosition = ledger.getPositionAfterN(searchPosition, max, 
PositionBound.startExcluded);
                 find();
             }
             break;
@@ -88,7 +106,7 @@ class OpFindNewest implements ReadEntryCallback {
             } else {
                 // start binary search
                 state = State.searching;
-                searchPosition = 
cursor.ledger.getPositionAfterN(startPosition, mid(), 
PositionBound.startExcluded);
+                searchPosition = ledger.getPositionAfterN(startPosition, 
mid(), PositionBound.startExcluded);
                 find();
             }
             break;
@@ -106,7 +124,7 @@ class OpFindNewest implements ReadEntryCallback {
                 callback.findEntryComplete(lastMatchedPosition, 
OpFindNewest.this.ctx);
                 return;
             }
-            searchPosition = cursor.ledger.getPositionAfterN(startPosition, 
mid(), PositionBound.startExcluded);
+            searchPosition = ledger.getPositionAfterN(startPosition, mid(), 
PositionBound.startExcluded);
             find();
         }
     }
@@ -117,8 +135,8 @@ class OpFindNewest implements ReadEntryCallback {
     }
 
     public void find() {
-        if (cursor.hasMoreEntries(searchPosition)) {
-            cursor.ledger.asyncReadEntry(searchPosition, this, null);
+        if (cursor != null ? cursor.hasMoreEntries(searchPosition) : 
ledger.hasMoreEntries(searchPosition)) {
+            ledger.asyncReadEntry(searchPosition, this, null);
         } else {
             callback.findEntryComplete(lastMatchedPosition, 
OpFindNewest.this.ctx);
         }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java
new file mode 100644
index 0000000..f5d9f12
--- /dev/null
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.interceptor;
+
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience;
+import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
+
+import java.util.Map;
+
+/**
+ * Interceptor for ManagedLedger.
+ * */
[email protected]
[email protected]
+public interface ManagedLedgerInterceptor {
+
+    /**
+     * Intercept an OpAddEntry and return an OpAddEntry.
+     * @param op an OpAddEntry to be intercepted.
+     * @param batchSize
+     * @return an OpAddEntry.
+     */
+    OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize);
+
+    /**
+     * Intercept when ManagedLedger is initialized.
+     * @param propertiesMap map of properties.
+     */
+    void onManagedLedgerPropertiesInitialize(Map<String, String> 
propertiesMap);
+
+    /**
+     * Intercept when ManagedLedger is initialized.
+     * @param name name of ManagedLedger
+     * @param ledgerHandle a LedgerHandle.
+     */
+    void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle 
ledgerHandle);
+
+    /**
+     * @param propertiesMap  map of properties.
+     */
+    void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap);
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
new file mode 100644
index 0000000..0e59d2f
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.intercept;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
+import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
+import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
+import org.apache.pulsar.common.protocol.Commands;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor {
+    private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerInterceptorImpl.class);
+    private static final String INDEX = "index";
+
+
+    private final Set<BrokerEntryMetadataInterceptor> 
brokerEntryMetadataInterceptors;
+
+
+    public ManagedLedgerInterceptorImpl(Set<BrokerEntryMetadataInterceptor> 
brokerEntryMetadataInterceptors) {
+        this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors;
+    }
+
+    public long getIndex() {
+        long index = -1;
+        for (BrokerEntryMetadataInterceptor interceptor : 
brokerEntryMetadataInterceptors) {
+            if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                index = ((AppendIndexMetadataInterceptor) 
interceptor).getIndex();
+            }
+        }
+        return index;
+    }
+
+    @Override
+    public OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize) {
+       if (op == null || batchSize <= 0) {
+           return op;
+       }
+        op.setData(Commands.addBrokerEntryMetadata(op.getData(), 
brokerEntryMetadataInterceptors, batchSize));
+        return op;
+    }
+
+    @Override
+    public void onManagedLedgerPropertiesInitialize(Map<String, String> 
propertiesMap) {
+        if (propertiesMap == null || propertiesMap.size() == 0) {
+            return;
+        }
+
+        if (propertiesMap.containsKey(INDEX)) {
+            for (BrokerEntryMetadataInterceptor interceptor : 
brokerEntryMetadataInterceptors) {
+                if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                  ((AppendIndexMetadataInterceptor) interceptor)
+                          
.recoveryIndexGenerator(Long.parseLong(propertiesMap.get(INDEX)));
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle 
lh) {
+        try {
+            for (BrokerEntryMetadataInterceptor interceptor : 
brokerEntryMetadataInterceptors) {
+                if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                    LedgerEntries ledgerEntries =
+                            lh.read(lh.getLastAddConfirmed() - 1, 
lh.getLastAddConfirmed());
+                    for (LedgerEntry entry : ledgerEntries) {
+                        PulsarApi.BrokerEntryMetadata brokerEntryMetadata =
+                                
Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
+                        if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasIndex()) {
+                            ((AppendIndexMetadataInterceptor) interceptor)
+                                    
.recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+                        }
+                    }
+
+                }
+            }
+        } catch (org.apache.bookkeeper.client.api.BKException | 
InterruptedException e) {
+            log.error("[{}] Read last entry error.", name, e);
+        }
+    }
+
+    @Override
+    public void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap) {
+        for (BrokerEntryMetadataInterceptor interceptor : 
brokerEntryMetadataInterceptors) {
+            if (interceptor instanceof AppendIndexMetadataInterceptor) {
+                propertiesMap.put(INDEX, 
String.valueOf(((AppendIndexMetadataInterceptor) interceptor).getIndex()));
+            }
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6cfdf38..792c676 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -83,6 +83,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -98,6 +99,7 @@ import 
org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
@@ -126,6 +128,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.FieldContext;
+import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
 import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
 import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -1092,6 +1095,20 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         }
 
         getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> {
+            if (isBrokerEntryMetadataEnabled()) {
+                // init managedLedger interceptor
+                for (BrokerEntryMetadataInterceptor interceptor : 
brokerEntryMetadataInterceptors) {
+                    if (interceptor instanceof AppendIndexMetadataInterceptor) 
{
+                        // add individual AppendOffsetMetadataInterceptor for 
each topic
+                        brokerEntryMetadataInterceptors.remove(interceptor);
+                        brokerEntryMetadataInterceptors.add(new 
AppendIndexMetadataInterceptor());
+                    }
+                }
+                ManagedLedgerInterceptor mlInterceptor =
+                        new 
ManagedLedgerInterceptorImpl(brokerEntryMetadataInterceptors);
+                managedLedgerConfig.setManagedLedgerInterceptor(mlInterceptor);
+            }
+
             managedLedgerConfig.setCreateIfMissing(createIfMissing);
 
             // Once we have the configuration, we can proceed with the async 
open operation
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index c69f9ba..2bca232 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -463,6 +463,11 @@ public class Producer {
             return callback;
         }
 
+        @Override
+        public long getNumberOfMessages() {
+            return batchSize;
+        }
+
         private final Handle<MessagePublishContext> recyclerHandle;
 
         private MessagePublishContext(Handle<MessagePublishContext> 
recyclerHandle) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index af88ce8..c940195 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -90,6 +90,10 @@ public interface Topic {
         default long getOriginalHighestSequenceId() {
             return  -1L;
         }
+
+        default long getNumberOfMessages() {
+            return  1L;
+        }
     }
 
     void publishMessage(ByteBuf headersAndPayload, PublishContext callback);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 676f98c..49e9042 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -357,10 +357,7 @@ public class PersistentTopic extends AbstractTopic
                 messageDeduplication.isDuplicate(publishContext, 
headersAndPayload);
         switch (status) {
             case NotDup:
-                // intercept headersAndPayload and add entry metadata
-                if (appendBrokerEntryMetadata(headersAndPayload, 
publishContext)) {
-                    ledger.asyncAddEntry(headersAndPayload, this, 
publishContext);
-                }
+                asyncAddEntry(headersAndPayload, publishContext);
                 break;
             case Dup:
                 // Immediately acknowledge duplicated message
@@ -374,22 +371,13 @@ public class PersistentTopic extends AbstractTopic
         }
     }
 
-    private boolean appendBrokerEntryMetadata(ByteBuf headersAndPayload, 
PublishContext publishContext) {
-        // just return true if BrokerEntryMetadata is not enabled
-        if (!brokerService.isBrokerEntryMetadataEnabled()) {
-            return true;
-        }
-
-        try {
-            headersAndPayload =  
Commands.addBrokerEntryMetadata(headersAndPayload,
-                    brokerService.getBrokerEntryMetadataInterceptors());
-        } catch (Exception e) {
-            decrementPendingWriteOpsAndCheck();
-            publishContext.completed(new 
BrokerServiceException.AddEntryMetadataException(e), -1, -1);
-            log.error("[{}] Failed to add broker entry metadata.", topic, e);
-            return false;
+    private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext 
publishContext) {
+        if (brokerService.isBrokerEntryMetadataEnabled()) {
+            ledger.asyncAddEntry(headersAndPayload,
+                    (int) publishContext.getNumberOfMessages(), this, 
publishContext);
+        } else {
+            ledger.asyncAddEntry(headersAndPayload, this, publishContext);
         }
-        return true;
     }
 
     public void asyncReadEntry(PositionImpl position, 
AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
new file mode 100644
index 0000000..2c86b98
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.intercept;
+
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
+import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
+import org.apache.pulsar.common.protocol.Commands;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static 
com.sun.org.apache.xml.internal.serialize.OutputFormat.Defaults.Encoding;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class MangedLedgerInterceptorImplTest  extends MockedBookKeeperTestCase 
{
+    private static final Logger log = 
LoggerFactory.getLogger(MangedLedgerInterceptorImplTest.class);
+
+
+    @Test
+    public void testAddBrokerEntryMetadata() throws Exception {
+        final int MOCK_BATCH_SIZE = 2;
+        int numberOfEntries = 10;
+        final String ledgerAndCursorName = "topicEntryMetadataSequenceId";
+
+        ManagedLedgerInterceptor interceptor = new 
ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors());
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(2);
+        config.setManagedLedgerInterceptor(interceptor);
+
+        ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor(ledgerAndCursorName);
+
+        for ( int i = 0 ; i < numberOfEntries; i ++) {
+            ledger.addEntry(("message" + i).getBytes(), MOCK_BATCH_SIZE);
+        }
+
+
+        assertEquals(19, ((ManagedLedgerInterceptorImpl) 
ledger.getManagedLedgerInterceptor()).getIndex());
+        List<Entry> entryList = cursor.readEntries(numberOfEntries);
+        for (int i = 0 ; i < numberOfEntries; i ++) {
+            PulsarApi.BrokerEntryMetadata metadata =
+                    
Commands.parseBrokerEntryMetadataIfExist(entryList.get(i).getDataBuffer());
+            assertNotNull(metadata);
+            assertEquals(metadata.getIndex(), (i + 1) * MOCK_BATCH_SIZE - 1);
+        }
+
+        cursor.close();;
+        ledger.close();
+        factory.shutdown();
+    }
+
+
+    @Test(timeOut = 20000)
+    public void testRecoveryIndex() throws Exception {
+        final int MOCK_BATCH_SIZE = 2;
+        ManagedLedgerInterceptor interceptor = new 
ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors());
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setManagedLedgerInterceptor(interceptor);
+        ManagedLedger ledger = factory.open("my_recovery_index_test_ledger", 
config);
+
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding), MOCK_BATCH_SIZE);
+
+        ManagedCursor cursor = ledger.openCursor("c1");
+
+        ledger.addEntry("dummy-entry-2".getBytes(Encoding), MOCK_BATCH_SIZE);
+
+        assertEquals(((ManagedLedgerInterceptorImpl) 
ledger.getManagedLedgerInterceptor()).getIndex(), MOCK_BATCH_SIZE * 2 - 1);
+
+        ledger.close();
+
+        log.info("Closing ledger and reopening");
+
+        // / Reopen the same managed-ledger
+        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(bkc, 
bkc.getZkHandle());
+        ledger = factory2.open("my_recovery_index_test_ledger", config);
+
+        cursor = ledger.openCursor("c1");
+
+        assertEquals(ledger.getNumberOfEntries(), 2);
+        assertEquals(((ManagedLedgerInterceptorImpl) 
ledger.getManagedLedgerInterceptor()).getIndex(), MOCK_BATCH_SIZE * 2 - 1);
+
+
+        List<Entry> entries = cursor.readEntries(100);
+        assertEquals(entries.size(), 1);
+        entries.forEach(e -> e.release());
+
+        cursor.close();
+        ledger.close();
+        factory2.shutdown();
+    }
+
+    @Test
+    public void testFindPositionByIndex() throws Exception {
+        final int MOCK_BATCH_SIZE = 2;
+        final int maxEntriesPerLedger = 5;
+        int maxSequenceIdPerLedger = MOCK_BATCH_SIZE * maxEntriesPerLedger;
+        ManagedLedgerInterceptor interceptor = new 
ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors());
+
+
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setManagedLedgerInterceptor(interceptor);
+        managedLedgerConfig.setMaxEntriesPerLedger(5);
+
+        ManagedLedger ledger = 
factory.open("my_ml_broker_entry_metadata_test_ledger", managedLedgerConfig);
+        ManagedCursor cursor = ledger.openCursor("c1");
+
+        long firstLedgerId = -1;
+        for (int i = 0; i < maxEntriesPerLedger; i++) {
+            firstLedgerId = ((PositionImpl) 
ledger.addEntry("dummy-entry".getBytes(Encoding), 
MOCK_BATCH_SIZE)).getLedgerId();
+        }
+
+        assertEquals(((ManagedLedgerInterceptorImpl) 
ledger.getManagedLedgerInterceptor()).getIndex(), 9);
+
+
+        PositionImpl position = null;
+        for (int index = 0; index <= ((ManagedLedgerInterceptorImpl) 
ledger.getManagedLedgerInterceptor()).getIndex(); index ++) {
+            position = (PositionImpl) ledger.asyncFindPosition(new 
IndexSearchPredicate(index)).get();
+            assertEquals(position.getEntryId(), (index % 
maxSequenceIdPerLedger) / MOCK_BATCH_SIZE);
+        }
+
+        // roll over ledger
+        long secondLedgerId = -1;
+        for (int i = 0; i < maxEntriesPerLedger; i++) {
+            secondLedgerId = ((PositionImpl) 
ledger.addEntry("dummy-entry".getBytes(Encoding), 
MOCK_BATCH_SIZE)).getLedgerId();
+        }
+        assertEquals(((ManagedLedgerInterceptorImpl) 
ledger.getManagedLedgerInterceptor()).getIndex(), 19);
+        assertNotEquals(firstLedgerId, secondLedgerId);
+
+        for (int index = 0; index <= ((ManagedLedgerInterceptorImpl) 
ledger.getManagedLedgerInterceptor()).getIndex(); index ++) {
+            position = (PositionImpl) ledger.asyncFindPosition(new 
IndexSearchPredicate(index)).get();
+            assertEquals(position.getEntryId(), (index % 
maxSequenceIdPerLedger) / MOCK_BATCH_SIZE);
+        }
+
+        // reopen ledger
+        ledger.close();
+        // / Reopen the same managed-ledger
+        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(bkc, 
bkc.getZkHandle());
+        ledger = factory2.open("my_ml_broker_entry_metadata_test_ledger", 
managedLedgerConfig);
+
+        long thirdLedgerId = -1;
+        for (int i = 0; i < maxEntriesPerLedger; i++) {
+            thirdLedgerId = ((PositionImpl) 
ledger.addEntry("dummy-entry".getBytes(Encoding), 
MOCK_BATCH_SIZE)).getLedgerId();
+        }
+        assertEquals(((ManagedLedgerInterceptorImpl) 
ledger.getManagedLedgerInterceptor()).getIndex(), 29);
+        assertNotEquals(secondLedgerId, thirdLedgerId);
+
+        for (int index = 0; index <= ((ManagedLedgerInterceptorImpl) 
ledger.getManagedLedgerInterceptor()).getIndex(); index ++) {
+            position = (PositionImpl) ledger.asyncFindPosition(new 
IndexSearchPredicate(index)).get();
+            assertEquals(position.getEntryId(), (index % 
maxSequenceIdPerLedger) / MOCK_BATCH_SIZE);
+        }
+        cursor.close();
+        ledger.close();
+        factory2.shutdown();
+    }
+
+    public static Set<BrokerEntryMetadataInterceptor> 
getBrokerEntryMetadataInterceptors() {
+        Set<String> interceptorNames = new HashSet<>();
+        
interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");
+        
interceptorNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor");
+        return 
BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames,
+                Thread.currentThread().getContextClassLoader());
+    }
+
+    class IndexSearchPredicate implements 
com.google.common.base.Predicate<Entry> {
+
+        long indexToSearch = -1;
+        public IndexSearchPredicate(long indexToSearch) {
+            this.indexToSearch = indexToSearch;
+        }
+
+        @Override
+        public boolean apply(@Nullable Entry entry) {
+            try {
+                PulsarApi.BrokerEntryMetadata brokerEntryMetadata = 
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
+                return brokerEntryMetadata.getIndex() < indexToSearch;
+            } catch (Exception e) {
+                log.error("Error deserialize message for message position 
find", e);
+            } finally {
+                entry.release();
+            }
+            return false;
+        }
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 1740802..c0c78d2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -108,7 +108,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 
     public static byte[] appendBrokerTimestamp(ByteBuf headerAndPayloads) 
throws Exception {
         ByteBuf msgWithEntryMeta =
-                Commands.addBrokerEntryMetadata(headerAndPayloads, 
getBrokerEntryMetadataInterceptors());
+                Commands.addBrokerEntryMetadata(headerAndPayloads, 
getBrokerEntryMetadataInterceptors(), 1);
         byte[] byteMessage = msgWithEntryMeta.nioBuffer().array();
         msgWithEntryMeta.release();
         return byteMessage;
@@ -321,9 +321,10 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
     }
 
     public static Set<BrokerEntryMetadataInterceptor> 
getBrokerEntryMetadataInterceptors() {
-
-        return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(
-                
Sets.newHashSet("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"),
+        Set<String> interceptorNames = new HashSet<>();
+        
interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");
+        
interceptorNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor");
+        return 
BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames,
                 Thread.currentThread().getContextClassLoader());
     }
     /**
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index aa9f2c1..f19ba63 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -7074,6 +7074,10 @@ public final class PulsarApi {
     // optional uint64 broker_timestamp = 1;
     boolean hasBrokerTimestamp();
     long getBrokerTimestamp();
+    
+    // optional uint64 index = 2;
+    boolean hasIndex();
+    long getIndex();
   }
   public static final class BrokerEntryMetadata extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -7120,8 +7124,19 @@ public final class PulsarApi {
       return brokerTimestamp_;
     }
     
+    // optional uint64 index = 2;
+    public static final int INDEX_FIELD_NUMBER = 2;
+    private long index_;
+    public boolean hasIndex() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public long getIndex() {
+      return index_;
+    }
+    
     private void initFields() {
       brokerTimestamp_ = 0L;
+      index_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -7143,6 +7158,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
         output.writeUInt64(1, brokerTimestamp_);
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeUInt64(2, index_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -7155,6 +7173,10 @@ public final class PulsarApi {
         size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeUInt64Size(1, brokerTimestamp_);
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(2, index_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -7270,6 +7292,8 @@ public final class PulsarApi {
         super.clear();
         brokerTimestamp_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000001);
+        index_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
         return this;
       }
       
@@ -7307,6 +7331,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000001;
         }
         result.brokerTimestamp_ = brokerTimestamp_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.index_ = index_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -7316,6 +7344,9 @@ public final class PulsarApi {
         if (other.hasBrokerTimestamp()) {
           setBrokerTimestamp(other.getBrokerTimestamp());
         }
+        if (other.hasIndex()) {
+          setIndex(other.getIndex());
+        }
         return this;
       }
       
@@ -7350,6 +7381,11 @@ public final class PulsarApi {
               brokerTimestamp_ = input.readUInt64();
               break;
             }
+            case 16: {
+              bitField0_ |= 0x00000002;
+              index_ = input.readUInt64();
+              break;
+            }
           }
         }
       }
@@ -7377,6 +7413,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional uint64 index = 2;
+      private long index_ ;
+      public boolean hasIndex() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public long getIndex() {
+        return index_;
+      }
+      public Builder setIndex(long value) {
+        bitField0_ |= 0x00000002;
+        index_ = value;
+        
+        return this;
+      }
+      public Builder clearIndex() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        index_ = 0L;
+        
+        return this;
+      }
+      
       // 
@@protoc_insertion_point(builder_scope:pulsar.proto.BrokerEntryMetadata)
     }
     
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java
index 6043f26..78cdfc8 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java
@@ -30,4 +30,12 @@ public class AppendBrokerTimestampMetadataInterceptor 
implements BrokerEntryMeta
     public PulsarApi.BrokerEntryMetadata.Builder 
intercept(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata) {
         return brokerMetadata.setBrokerTimestamp(System.currentTimeMillis());
     }
+
+    @Override
+    public PulsarApi.BrokerEntryMetadata.Builder interceptWithBatchSize(
+            PulsarApi.BrokerEntryMetadata.Builder brokerMetadata,
+            int batchSize) {
+        // do nothing, just return brokerMetadata
+        return brokerMetadata;
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
similarity index 54%
copy from 
pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java
copy to 
pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
index 6043f26..dba3d9a 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
@@ -20,14 +20,35 @@ package org.apache.pulsar.common.intercept;
 
 import org.apache.pulsar.common.api.proto.PulsarApi;
 
-/**
- * A plugin interface that allows you to intercept the client requests to
- *  the Pulsar brokers and add timestamp from broker side metadata for each 
entry.
- */
-public class AppendBrokerTimestampMetadataInterceptor implements 
BrokerEntryMetadataInterceptor {
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AppendIndexMetadataInterceptor implements 
BrokerEntryMetadataInterceptor{
+    private final AtomicLong indexGenerator;
+
+    public AppendIndexMetadataInterceptor() {
+        this.indexGenerator = new AtomicLong(-1);
+    }
+
+    public void recoveryIndexGenerator(long index) {
+        if (indexGenerator.get() < index) {
+            indexGenerator.set(index);
+        }
+    }
 
     @Override
     public PulsarApi.BrokerEntryMetadata.Builder 
intercept(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata) {
-        return brokerMetadata.setBrokerTimestamp(System.currentTimeMillis());
+        // do nothing, just return brokerMetadata
+        return brokerMetadata;
+    }
+
+    @Override
+    public PulsarApi.BrokerEntryMetadata.Builder interceptWithBatchSize(
+            PulsarApi.BrokerEntryMetadata.Builder brokerMetadata,
+            int batchSize) {
+        return brokerMetadata.setIndex(indexGenerator.addAndGet(batchSize));
+    }
+
+    public long getIndex() {
+        return indexGenerator.get();
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java
index 42dfc8d..6dcb794 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java
@@ -26,4 +26,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi;
  */
 public interface BrokerEntryMetadataInterceptor {
     PulsarApi.BrokerEntryMetadata.Builder 
intercept(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata);
+    PulsarApi.BrokerEntryMetadata.Builder 
interceptWithBatchSize(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata,
+                                                                 int 
batchSize);
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 7ec9132..7dfb131 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
@@ -1962,6 +1963,37 @@ public class Commands {
         return compositeByteBuf;
     }
 
+    public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload,
+                                                 
Set<BrokerEntryMetadataInterceptor> brokerInterceptors,
+                                                 int batchSize) {
+        //   | BROKER_ENTRY_METADATA_MAGIC_NUMBER | BROKER_ENTRY_METADATA_SIZE 
|         BROKER_ENTRY_METADATA         |
+        //   |         2 bytes                    |       4 bytes              
|    BROKER_ENTRY_METADATA_SIZE bytes   |
+
+        PulsarApi.BrokerEntryMetadata.Builder brokerMetadataBuilder = 
PulsarApi.BrokerEntryMetadata.newBuilder();
+        for (BrokerEntryMetadataInterceptor interceptor : brokerInterceptors) {
+            interceptor.intercept(brokerMetadataBuilder);
+            interceptor.interceptWithBatchSize(brokerMetadataBuilder, 
batchSize);
+        }
+        PulsarApi.BrokerEntryMetadata brokerEntryMetadata = 
brokerMetadataBuilder.build();
+        int brokerMetaSize = brokerEntryMetadata.getSerializedSize();
+        ByteBuf brokerMeta =
+                PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, 
brokerMetaSize + 6);
+        brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+        brokerMeta.writeInt(brokerMetaSize);
+        ByteBufCodedOutputStream outStream = 
ByteBufCodedOutputStream.get(brokerMeta);
+        try {
+            brokerEntryMetadata.writeTo(outStream);
+        } catch (IOException e) {
+            // This is in-memory serialization, should not fail
+            throw new RuntimeException(e);
+        }
+        outStream.recycle();
+
+        CompositeByteBuf compositeByteBuf = 
PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+        compositeByteBuf.addComponents(true, brokerMeta, headerAndPayload);
+        return compositeByteBuf;
+    }
+
     public static ByteBuf skipBrokerEntryMetadataIfExist(ByteBuf 
headerAndPayloadWithBrokerEntryMetadata) {
         int readerIndex = 
headerAndPayloadWithBrokerEntryMetadata.readerIndex();
         if (headerAndPayloadWithBrokerEntryMetadata.readShort() == 
magicBrokerEntryMetadata) {
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index b998828..ace18c8 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -183,6 +183,7 @@ message SingleMessageMetadata {
 // metadata added for entry from broker
 message BrokerEntryMetadata {
     optional uint64 broker_timestamp = 1;
+    optional uint64 index = 2;
 }
 
 enum ServerError {
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
index df5bf76..2bd4df0 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
@@ -145,14 +145,19 @@ public class CommandUtilsTests {
 
     @Test
     public void testAddBrokerEntryMetadata() throws Exception {
+        int MOCK_BATCH_SIZE = 10;
         String data = "test-message";
         ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), 
data.length());
         byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
 
         PulsarApi.BrokerEntryMetadata brokerMetadata =
-                
PulsarApi.BrokerEntryMetadata.newBuilder().setBrokerTimestamp(System.currentTimeMillis()).build();
+                PulsarApi.BrokerEntryMetadata
+                        .newBuilder()
+                        .setBrokerTimestamp(System.currentTimeMillis())
+                        .setIndex(MOCK_BATCH_SIZE - 1)
+                        .build();
         ByteBuf dataWithBrokerEntryMetadata =
-                Commands.addBrokerEntryMetadata(byteBuf, 
getBrokerEntryMetadataInterceptors());
+                Commands.addBrokerEntryMetadata(byteBuf, 
getBrokerEntryMetadataInterceptors(), MOCK_BATCH_SIZE);
         assertEquals(brokerMetadata.getSerializedSize() + data.length() + 6,
                 dataWithBrokerEntryMetadata.readableBytes());
 
@@ -167,7 +172,7 @@ public class CommandUtilsTests {
         ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), 
data.length());
         byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
         ByteBuf dataWithBrokerEntryMetadata =
-                Commands.addBrokerEntryMetadata(byteBuf, 
getBrokerEntryMetadataInterceptors());
+                Commands.addBrokerEntryMetadata(byteBuf, 
getBrokerEntryMetadataInterceptors(), 11);
 
         Commands.skipBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata);
         assertEquals(data.length(), 
dataWithBrokerEntryMetadata.readableBytes());
@@ -179,15 +184,17 @@ public class CommandUtilsTests {
 
     @Test
     public void testParseBrokerEntryMetadata() throws Exception {
+        int MOCK_BATCH_SIZE = 10;
         String data = "test-message";
         ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), 
data.length());
         byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
         ByteBuf dataWithBrokerEntryMetadata =
-                Commands.addBrokerEntryMetadata(byteBuf, 
getBrokerEntryMetadataInterceptors());
+                Commands.addBrokerEntryMetadata(byteBuf, 
getBrokerEntryMetadataInterceptors(), MOCK_BATCH_SIZE);
         PulsarApi.BrokerEntryMetadata brokerMetadata =
                 
Commands.parseBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata);
 
         assertTrue(brokerMetadata.getBrokerTimestamp() <= 
System.currentTimeMillis());
+        assertEquals(brokerMetadata.getIndex(), MOCK_BATCH_SIZE - 1);
         assertEquals(data.length(), 
dataWithBrokerEntryMetadata.readableBytes());
 
         byte [] content = new 
byte[dataWithBrokerEntryMetadata.readableBytes()];
@@ -198,6 +205,7 @@ public class CommandUtilsTests {
     public Set<BrokerEntryMetadataInterceptor> 
getBrokerEntryMetadataInterceptors() {
         Set<String> interceptorNames = new HashSet<>();
         
interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");
+        
interceptorNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor");
         return 
BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames,
                 Thread.currentThread().getContextClassLoader());
     }

Reply via email to