sijie commented on a change in pull request #8618:
URL: https://github.com/apache/pulsar/pull/8618#discussion_r540392649



##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -103,8 +107,13 @@ public void setCloseWhenDone(boolean closeWhenDone) {
 
     public void initiate() {
         if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, 
State.INITIATED)) {
-            ByteBuf duplicateBuffer = data.retainedDuplicate();
 
+            ByteBuf duplicateBuffer = data.retainedDuplicate();
+            if (ml.getConfig().isBrokerEntryMetaEnabled()) {
+                duplicateBuffer = 
Commands.addBrokerEntryMetadata(duplicateBuffer,

Review comment:
       I am still not convinced why do we need to this here. ManagedLedger only 
handles serialized entry. The entry metadata should be appended at the broker 
level. I think the right place to add this logic should be done in 
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L342.

##########
File path: 
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
##########
@@ -865,6 +866,11 @@
                 "please enable the system topic first.")
     private boolean topicLevelPoliciesEnabled = false;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "List of interceptors for broker metadata.")

Review comment:
       ```suggestion
               doc = "List of interceptors for entry metadata.")
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
##########
@@ -68,16 +70,22 @@ public void expireMessages(int messageTTLInSeconds) {
                     messageTTLInSeconds);
 
             
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 entry -> {
-                MessageImpl<?> msg = null;
+                Pair<MessageImpl<byte[]>, PulsarApi.BrokerEntryMetadata> pair 
= null;
                 try {
-                    msg = MessageImpl.deserialize(entry.getDataBuffer());
-                    return msg.isExpired(messageTTLInSeconds);
+                    pair = 
MessageImpl.deserializeWithBrokerEntryMetaData(entry.getDataBuffer());

Review comment:
       We should expose the broker metadata in the Message. So this would avoid 
using `Pair` and a lot of `if-else` logic.
   
   We can improve `msg.isExpired` logic. If entry metadata is present, use 
broker timestamp; otherwise use client timestamp.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to