sijie commented on a change in pull request #8618:
URL: https://github.com/apache/pulsar/pull/8618#discussion_r540392649
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -103,8 +107,13 @@ public void setCloseWhenDone(boolean closeWhenDone) {
public void initiate() {
if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN,
State.INITIATED)) {
- ByteBuf duplicateBuffer = data.retainedDuplicate();
+ ByteBuf duplicateBuffer = data.retainedDuplicate();
+ if (ml.getConfig().isBrokerEntryMetaEnabled()) {
+ duplicateBuffer =
Commands.addBrokerEntryMetadata(duplicateBuffer,
Review comment:
I am still not convinced why do we need to this here. ManagedLedger only
handles serialized entry. The entry metadata should be appended at the broker
level. I think the right place to add this logic should be done in
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L342.
##########
File path:
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
##########
@@ -865,6 +866,11 @@
"please enable the system topic first.")
private boolean topicLevelPoliciesEnabled = false;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "List of interceptors for broker metadata.")
Review comment:
```suggestion
doc = "List of interceptors for entry metadata.")
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
##########
@@ -68,16 +70,22 @@ public void expireMessages(int messageTTLInSeconds) {
messageTTLInSeconds);
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
entry -> {
- MessageImpl<?> msg = null;
+ Pair<MessageImpl<byte[]>, PulsarApi.BrokerEntryMetadata> pair
= null;
try {
- msg = MessageImpl.deserialize(entry.getDataBuffer());
- return msg.isExpired(messageTTLInSeconds);
+ pair =
MessageImpl.deserializeWithBrokerEntryMetaData(entry.getDataBuffer());
Review comment:
We should expose the broker metadata in the Message. So this would avoid
using `Pair` and a lot of `if-else` logic.
We can improve `msg.isExpired` logic. If entry metadata is present, use
broker timestamp; otherwise use client timestamp.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]