codelipenghui commented on code in PR #21940:
URL: https://github.com/apache/pulsar/pull/21940#discussion_r1461500826


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -100,6 +108,33 @@ public boolean expireMessages(int messageTTLInSeconds) {
         }
     }
 
+    private long getEntryTimestamp(Entry entry,
+                                  Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerIdToInfoCache,
+                                  ManagedCursor cursor) throws IOException {
+        long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
+        ManagedLedger managedLedger = cursor.getManagedLedger();
+        ManagedLedgerConfig config = managedLedger.getConfig();
+        MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgerIdToInfoCache.computeIfAbsent(
+                entry.getLedgerId(), (ledgerId) -> {
+                    try {
+                        return managedLedger.getLedgerInfo(ledgerId)
+                                .get(config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);

Review Comment:
   The synchronous `managedLedger.getOptionalLedgerInfo()` method can be used here to avoid the complexity of calling the async API and blocking on its result.
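   A minimal sketch contrasting the two lookup styles, written against hypothetical in-memory stand-ins (`LedgerInfo`, `InMemoryLedgers`, `viaAsync`, `viaOptional`) rather than the real `ManagedLedger` types, so no exact Pulsar signature is assumed. The async variant mirrors the PR: block on the future with a timeout and map any failure to `null`. The `Optional` variant needs none of that. If the real `getOptionalLedgerInfo()` is a cheap in-memory read (an assumption, not confirmed here), the per-check `ledgerIdToInfoCache` may become unnecessary as well.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class LedgerInfoLookupSketch {

    // Hypothetical stand-in for MLDataFormats.ManagedLedgerInfo.LedgerInfo.
    record LedgerInfo(long ledgerId, long timestampMs) {}

    // Hypothetical stand-in exposing both lookup styles discussed in the review.
    static class InMemoryLedgers {
        private final Map<Long, LedgerInfo> ledgers = new HashMap<>();

        void add(LedgerInfo info) {
            ledgers.put(info.ledgerId(), info);
        }

        // Async style, mirroring the call used in the PR.
        CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId) {
            LedgerInfo info = ledgers.get(ledgerId);
            if (info == null) {
                return CompletableFuture.failedFuture(new IllegalArgumentException("unknown ledger " + ledgerId));
            }
            return CompletableFuture.completedFuture(info);
        }

        // Sync style, mirroring the suggested getOptionalLedgerInfo().
        Optional<LedgerInfo> getOptionalLedgerInfo(long ledgerId) {
            return Optional.ofNullable(ledgers.get(ledgerId));
        }
    }

    // What the PR does: block on the future with a timeout and turn any failure into null.
    static LedgerInfo viaAsync(InMemoryLedgers ml, Map<Long, LedgerInfo> cache, long ledgerId, long timeoutSeconds) {
        return cache.computeIfAbsent(ledgerId, id -> {
            try {
                return ml.getLedgerInfo(id).get(timeoutSeconds, TimeUnit.SECONDS);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                }
                return null; // a null result is not stored, so the next lookup retries
            }
        });
    }

    // The suggested shape: no future, no timeout, no checked exception handling.
    static LedgerInfo viaOptional(InMemoryLedgers ml, long ledgerId) {
        return ml.getOptionalLedgerInfo(ledgerId).orElse(null);
    }
}
```

   Note that `computeIfAbsent` does not store a `null` result, so in the async variant a failed lookup is repeated for every entry of that ledger during the same expiry check.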



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -100,6 +108,33 @@ public boolean expireMessages(int messageTTLInSeconds) {
         }
     }
 
+    private long getEntryTimestamp(Entry entry,
+                                  Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerIdToInfoCache,
+                                  ManagedCursor cursor) throws IOException {
+        long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
+        ManagedLedger managedLedger = cursor.getManagedLedger();
+        ManagedLedgerConfig config = managedLedger.getConfig();
+        MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgerIdToInfoCache.computeIfAbsent(
+                entry.getLedgerId(), (ledgerId) -> {
+                    try {
+                        return managedLedger.getLedgerInfo(ledgerId)
+                                .get(config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
+                    } catch (Exception e) {
+                        return null;
+                    }
+                });
+        if (ledgerInfo == null) {
+            return entryTimestamp;
+        }
+        long maxAvailableTimestamp = ledgerInfo.getTimestamp() + config.getMaximumRolloverTimeMs();

Review Comment:
   `ledgerInfo.getTimestamp()` is the ledger's close time. Do we still need to add the maximum rollover time to it?
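   A minimal sketch of the distinction, assuming the goal of `getEntryTimestamp` is to cap an entry's timestamp by an upper bound taken from its ledger, and assuming a timestamp of `0` means the ledger has not been closed yet. If `getTimestamp()` is the close time, no entry in that ledger was written after it, so the close time alone is already an upper bound; adding `maximumRolloverTimeMs` only loosens the cap. The class and method names are hypothetical.

```java
class LedgerCloseTimeBoundSketch {

    // Bound as written in the PR: close time plus the maximum rollover time.
    static long cappedWithRollover(long entryTimestampMs, long ledgerCloseTimestampMs, long maxRolloverTimeMs) {
        if (ledgerCloseTimestampMs <= 0) {
            return entryTimestampMs; // ledger assumed still open: no bound available
        }
        return Math.min(entryTimestampMs, ledgerCloseTimestampMs + maxRolloverTimeMs);
    }

    // Bound using the close time alone: nothing in a closed ledger was written after it.
    static long cappedByCloseTime(long entryTimestampMs, long ledgerCloseTimestampMs) {
        if (ledgerCloseTimestampMs <= 0) {
            return entryTimestampMs; // ledger assumed still open: no bound available
        }
        return Math.min(entryTimestampMs, ledgerCloseTimestampMs);
    }

    // An entry counts as expired when its (capped) timestamp is older than now - ttl.
    static boolean isExpired(long cappedTimestampMs, int ttlSeconds, long nowMs) {
        return cappedTimestampMs < nowMs - ttlSeconds * 1000L;
    }
}
```

   For example, with a close time of 1,000 ms, a 4-hour rollover limit (14,400,000 ms), a skewed entry timestamp of 10,000,000 ms, a 60 s TTL and a current time of 3,601,000 ms, the close-time bound lets the entry expire while the rollover-padded bound does not.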



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -78,10 +86,10 @@ public boolean expireMessages(int messageTTLInSeconds) {
         if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) {
             log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName,
                     messageTTLInSeconds);
-
+            Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerIdToInfoCache = new ConcurrentHashMap<>();
             cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
                 try {
-                    long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
+                    long entryTimestamp = getEntryTimestamp(entry, ledgerIdToInfoCache, cursor);

Review Comment:
   Would it be better to split message expiration into two steps?
   
   1. Expire whole closed ledgers if `ledger close time < current time - ttl`
   2. Expire individual entries (the existing method)
   
   That way we don't need to touch the existing entry-expiry logic, and it is more efficient, since no entries need to be read when a whole ledger has expired.
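   A minimal sketch of the proposed two-step flow, using hypothetical in-memory `Ledger` and `Position` records instead of the real managed-ledger types, and assuming a close timestamp of `0` marks the still-open ledger. Step 1 only consults ledger metadata; step 2 falls back to a per-entry check, with a linear scan standing in for `cursor.asyncFindNewestMatching`.

```java
import java.util.List;

class TwoStepExpirySketch {

    // Hypothetical ledger model: close time (0 = still open, assumed) and entry timestamps in write order.
    record Ledger(long ledgerId, long closeTimestampMs, long[] entryTimestampsMs) {}

    // Hypothetical position: ledger id plus entry index within that ledger.
    record Position(long ledgerId, long entryId) {}

    // Returns the newest position that may be marked expired, or null if nothing has expired.
    static Position findExpiryPosition(List<Ledger> ledgers, int ttlSeconds, long nowMs) {
        long thresholdMs = nowMs - ttlSeconds * 1000L;
        Position expired = null;
        int i = 0;
        // Step 1: skip whole closed ledgers whose close time is older than the threshold.
        // Only ledger metadata is consulted; no entry is read.
        for (; i < ledgers.size(); i++) {
            Ledger ledger = ledgers.get(i);
            boolean closed = ledger.closeTimestampMs() > 0;
            if (!closed || ledger.closeTimestampMs() >= thresholdMs) {
                break;
            }
            if (ledger.entryTimestampsMs().length > 0) {
                expired = new Position(ledger.ledgerId(), ledger.entryTimestampsMs().length - 1);
            }
        }
        // Step 2: per-entry check from the first ledger that was not fully expired.
        // Stops at the first entry that is still within the TTL.
        for (; i < ledgers.size(); i++) {
            Ledger ledger = ledgers.get(i);
            long[] timestamps = ledger.entryTimestampsMs();
            for (int e = 0; e < timestamps.length; e++) {
                if (timestamps[e] >= thresholdMs) {
                    return expired;
                }
                expired = new Position(ledger.ledgerId(), e);
            }
        }
        return expired;
    }
}
```

   Because step 1 stops at the first ledger that is open or not yet fully expired, step 2 only reads entries from that ledger onward.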


