lhotari commented on code in PR #25010:
URL: https://github.com/apache/pulsar/pull/25010#discussion_r2707625425


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -4833,13 +4795,24 @@ public Optional<TopicName> getShadowSourceTopic() {
     protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {

Review Comment:
   This method has a side-effect now, it's no longer a pure function for 
determining whether maximum delivery delay has exceeded.
   
   The code using this method would be misleading after adding the TTL stats 
logic to this method:
   
https://github.com/apache/pulsar/blob/85625e0f100479dd95fb1311aeb52411b6b0a25d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L645-L652
   
   Keeping the new TTL stats together with the maximum delivery delay check 
does help improve performance since it's possible to eliminate parsing the 
message metadata multiple times. However, I think that a better solution would 
be to keep these separate and move the message metadata parsing to be lazily 
performed in `org.apache.pulsar.broker.service.Topic.PublishContext` 
implementations.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -4833,13 +4795,24 @@ public Optional<TopicName> getShadowSourceTopic() {
     protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {
         if (isDelayedDeliveryEnabled()) {
             long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis();
-            if (maxDeliveryDelayInMs > 0) {
-                headersAndPayload.markReaderIndex();
-                MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
-                headersAndPayload.resetReaderIndex();
-                return msgMetadata.hasDeliverAtTime()
-                        && msgMetadata.getDeliverAtTime() - 
msgMetadata.getPublishTime() > maxDeliveryDelayInMs;
+            if (maxDeliveryDelayInMs <= 0) {
+                return false;
+            }
+            headersAndPayload.markReaderIndex();
+            MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+            headersAndPayload.resetReaderIndex();
+            if (!msgMetadata.hasDeliverAtTime()) {
+                return false;
             }
+            long deliverAtTime = msgMetadata.getDeliverAtTime();
+            // count exceed ttl delayed messages
+            Integer messageTTLInSeconds = 
topicPolicies.getMessageTTLInSeconds().get();
+            if (messageTTLInSeconds != null && messageTTLInSeconds > 0

Review Comment:
   Moving the TTL stats to a new method would be preferred as mentioned in a 
previous comment.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -4833,13 +4795,24 @@ public Optional<TopicName> getShadowSourceTopic() {
     protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {
         if (isDelayedDeliveryEnabled()) {
             long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis();
-            if (maxDeliveryDelayInMs > 0) {
-                headersAndPayload.markReaderIndex();
-                MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
-                headersAndPayload.resetReaderIndex();
-                return msgMetadata.hasDeliverAtTime()
-                        && msgMetadata.getDeliverAtTime() - 
msgMetadata.getPublishTime() > maxDeliveryDelayInMs;
+            if (maxDeliveryDelayInMs <= 0) {
+                return false;
+            }
+            headersAndPayload.markReaderIndex();
+            MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+            headersAndPayload.resetReaderIndex();
+            if (!msgMetadata.hasDeliverAtTime()) {
+                return false;
             }
+            long deliverAtTime = msgMetadata.getDeliverAtTime();
+            // count exceed ttl delayed messages
+            Integer messageTTLInSeconds = 
topicPolicies.getMessageTTLInSeconds().get();
+            if (messageTTLInSeconds != null && messageTTLInSeconds > 0

Review Comment:
   there should be a check before parsing the message metadata to skip the 
check completely if message TTL is not set at all. The reason for performing 
this before the parsing of metadata is to avoid a performance overhead when TTL 
is not set.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to