This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d85a5e2 [Feature] Introduce continuous offset for pulsar (#9039)
d85a5e2 is described below
commit d85a5e23c408eb3243c434909588324455aa94a5
Author: Aloys <[email protected]>
AuthorDate: Thu Dec 24 20:52:40 2020 +0800
[Feature] Introduce continuous offset for pulsar (#9039)
Fixes #9038
### Motivation
As described in
[PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata).
One of the use case for Broker entry metadata is providing continuous
message sequence-Id for messages in one topic-partition which is useful for
Protocol Hanlder like KOP.
This PR enable Pulsar to support continuous offset for message based on
Broker entry metadata.
### Modifications
Introduce a new field for broker entry metadta named `offset`;
Introduce a new interceptor type `ManagedLedgerInterceptor` which intercept
entry in `ManagedLedger`;
Each partition will be assigned a `ManagedLedgerInterceptor` when
`ManagedLedger` created;
Each Entry will be intercept for adding a monotone increasing offset in
Broker entry metadata and the offet is added by batchSize of entry;
Support find position by a given offset.
---
.../apache/bookkeeper/mledger/ManagedLedger.java | 74 +++++++
.../bookkeeper/mledger/ManagedLedgerConfig.java | 9 +
.../bookkeeper/mledger/ManagedLedgerException.java | 6 +
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 150 +++++++++++++-
.../apache/bookkeeper/mledger/impl/OpAddEntry.java | 43 +++-
.../bookkeeper/mledger/impl/OpFindNewest.java | 28 ++-
.../interceptor/ManagedLedgerInterceptor.java | 60 ++++++
.../intercept/ManagedLedgerInterceptorImpl.java | 113 +++++++++++
.../pulsar/broker/service/BrokerService.java | 17 ++
.../org/apache/pulsar/broker/service/Producer.java | 5 +
.../org/apache/pulsar/broker/service/Topic.java | 4 +
.../broker/service/persistent/PersistentTopic.java | 26 +--
.../intercept/MangedLedgerInterceptorImplTest.java | 220 +++++++++++++++++++++
.../service/PersistentMessageFinderTest.java | 9 +-
.../apache/pulsar/common/api/proto/PulsarApi.java | 57 ++++++
.../AppendBrokerTimestampMetadataInterceptor.java | 8 +
...or.java => AppendIndexMetadataInterceptor.java} | 33 +++-
.../intercept/BrokerEntryMetadataInterceptor.java | 2 +
.../apache/pulsar/common/protocol/Commands.java | 32 +++
pulsar-common/src/main/proto/PulsarApi.proto | 1 +
.../pulsar/common/protocol/CommandUtilsTests.java | 16 +-
21 files changed, 870 insertions(+), 43 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 4b06b5e..6274f97 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -32,6 +32,7 @@ import
org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
+import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
/**
@@ -75,6 +76,18 @@ public interface ManagedLedger {
Position addEntry(byte[] data) throws InterruptedException,
ManagedLedgerException;
/**
+ * Append a new entry to the end of a managed ledger.
+ *
+ * @param data
+ * data entry to be persisted
+ * @param numberOfMessages
+ * numberOfMessages of entry
+ * @return the Position at which the entry has been inserted
+ * @throws ManagedLedgerException
+ */
+ Position addEntry(byte[] data, int numberOfMessages) throws
InterruptedException, ManagedLedgerException;
+
+ /**
* Append a new entry asynchronously.
*
* @see #addEntry(byte[])
@@ -103,6 +116,22 @@ public interface ManagedLedger {
Position addEntry(byte[] data, int offset, int length) throws
InterruptedException, ManagedLedgerException;
/**
+ * Append a new entry to the end of a managed ledger.
+ *
+ * @param data
+ * data entry to be persisted
+ * @param numberOfMessages
+ * numberOfMessages of entry
+ * @param offset
+ * offset in the data array
+ * @param length
+ * number of bytes
+ * @return the Position at which the entry has been inserted
+ * @throws ManagedLedgerException
+ */
+ Position addEntry(byte[] data, int numberOfMessages, int offset, int
length) throws InterruptedException, ManagedLedgerException;
+
+ /**
* Append a new entry asynchronously.
*
* @see #addEntry(byte[])
@@ -123,6 +152,26 @@ public interface ManagedLedger {
* Append a new entry asynchronously.
*
* @see #addEntry(byte[])
+ * @param data
+ * data entry to be persisted
+ * @param numberOfMessages
+ * numberOfMessages of entry
+ * @param offset
+ * offset in the data array
+ * @param length
+ * number of bytes
+ * @param callback
+ * callback object
+ * @param ctx
+ * opaque context
+ */
+ void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int
length, AddEntryCallback callback, Object ctx);
+
+
+ /**
+ * Append a new entry asynchronously.
+ *
+ * @see #addEntry(byte[])
* @param buffer
* buffer with the data entry
* @param callback
@@ -133,6 +182,21 @@ public interface ManagedLedger {
void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx);
/**
+ * Append a new entry asynchronously.
+ *
+ * @see #addEntry(byte[])
+ * @param buffer
+ * buffer with the data entry
+ * @param numberOfMessages
+ * numberOfMessages for data entry
+ * @param callback
+ * callback object
+ * @param ctx
+ * opaque context
+ */
+ void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
callback, Object ctx);
+
+ /**
* Open a ManagedCursor in this ManagedLedger.
*
* <p/>If the cursors doesn't exist, a new one will be created and its
position will be at the end of the
@@ -520,4 +584,14 @@ public interface ManagedLedger {
* Roll current ledger if it is full
*/
void rollCurrentLedgerIfFull();
+
+ /**
+ * Find position by sequenceId.
+ * */
+ CompletableFuture<Position>
asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
+
+ /**
+ * Get the ManagedLedgerInterceptor for ManagedLedger.
+ * */
+ ManagedLedgerInterceptor getManagedLedgerInterceptor();
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index b1f2512..7f982b1 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -33,6 +33,7 @@ import
org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
+import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
import
org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
/**
@@ -75,6 +76,7 @@ public class ManagedLedgerConfig {
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
private int newEntriesCheckDelayInMillis = 10;
private Clock clock = Clock.systemUTC();
+ private ManagedLedgerInterceptor managedLedgerInterceptor;
public boolean isCreateIfMissing() {
return createIfMissing;
@@ -637,4 +639,11 @@ public class ManagedLedgerConfig {
this.newEntriesCheckDelayInMillis = newEntriesCheckDelayInMillis;
}
+ public ManagedLedgerInterceptor getManagedLedgerInterceptor() {
+ return managedLedgerInterceptor;
+ }
+
+ public void setManagedLedgerInterceptor(ManagedLedgerInterceptor
managedLedgerInterceptor) {
+ this.managedLedgerInterceptor = managedLedgerInterceptor;
+ }
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 14202cb..0f56b99 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -157,6 +157,12 @@ public class ManagedLedgerException extends Exception {
}
}
+ public static class ManagedLedgerInterceptException extends
ManagedLedgerException {
+ public ManagedLedgerInterceptException(String msg) {
+ super(msg);
+ }
+ }
+
@Override
public synchronized Throwable fillInStackTrace() {
// Disable stack traces to be filled in
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ba50d2b..562a331 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -64,6 +64,8 @@ import
java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+
+import io.netty.util.ReferenceCountUtil;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -77,6 +79,7 @@ import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.common.util.Retries;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -98,6 +101,7 @@ import
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorNotFoundExcept
import
org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerInterceptException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
@@ -108,6 +112,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
+import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
@@ -195,6 +200,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
volatile PositionImpl lastConfirmedEntry;
+ private ManagedLedgerInterceptor managedLedgerInterceptor;
+
protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
@@ -283,6 +290,9 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs()
* (1 + random.nextDouble() * 5 / 100.0));
this.mlOwnershipChecker = mlOwnershipChecker;
this.propertiesMap = Maps.newHashMap();
+ if (config.getManagedLedgerInterceptor() != null) {
+ this.managedLedgerInterceptor =
config.getManagedLedgerInterceptor();
+ }
}
synchronized void initialize(final ManagedLedgerInitializeLedgerCallback
callback, final Object ctx) {
@@ -310,6 +320,9 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
propertiesMap.put(property.getKey(),
property.getValue());
}
}
+ if (managedLedgerInterceptor != null) {
+
managedLedgerInterceptor.onManagedLedgerPropertiesInitialize(propertiesMap);
+ }
// Last ledger stat may be zeroed, we must update it
if (ledgers.size() > 0) {
@@ -325,6 +338,9 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
.setEntries(lh.getLastAddConfirmed() +
1).setSize(lh.getLength())
.setTimestamp(clock.millis()).build();
ledgers.put(id, info);
+ if (managedLedgerInterceptor != null) {
+
managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh);
+ }
initializeBookKeeper(callback);
} else if (isNoSuchLedgerExistsException(rc)) {
log.warn("[{}] Ledger not found: {}", name,
ledgers.lastKey());
@@ -551,6 +567,11 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
@Override
+ public Position addEntry(byte[] data, int numberOfMessages) throws
InterruptedException, ManagedLedgerException {
+ return addEntry(data, numberOfMessages, 0, data.length);
+ }
+
+ @Override
public Position addEntry(byte[] data, int offset, int length) throws
InterruptedException, ManagedLedgerException {
final CountDownLatch counter = new CountDownLatch(1);
// Result list will contain the status exception and the resulting
@@ -586,6 +607,41 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
@Override
+ public Position addEntry(byte[] data, int numberOfMessages, int offset,
int length) throws InterruptedException, ManagedLedgerException {
+ final CountDownLatch counter = new CountDownLatch(1);
+ // Result list will contain the status exception and the resulting
+ // position
+ class Result {
+ ManagedLedgerException status = null;
+ Position position = null;
+ }
+ final Result result = new Result();
+
+ asyncAddEntry(data, numberOfMessages, offset, length, new
AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, Object ctx) {
+ result.position = position;
+ counter.countDown();
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object
ctx) {
+ result.status = exception;
+ counter.countDown();
+ }
+ }, null);
+
+ counter.await();
+
+ if (result.status != null) {
+ log.error("[{}] Error adding entry", name, result.status);
+ throw result.status;
+ }
+
+ return result.position;
+ }
+
+ @Override
public void asyncAddEntry(final byte[] data, final AddEntryCallback
callback, final Object ctx) {
asyncAddEntry(data, 0, data.length, callback, ctx);
}
@@ -598,6 +654,13 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
@Override
+ public void asyncAddEntry(final byte[] data, int numberOfMessages, int
offset, int length, final AddEntryCallback callback,
+ final Object ctx) {
+ ByteBuf buffer = Unpooled.wrappedBuffer(data, offset, length);
+ asyncAddEntry(buffer, numberOfMessages, callback, ctx);
+ }
+
+ @Override
public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback,
Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}] asyncAddEntry size={} state={}", name,
buffer.readableBytes(), state);
@@ -609,6 +672,18 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
executor.executeOrdered(name, safeRun(() ->
internalAsyncAddEntry(addOperation)));
}
+ @Override
+ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages,
AddEntryCallback callback, Object ctx) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] asyncAddEntry size={} state={}", name,
buffer.readableBytes(), state);
+ }
+
+ OpAddEntry addOperation = OpAddEntry.create(this, buffer,
numberOfMessages, callback, ctx);
+
+ // Jump to specific thread to avoid contention from writers writing
from different threads
+ executor.executeOrdered(name, safeRun(() ->
internalAsyncAddEntry(addOperation)));
+ }
+
private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
pendingAddEntries.add(addOperation);
final State state = STATE_UPDATER.get(this);
@@ -672,8 +747,27 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
addOperation.setCloseWhenDone(true);
STATE_UPDATER.set(this, State.ClosingLedger);
}
+ // interceptor entry before add to bookie
+ if (beforeAddEntry(addOperation)) {
+ addOperation.initiate();
+ }
+ }
+ }
- addOperation.initiate();
+ private boolean beforeAddEntry(OpAddEntry addOperation) {
+ // if no interceptor, just return true to make sure addOperation will
be initiate()
+ if (managedLedgerInterceptor == null) {
+ return true;
+ }
+ try {
+ managedLedgerInterceptor.beforeAddEntry(addOperation,
addOperation.getNumberOfMessages());
+ return true;
+ } catch (Exception e) {
+ addOperation.failed(
+ new ManagedLedgerInterceptException("Interceptor managed
ledger before add to bookie failed."));
+ ReferenceCountUtil.release(addOperation.data);
+ log.error("[{}] Failed to intercept adding an entry to bookie.",
name, e);
+ return false;
}
}
@@ -1357,10 +1451,12 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
// If op is used by another ledger handle, we need to close it
and create a new one
if (existsOp.ledger != null) {
existsOp.close();
- existsOp = OpAddEntry.create(existsOp.ml, existsOp.data,
existsOp.callback, existsOp.ctx);
+ existsOp = OpAddEntry.create(existsOp.ml, existsOp.data,
existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
}
existsOp.setLedger(currentLedger);
- pendingAddEntries.add(existsOp);
+ if (beforeAddEntry(existsOp)) {
+ pendingAddEntries.add(existsOp);
+ }
}
} while (existsOp != null && --pendingSize > 0);
@@ -1470,6 +1566,51 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
}
+ @Override
+ public CompletableFuture<Position>
asyncFindPosition(com.google.common.base.Predicate<Entry> predicate) {
+
+ CompletableFuture<Position> future = new CompletableFuture();
+ Long firstLedgerId = ledgers.firstKey();
+ final PositionImpl startPosition = firstLedgerId == null ? null : new
PositionImpl(firstLedgerId, 0);
+ if (startPosition == null) {
+ future.complete(null);
+ return future;
+ }
+ AsyncCallbacks.FindEntryCallback findEntryCallback = new
AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ final Position finalPosition;
+ if (position == null) {
+ finalPosition = startPosition;
+ if (finalPosition == null) {
+ log.warn("[{}] Unable to find position for predicate
{}.", name, predicate);
+ future.complete(null);
+ return;
+ }
+ log.info("[{}] Unable to find position for predicate {}.
Use the first position {} instead.", name, predicate, startPosition);
+ } else {
+ finalPosition = getNextValidPosition((PositionImpl)
position);
+ }
+ future.complete((PositionImpl) finalPosition);
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
+ log.warn("[{}] Unable to find position for predicate {}.",
name, predicate);
+ future.complete(null);
+ }
+ };
+ long max = getNumberOfEntries() - 1;
+ OpFindNewest op = new OpFindNewest(this, startPosition, predicate,
max, findEntryCallback, null);
+ op.find();
+ return future;
+ }
+
+ @Override
+ public ManagedLedgerInterceptor getManagedLedgerInterceptor() {
+ return managedLedgerInterceptor;
+ }
+
void clearPendingAddEntries(ManagedLedgerException e) {
while (!pendingAddEntries.isEmpty()) {
OpAddEntry op = pendingAddEntries.poll();
@@ -3085,6 +3226,9 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId())
.setEntryId(lastConfirmedEntry.getEntryId()));
}
+ if (managedLedgerInterceptor != null) {
+ managedLedgerInterceptor.onUpdateManagedLedgerInfo(propertiesMap);
+ }
for (Map.Entry<String, String> property : propertiesMap.entrySet()) {
mlInfo.addProperties(MLDataFormats.KeyValue.newBuilder()
.setKey(property.getKey()).setValue(property.getValue()));
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 08d188f..fa5228c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -44,10 +44,11 @@ import org.slf4j.LoggerFactory;
* Handles the life-cycle of an addEntry() operation.
*
*/
-class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
+public class OpAddEntry extends SafeRunnable implements AddCallback,
CloseCallback {
protected ManagedLedgerImpl ml;
LedgerHandle ledger;
private long entryId;
+ private int numberOfMessages;
@SuppressWarnings("unused")
private static final AtomicReferenceFieldUpdater<OpAddEntry,
AddEntryCallback> callbackUpdater =
@@ -95,6 +96,27 @@ class OpAddEntry extends SafeRunnable implements
AddCallback, CloseCallback {
return op;
}
+ public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, int
numberOfMessages, AddEntryCallback callback, Object ctx) {
+ OpAddEntry op = RECYCLER.get();
+ op.ml = ml;
+ op.ledger = null;
+ op.numberOfMessages = numberOfMessages;
+ op.data = data.retain();
+ op.dataLength = data.readableBytes();
+ op.callback = callback;
+ op.ctx = ctx;
+ op.addOpCount =
ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
+ op.closeWhenDone = false;
+ op.entryId = -1;
+ op.startTime = System.nanoTime();
+ op.state = State.OPEN;
+ ml.mbean.addAddEntrySample(op.dataLength);
+ if (log.isDebugEnabled()) {
+ log.debug("Created new OpAddEntry {}", op);
+ }
+ return op;
+ }
+
public void setLedger(LedgerHandle ledger) {
this.ledger = ledger;
}
@@ -272,7 +294,23 @@ class OpAddEntry extends SafeRunnable implements
AddCallback, CloseCallback {
public State getState() {
return state;
}
-
+
+ public ByteBuf getData() {
+ return data;
+ }
+
+ public int getNumberOfMessages() {
+ return numberOfMessages;
+ }
+
+ public void setNumberOfMessages(int numberOfMessages) {
+ this.numberOfMessages = numberOfMessages;
+ }
+
+ public void setData(ByteBuf data) {
+ this.data = data;
+ }
+
private final Handle<OpAddEntry> recyclerHandle;
private OpAddEntry(Handle<OpAddEntry> recyclerHandle) {
@@ -290,6 +328,7 @@ class OpAddEntry extends SafeRunnable implements
AddCallback, CloseCallback {
ml = null;
ledger = null;
data = null;
+ numberOfMessages = 0;
dataLength = -1;
callback = null;
ctx = null;
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
index 861d247..cbecedd 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
@@ -31,6 +31,7 @@ import
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
class OpFindNewest implements ReadEntryCallback {
private final ManagedCursorImpl cursor;
+ private final ManagedLedgerImpl ledger;
private final PositionImpl startPosition;
private final FindEntryCallback callback;
private final Predicate<Entry> condition;
@@ -49,6 +50,23 @@ class OpFindNewest implements ReadEntryCallback {
public OpFindNewest(ManagedCursorImpl cursor, PositionImpl startPosition,
Predicate<Entry> condition,
long numberOfEntries, FindEntryCallback callback, Object ctx) {
this.cursor = cursor;
+ this.ledger = cursor.ledger;
+ this.startPosition = startPosition;
+ this.callback = callback;
+ this.condition = condition;
+ this.ctx = ctx;
+
+ this.min = 0;
+ this.max = numberOfEntries;
+
+ this.searchPosition = startPosition;
+ this.state = State.checkFirst;
+ }
+
+ public OpFindNewest(ManagedLedgerImpl ledger, PositionImpl startPosition,
Predicate<Entry> condition,
+ long numberOfEntries, FindEntryCallback callback,
Object ctx) {
+ this.cursor = null;
+ this.ledger = ledger;
this.startPosition = startPosition;
this.callback = callback;
this.condition = condition;
@@ -77,7 +95,7 @@ class OpFindNewest implements ReadEntryCallback {
// check last entry
state = State.checkLast;
- searchPosition =
cursor.ledger.getPositionAfterN(searchPosition, max,
PositionBound.startExcluded);
+ searchPosition = ledger.getPositionAfterN(searchPosition, max,
PositionBound.startExcluded);
find();
}
break;
@@ -88,7 +106,7 @@ class OpFindNewest implements ReadEntryCallback {
} else {
// start binary search
state = State.searching;
- searchPosition =
cursor.ledger.getPositionAfterN(startPosition, mid(),
PositionBound.startExcluded);
+ searchPosition = ledger.getPositionAfterN(startPosition,
mid(), PositionBound.startExcluded);
find();
}
break;
@@ -106,7 +124,7 @@ class OpFindNewest implements ReadEntryCallback {
callback.findEntryComplete(lastMatchedPosition,
OpFindNewest.this.ctx);
return;
}
- searchPosition = cursor.ledger.getPositionAfterN(startPosition,
mid(), PositionBound.startExcluded);
+ searchPosition = ledger.getPositionAfterN(startPosition, mid(),
PositionBound.startExcluded);
find();
}
}
@@ -117,8 +135,8 @@ class OpFindNewest implements ReadEntryCallback {
}
public void find() {
- if (cursor.hasMoreEntries(searchPosition)) {
- cursor.ledger.asyncReadEntry(searchPosition, this, null);
+ if (cursor != null ? cursor.hasMoreEntries(searchPosition) :
ledger.hasMoreEntries(searchPosition)) {
+ ledger.asyncReadEntry(searchPosition, this, null);
} else {
callback.findEntryComplete(lastMatchedPosition,
OpFindNewest.this.ctx);
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java
new file mode 100644
index 0000000..f5d9f12
--- /dev/null
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.interceptor;
+
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience;
+import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
+
+import java.util.Map;
+
+/**
+ * Interceptor for ManagedLedger.
+ * */
[email protected]
[email protected]
+public interface ManagedLedgerInterceptor {
+
+ /**
+ * Intercept an OpAddEntry and return an OpAddEntry.
+ * @param op an OpAddEntry to be intercepted.
+ * @param batchSize
+ * @return an OpAddEntry.
+ */
+ OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize);
+
+ /**
+ * Intercept when ManagedLedger is initialized.
+ * @param propertiesMap map of properties.
+ */
+ void onManagedLedgerPropertiesInitialize(Map<String, String>
propertiesMap);
+
+ /**
+ * Intercept when ManagedLedger is initialized.
+ * @param name name of ManagedLedger
+ * @param ledgerHandle a LedgerHandle.
+ */
+ void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle
ledgerHandle);
+
+ /**
+ * @param propertiesMap map of properties.
+ */
+ void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap);
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
new file mode 100644
index 0000000..0e59d2f
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.intercept;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
+import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
+import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
+import org.apache.pulsar.common.protocol.Commands;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor {
+ private static final Logger log =
LoggerFactory.getLogger(ManagedLedgerInterceptorImpl.class);
+ private static final String INDEX = "index";
+
+
+ private final Set<BrokerEntryMetadataInterceptor>
brokerEntryMetadataInterceptors;
+
+
+ public ManagedLedgerInterceptorImpl(Set<BrokerEntryMetadataInterceptor>
brokerEntryMetadataInterceptors) {
+ this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors;
+ }
+
+ public long getIndex() {
+ long index = -1;
+ for (BrokerEntryMetadataInterceptor interceptor :
brokerEntryMetadataInterceptors) {
+ if (interceptor instanceof AppendIndexMetadataInterceptor) {
+ index = ((AppendIndexMetadataInterceptor)
interceptor).getIndex();
+ }
+ }
+ return index;
+ }
+
+ @Override
+ public OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize) {
+ if (op == null || batchSize <= 0) {
+ return op;
+ }
+ op.setData(Commands.addBrokerEntryMetadata(op.getData(),
brokerEntryMetadataInterceptors, batchSize));
+ return op;
+ }
+
+ @Override
+ public void onManagedLedgerPropertiesInitialize(Map<String, String>
propertiesMap) {
+ if (propertiesMap == null || propertiesMap.size() == 0) {
+ return;
+ }
+
+ if (propertiesMap.containsKey(INDEX)) {
+ for (BrokerEntryMetadataInterceptor interceptor :
brokerEntryMetadataInterceptors) {
+ if (interceptor instanceof AppendIndexMetadataInterceptor) {
+ ((AppendIndexMetadataInterceptor) interceptor)
+
.recoveryIndexGenerator(Long.parseLong(propertiesMap.get(INDEX)));
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle
lh) {
+ try {
+ for (BrokerEntryMetadataInterceptor interceptor :
brokerEntryMetadataInterceptors) {
+ if (interceptor instanceof AppendIndexMetadataInterceptor) {
+ LedgerEntries ledgerEntries =
+ lh.read(lh.getLastAddConfirmed() - 1,
lh.getLastAddConfirmed());
+ for (LedgerEntry entry : ledgerEntries) {
+ PulsarApi.BrokerEntryMetadata brokerEntryMetadata =
+
Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
+ if (brokerEntryMetadata != null &&
brokerEntryMetadata.hasIndex()) {
+ ((AppendIndexMetadataInterceptor) interceptor)
+
.recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+ }
+ }
+
+ }
+ }
+ } catch (org.apache.bookkeeper.client.api.BKException |
InterruptedException e) {
+ log.error("[{}] Read last entry error.", name, e);
+ }
+ }
+
+ @Override
+ public void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap) {
+ for (BrokerEntryMetadataInterceptor interceptor :
brokerEntryMetadataInterceptors) {
+ if (interceptor instanceof AppendIndexMetadataInterceptor) {
+ propertiesMap.put(INDEX,
String.valueOf(((AppendIndexMetadataInterceptor) interceptor).getIndex()));
+ }
+ }
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6cfdf38..792c676 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -83,6 +83,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
@@ -98,6 +99,7 @@ import
org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
@@ -126,6 +128,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.FieldContext;
+import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -1092,6 +1095,20 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
}
getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> {
+ if (isBrokerEntryMetadataEnabled()) {
+ // init managedLedger interceptor
+ for (BrokerEntryMetadataInterceptor interceptor :
brokerEntryMetadataInterceptors) {
+ if (interceptor instanceof AppendIndexMetadataInterceptor)
{
+ // add individual AppendOffsetMetadataInterceptor for
each topic
+ brokerEntryMetadataInterceptors.remove(interceptor);
+ brokerEntryMetadataInterceptors.add(new
AppendIndexMetadataInterceptor());
+ }
+ }
+ ManagedLedgerInterceptor mlInterceptor =
+ new
ManagedLedgerInterceptorImpl(brokerEntryMetadataInterceptors);
+ managedLedgerConfig.setManagedLedgerInterceptor(mlInterceptor);
+ }
+
managedLedgerConfig.setCreateIfMissing(createIfMissing);
// Once we have the configuration, we can proceed with the async
open operation
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index c69f9ba..2bca232 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -463,6 +463,11 @@ public class Producer {
return callback;
}
+ @Override
+ public long getNumberOfMessages() {
+ return batchSize;
+ }
+
private final Handle<MessagePublishContext> recyclerHandle;
private MessagePublishContext(Handle<MessagePublishContext>
recyclerHandle) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index af88ce8..c940195 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -90,6 +90,10 @@ public interface Topic {
default long getOriginalHighestSequenceId() {
return -1L;
}
+
+ default long getNumberOfMessages() {
+ return 1L;
+ }
}
void publishMessage(ByteBuf headersAndPayload, PublishContext callback);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 676f98c..49e9042 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -357,10 +357,7 @@ public class PersistentTopic extends AbstractTopic
messageDeduplication.isDuplicate(publishContext,
headersAndPayload);
switch (status) {
case NotDup:
- // intercept headersAndPayload and add entry metadata
- if (appendBrokerEntryMetadata(headersAndPayload,
publishContext)) {
- ledger.asyncAddEntry(headersAndPayload, this,
publishContext);
- }
+ asyncAddEntry(headersAndPayload, publishContext);
break;
case Dup:
// Immediately acknowledge duplicated message
@@ -374,22 +371,13 @@ public class PersistentTopic extends AbstractTopic
}
}
- private boolean appendBrokerEntryMetadata(ByteBuf headersAndPayload,
PublishContext publishContext) {
- // just return true if BrokerEntryMetadata is not enabled
- if (!brokerService.isBrokerEntryMetadataEnabled()) {
- return true;
- }
-
- try {
- headersAndPayload =
Commands.addBrokerEntryMetadata(headersAndPayload,
- brokerService.getBrokerEntryMetadataInterceptors());
- } catch (Exception e) {
- decrementPendingWriteOpsAndCheck();
- publishContext.completed(new
BrokerServiceException.AddEntryMetadataException(e), -1, -1);
- log.error("[{}] Failed to add broker entry metadata.", topic, e);
- return false;
+ private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext
publishContext) {
+ if (brokerService.isBrokerEntryMetadataEnabled()) {
+ ledger.asyncAddEntry(headersAndPayload,
+ (int) publishContext.getNumberOfMessages(), this,
publishContext);
+ } else {
+ ledger.asyncAddEntry(headersAndPayload, this, publishContext);
}
- return true;
}
public void asyncReadEntry(PositionImpl position,
AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
new file mode 100644
index 0000000..2c86b98
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.intercept;
+
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
+import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
+import org.apache.pulsar.common.protocol.Commands;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static
com.sun.org.apache.xml.internal.serialize.OutputFormat.Defaults.Encoding;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase
{
+ private static final Logger log =
LoggerFactory.getLogger(MangedLedgerInterceptorImplTest.class);
+
+
+ @Test
+ public void testAddBrokerEntryMetadata() throws Exception {
+ final int MOCK_BATCH_SIZE = 2;
+ int numberOfEntries = 10;
+ final String ledgerAndCursorName = "topicEntryMetadataSequenceId";
+
+ ManagedLedgerInterceptor interceptor = new
ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors());
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2);
+ config.setManagedLedgerInterceptor(interceptor);
+
+ ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
ledger.openCursor(ledgerAndCursorName);
+
+ for ( int i = 0 ; i < numberOfEntries; i ++) {
+ ledger.addEntry(("message" + i).getBytes(), MOCK_BATCH_SIZE);
+ }
+
+
+ assertEquals(19, ((ManagedLedgerInterceptorImpl)
ledger.getManagedLedgerInterceptor()).getIndex());
+ List<Entry> entryList = cursor.readEntries(numberOfEntries);
+ for (int i = 0 ; i < numberOfEntries; i ++) {
+ PulsarApi.BrokerEntryMetadata metadata =
+
Commands.parseBrokerEntryMetadataIfExist(entryList.get(i).getDataBuffer());
+ assertNotNull(metadata);
+ assertEquals(metadata.getIndex(), (i + 1) * MOCK_BATCH_SIZE - 1);
+ }
+
+ cursor.close();;
+ ledger.close();
+ factory.shutdown();
+ }
+
+
+ @Test(timeOut = 20000)
+ public void testRecoveryIndex() throws Exception {
+ final int MOCK_BATCH_SIZE = 2;
+ ManagedLedgerInterceptor interceptor = new
ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors());
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setManagedLedgerInterceptor(interceptor);
+ ManagedLedger ledger = factory.open("my_recovery_index_test_ledger",
config);
+
+ ledger.addEntry("dummy-entry-1".getBytes(Encoding), MOCK_BATCH_SIZE);
+
+ ManagedCursor cursor = ledger.openCursor("c1");
+
+ ledger.addEntry("dummy-entry-2".getBytes(Encoding), MOCK_BATCH_SIZE);
+
+ assertEquals(((ManagedLedgerInterceptorImpl)
ledger.getManagedLedgerInterceptor()).getIndex(), MOCK_BATCH_SIZE * 2 - 1);
+
+ ledger.close();
+
+ log.info("Closing ledger and reopening");
+
+ // / Reopen the same managed-ledger
+ ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(bkc,
bkc.getZkHandle());
+ ledger = factory2.open("my_recovery_index_test_ledger", config);
+
+ cursor = ledger.openCursor("c1");
+
+ assertEquals(ledger.getNumberOfEntries(), 2);
+ assertEquals(((ManagedLedgerInterceptorImpl)
ledger.getManagedLedgerInterceptor()).getIndex(), MOCK_BATCH_SIZE * 2 - 1);
+
+
+ List<Entry> entries = cursor.readEntries(100);
+ assertEquals(entries.size(), 1);
+ entries.forEach(e -> e.release());
+
+ cursor.close();
+ ledger.close();
+ factory2.shutdown();
+ }
+
+ @Test
+ public void testFindPositionByIndex() throws Exception {
+ final int MOCK_BATCH_SIZE = 2;
+ final int maxEntriesPerLedger = 5;
+ int maxSequenceIdPerLedger = MOCK_BATCH_SIZE * maxEntriesPerLedger;
+ ManagedLedgerInterceptor interceptor = new
ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors());
+
+
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ managedLedgerConfig.setManagedLedgerInterceptor(interceptor);
+ managedLedgerConfig.setMaxEntriesPerLedger(5);
+
+ ManagedLedger ledger =
factory.open("my_ml_broker_entry_metadata_test_ledger", managedLedgerConfig);
+ ManagedCursor cursor = ledger.openCursor("c1");
+
+ long firstLedgerId = -1;
+ for (int i = 0; i < maxEntriesPerLedger; i++) {
+ firstLedgerId = ((PositionImpl)
ledger.addEntry("dummy-entry".getBytes(Encoding),
MOCK_BATCH_SIZE)).getLedgerId();
+ }
+
+ assertEquals(((ManagedLedgerInterceptorImpl)
ledger.getManagedLedgerInterceptor()).getIndex(), 9);
+
+
+ PositionImpl position = null;
+ for (int index = 0; index <= ((ManagedLedgerInterceptorImpl)
ledger.getManagedLedgerInterceptor()).getIndex(); index ++) {
+ position = (PositionImpl) ledger.asyncFindPosition(new
IndexSearchPredicate(index)).get();
+ assertEquals(position.getEntryId(), (index %
maxSequenceIdPerLedger) / MOCK_BATCH_SIZE);
+ }
+
+ // roll over ledger
+ long secondLedgerId = -1;
+ for (int i = 0; i < maxEntriesPerLedger; i++) {
+ secondLedgerId = ((PositionImpl)
ledger.addEntry("dummy-entry".getBytes(Encoding),
MOCK_BATCH_SIZE)).getLedgerId();
+ }
+ assertEquals(((ManagedLedgerInterceptorImpl)
ledger.getManagedLedgerInterceptor()).getIndex(), 19);
+ assertNotEquals(firstLedgerId, secondLedgerId);
+
+ for (int index = 0; index <= ((ManagedLedgerInterceptorImpl)
ledger.getManagedLedgerInterceptor()).getIndex(); index ++) {
+ position = (PositionImpl) ledger.asyncFindPosition(new
IndexSearchPredicate(index)).get();
+ assertEquals(position.getEntryId(), (index %
maxSequenceIdPerLedger) / MOCK_BATCH_SIZE);
+ }
+
+ // reopen ledger
+ ledger.close();
+ // / Reopen the same managed-ledger
+ ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(bkc,
bkc.getZkHandle());
+ ledger = factory2.open("my_ml_broker_entry_metadata_test_ledger",
managedLedgerConfig);
+
+ long thirdLedgerId = -1;
+ for (int i = 0; i < maxEntriesPerLedger; i++) {
+ thirdLedgerId = ((PositionImpl)
ledger.addEntry("dummy-entry".getBytes(Encoding),
MOCK_BATCH_SIZE)).getLedgerId();
+ }
+ assertEquals(((ManagedLedgerInterceptorImpl)
ledger.getManagedLedgerInterceptor()).getIndex(), 29);
+ assertNotEquals(secondLedgerId, thirdLedgerId);
+
+ for (int index = 0; index <= ((ManagedLedgerInterceptorImpl)
ledger.getManagedLedgerInterceptor()).getIndex(); index ++) {
+ position = (PositionImpl) ledger.asyncFindPosition(new
IndexSearchPredicate(index)).get();
+ assertEquals(position.getEntryId(), (index %
maxSequenceIdPerLedger) / MOCK_BATCH_SIZE);
+ }
+ cursor.close();
+ ledger.close();
+ factory2.shutdown();
+ }
+
+ public static Set<BrokerEntryMetadataInterceptor>
getBrokerEntryMetadataInterceptors() {
+ Set<String> interceptorNames = new HashSet<>();
+
interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");
+
interceptorNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor");
+ return
BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames,
+ Thread.currentThread().getContextClassLoader());
+ }
+
+ class IndexSearchPredicate implements
com.google.common.base.Predicate<Entry> {
+
+ long indexToSearch = -1;
+ public IndexSearchPredicate(long indexToSearch) {
+ this.indexToSearch = indexToSearch;
+ }
+
+ @Override
+ public boolean apply(@Nullable Entry entry) {
+ try {
+ PulsarApi.BrokerEntryMetadata brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
+ return brokerEntryMetadata.getIndex() < indexToSearch;
+ } catch (Exception e) {
+ log.error("Error deserialize message for message position
find", e);
+ } finally {
+ entry.release();
+ }
+ return false;
+ }
+ }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 1740802..c0c78d2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -108,7 +108,7 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
public static byte[] appendBrokerTimestamp(ByteBuf headerAndPayloads)
throws Exception {
ByteBuf msgWithEntryMeta =
- Commands.addBrokerEntryMetadata(headerAndPayloads,
getBrokerEntryMetadataInterceptors());
+ Commands.addBrokerEntryMetadata(headerAndPayloads,
getBrokerEntryMetadataInterceptors(), 1);
byte[] byteMessage = msgWithEntryMeta.nioBuffer().array();
msgWithEntryMeta.release();
return byteMessage;
@@ -321,9 +321,10 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
}
public static Set<BrokerEntryMetadataInterceptor>
getBrokerEntryMetadataInterceptors() {
-
- return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(
-
Sets.newHashSet("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"),
+ Set<String> interceptorNames = new HashSet<>();
+
interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");
+
interceptorNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor");
+ return
BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames,
Thread.currentThread().getContextClassLoader());
}
/**
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index aa9f2c1..f19ba63 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -7074,6 +7074,10 @@ public final class PulsarApi {
// optional uint64 broker_timestamp = 1;
boolean hasBrokerTimestamp();
long getBrokerTimestamp();
+
+ // optional uint64 index = 2;
+ boolean hasIndex();
+ long getIndex();
}
public static final class BrokerEntryMetadata extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -7120,8 +7124,19 @@ public final class PulsarApi {
return brokerTimestamp_;
}
+ // optional uint64 index = 2;
+ public static final int INDEX_FIELD_NUMBER = 2;
+ private long index_;
+ public boolean hasIndex() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public long getIndex() {
+ return index_;
+ }
+
private void initFields() {
brokerTimestamp_ = 0L;
+ index_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -7143,6 +7158,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeUInt64(1, brokerTimestamp_);
}
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeUInt64(2, index_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -7155,6 +7173,10 @@ public final class PulsarApi {
size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeUInt64Size(1, brokerTimestamp_);
}
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeUInt64Size(2, index_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -7270,6 +7292,8 @@ public final class PulsarApi {
super.clear();
brokerTimestamp_ = 0L;
bitField0_ = (bitField0_ & ~0x00000001);
+ index_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
@@ -7307,6 +7331,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00000001;
}
result.brokerTimestamp_ = brokerTimestamp_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.index_ = index_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -7316,6 +7344,9 @@ public final class PulsarApi {
if (other.hasBrokerTimestamp()) {
setBrokerTimestamp(other.getBrokerTimestamp());
}
+ if (other.hasIndex()) {
+ setIndex(other.getIndex());
+ }
return this;
}
@@ -7350,6 +7381,11 @@ public final class PulsarApi {
brokerTimestamp_ = input.readUInt64();
break;
}
+ case 16: {
+ bitField0_ |= 0x00000002;
+ index_ = input.readUInt64();
+ break;
+ }
}
}
}
@@ -7377,6 +7413,27 @@ public final class PulsarApi {
return this;
}
+ // optional uint64 index = 2;
+ private long index_ ;
+ public boolean hasIndex() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public long getIndex() {
+ return index_;
+ }
+ public Builder setIndex(long value) {
+ bitField0_ |= 0x00000002;
+ index_ = value;
+
+ return this;
+ }
+ public Builder clearIndex() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ index_ = 0L;
+
+ return this;
+ }
+
//
@@protoc_insertion_point(builder_scope:pulsar.proto.BrokerEntryMetadata)
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java
index 6043f26..78cdfc8 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java
@@ -30,4 +30,12 @@ public class AppendBrokerTimestampMetadataInterceptor
implements BrokerEntryMeta
public PulsarApi.BrokerEntryMetadata.Builder
intercept(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata) {
return brokerMetadata.setBrokerTimestamp(System.currentTimeMillis());
}
+
+ @Override
+ public PulsarApi.BrokerEntryMetadata.Builder interceptWithBatchSize(
+ PulsarApi.BrokerEntryMetadata.Builder brokerMetadata,
+ int batchSize) {
+ // do nothing, just return brokerMetadata
+ return brokerMetadata;
+ }
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
similarity index 54%
copy from
pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java
copy to
pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
index 6043f26..dba3d9a 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java
@@ -20,14 +20,35 @@ package org.apache.pulsar.common.intercept;
import org.apache.pulsar.common.api.proto.PulsarApi;
-/**
- * A plugin interface that allows you to intercept the client requests to
- * the Pulsar brokers and add timestamp from broker side metadata for each
entry.
- */
-public class AppendBrokerTimestampMetadataInterceptor implements
BrokerEntryMetadataInterceptor {
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AppendIndexMetadataInterceptor implements
BrokerEntryMetadataInterceptor{
+ private final AtomicLong indexGenerator;
+
+ public AppendIndexMetadataInterceptor() {
+ this.indexGenerator = new AtomicLong(-1);
+ }
+
+ public void recoveryIndexGenerator(long index) {
+ if (indexGenerator.get() < index) {
+ indexGenerator.set(index);
+ }
+ }
@Override
public PulsarApi.BrokerEntryMetadata.Builder
intercept(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata) {
- return brokerMetadata.setBrokerTimestamp(System.currentTimeMillis());
+ // do nothing, just return brokerMetadata
+ return brokerMetadata;
+ }
+
+ @Override
+ public PulsarApi.BrokerEntryMetadata.Builder interceptWithBatchSize(
+ PulsarApi.BrokerEntryMetadata.Builder brokerMetadata,
+ int batchSize) {
+ return brokerMetadata.setIndex(indexGenerator.addAndGet(batchSize));
+ }
+
+ public long getIndex() {
+ return indexGenerator.get();
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java
index 42dfc8d..6dcb794 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java
@@ -26,4 +26,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi;
*/
public interface BrokerEntryMetadataInterceptor {
PulsarApi.BrokerEntryMetadata.Builder
intercept(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata);
+ PulsarApi.BrokerEntryMetadata.Builder
interceptWithBatchSize(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata,
+ int
batchSize);
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 7ec9132..7dfb131 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
@@ -1962,6 +1963,37 @@ public class Commands {
return compositeByteBuf;
}
+ public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload,
+
Set<BrokerEntryMetadataInterceptor> brokerInterceptors,
+ int batchSize) {
+ // | BROKER_ENTRY_METADATA_MAGIC_NUMBER | BROKER_ENTRY_METADATA_SIZE
| BROKER_ENTRY_METADATA |
+ // | 2 bytes | 4 bytes
| BROKER_ENTRY_METADATA_SIZE bytes |
+
+ PulsarApi.BrokerEntryMetadata.Builder brokerMetadataBuilder =
PulsarApi.BrokerEntryMetadata.newBuilder();
+ for (BrokerEntryMetadataInterceptor interceptor : brokerInterceptors) {
+ interceptor.intercept(brokerMetadataBuilder);
+ interceptor.interceptWithBatchSize(brokerMetadataBuilder,
batchSize);
+ }
+ PulsarApi.BrokerEntryMetadata brokerEntryMetadata =
brokerMetadataBuilder.build();
+ int brokerMetaSize = brokerEntryMetadata.getSerializedSize();
+ ByteBuf brokerMeta =
+ PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6,
brokerMetaSize + 6);
+ brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+ brokerMeta.writeInt(brokerMetaSize);
+ ByteBufCodedOutputStream outStream =
ByteBufCodedOutputStream.get(brokerMeta);
+ try {
+ brokerEntryMetadata.writeTo(outStream);
+ } catch (IOException e) {
+ // This is in-memory serialization, should not fail
+ throw new RuntimeException(e);
+ }
+ outStream.recycle();
+
+ CompositeByteBuf compositeByteBuf =
PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+ compositeByteBuf.addComponents(true, brokerMeta, headerAndPayload);
+ return compositeByteBuf;
+ }
+
public static ByteBuf skipBrokerEntryMetadataIfExist(ByteBuf
headerAndPayloadWithBrokerEntryMetadata) {
int readerIndex =
headerAndPayloadWithBrokerEntryMetadata.readerIndex();
if (headerAndPayloadWithBrokerEntryMetadata.readShort() ==
magicBrokerEntryMetadata) {
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index b998828..ace18c8 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -183,6 +183,7 @@ message SingleMessageMetadata {
// metadata added for entry from broker
message BrokerEntryMetadata {
optional uint64 broker_timestamp = 1;
+ optional uint64 index = 2;
}
enum ServerError {
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
index df5bf76..2bd4df0 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java
@@ -145,14 +145,19 @@ public class CommandUtilsTests {
@Test
public void testAddBrokerEntryMetadata() throws Exception {
+ int MOCK_BATCH_SIZE = 10;
String data = "test-message";
ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(),
data.length());
byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
PulsarApi.BrokerEntryMetadata brokerMetadata =
-
PulsarApi.BrokerEntryMetadata.newBuilder().setBrokerTimestamp(System.currentTimeMillis()).build();
+ PulsarApi.BrokerEntryMetadata
+ .newBuilder()
+ .setBrokerTimestamp(System.currentTimeMillis())
+ .setIndex(MOCK_BATCH_SIZE - 1)
+ .build();
ByteBuf dataWithBrokerEntryMetadata =
- Commands.addBrokerEntryMetadata(byteBuf,
getBrokerEntryMetadataInterceptors());
+ Commands.addBrokerEntryMetadata(byteBuf,
getBrokerEntryMetadataInterceptors(), MOCK_BATCH_SIZE);
assertEquals(brokerMetadata.getSerializedSize() + data.length() + 6,
dataWithBrokerEntryMetadata.readableBytes());
@@ -167,7 +172,7 @@ public class CommandUtilsTests {
ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(),
data.length());
byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
ByteBuf dataWithBrokerEntryMetadata =
- Commands.addBrokerEntryMetadata(byteBuf,
getBrokerEntryMetadataInterceptors());
+ Commands.addBrokerEntryMetadata(byteBuf,
getBrokerEntryMetadataInterceptors(), 11);
Commands.skipBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata);
assertEquals(data.length(),
dataWithBrokerEntryMetadata.readableBytes());
@@ -179,15 +184,17 @@ public class CommandUtilsTests {
@Test
public void testParseBrokerEntryMetadata() throws Exception {
+ int MOCK_BATCH_SIZE = 10;
String data = "test-message";
ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(),
data.length());
byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
ByteBuf dataWithBrokerEntryMetadata =
- Commands.addBrokerEntryMetadata(byteBuf,
getBrokerEntryMetadataInterceptors());
+ Commands.addBrokerEntryMetadata(byteBuf,
getBrokerEntryMetadataInterceptors(), MOCK_BATCH_SIZE);
PulsarApi.BrokerEntryMetadata brokerMetadata =
Commands.parseBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata);
assertTrue(brokerMetadata.getBrokerTimestamp() <=
System.currentTimeMillis());
+ assertEquals(brokerMetadata.getIndex(), MOCK_BATCH_SIZE - 1);
assertEquals(data.length(),
dataWithBrokerEntryMetadata.readableBytes());
byte [] content = new
byte[dataWithBrokerEntryMetadata.readableBytes()];
@@ -198,6 +205,7 @@ public class CommandUtilsTests {
public Set<BrokerEntryMetadataInterceptor>
getBrokerEntryMetadataInterceptors() {
Set<String> interceptorNames = new HashSet<>();
interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");
+
interceptorNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor");
return
BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames,
Thread.currentThread().getContextClassLoader());
}