Denovo1998 commented on code in PR #25010:
URL: https://github.com/apache/pulsar/pull/25010#discussion_r2595952760
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2042,6 +2042,13 @@ protected void handleSend(CommandSend send, ByteBuf
headersAndPayload) {
producer.publishMessage(send.getProducerId(),
send.getSequenceId(), headersAndPayload,
send.getNumMessages(), send.isIsChunk(), send.isMarker(),
position);
}
+
+ // count delayed message times exceeding the ttl policy time
+ MessageMetadata msgMetadata =
Commands.parseMessageMetadata(headersAndPayload);
+ if (msgMetadata.hasDeliverAtTime() && service.getPulsar()
+ .isMessageDelayTimeExceedTTL(producer.getTopic(),
msgMetadata.getDeliverAtTime())) {
+ producer.getTopic().incrementExceedTTLDelayMessages();
+ }
Review Comment:
It might be better to do this directly in `PersistentTopic.publishMessage`, so
that the metadata parsed there can be reused together with the TTL to update
the statistics.
Perhaps it can be done in
`org.apache.pulsar.broker.service.persistent.PersistentTopic#isExceedMaximumDeliveryDelay`:
```java
protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {
    if (isDelayedDeliveryEnabled()) {
        long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis();
        if (maxDeliveryDelayInMs > 0) {
            headersAndPayload.markReaderIndex();
            MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
            headersAndPayload.resetReaderIndex();
            return msgMetadata.hasDeliverAtTime()
                    && msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs;
        }
    }
    return false;
}
```
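A rough sketch of the idea, assuming the metadata is parsed once and the same values feed both the max-delay check and the TTL statistic. `DelayInfo` and `DelayedMessageChecks` are hypothetical stand-ins (not Pulsar classes), and measuring the delay against the publish time for the TTL comparison is an assumption of this sketch rather than what the PR does:

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical stand-in for the fields read from the parsed MessageMetadata. */
final class DelayInfo {
    final boolean hasDeliverAtTime;
    final long deliverAtTime;
    final long publishTime;

    DelayInfo(boolean hasDeliverAtTime, long deliverAtTime, long publishTime) {
        this.hasDeliverAtTime = hasDeliverAtTime;
        this.deliverAtTime = deliverAtTime;
        this.publishTime = publishTime;
    }
}

/** Hypothetical helper: one pass over the metadata serves both checks. */
final class DelayedMessageChecks {
    // Stand-in for the topic-level statistic added in the PR.
    private final LongAdder exceedTtlDelayedMessages = new LongAdder();

    /**
     * Returns true if the delay exceeds maxDeliveryDelayInMs (when that limit is > 0).
     * As a side effect, counts the message when its delay reaches ttlMillis
     * (only when ttlMillis > 0, i.e. TTL is enabled).
     * Whether messages rejected for max delay should also be counted is left open.
     */
    boolean checkDelay(DelayInfo meta, long maxDeliveryDelayInMs, long ttlMillis) {
        if (!meta.hasDeliverAtTime) {
            return false;
        }
        long delay = meta.deliverAtTime - meta.publishTime;
        if (ttlMillis > 0 && delay >= ttlMillis) {
            exceedTtlDelayedMessages.increment();
        }
        return maxDeliveryDelayInMs > 0 && delay > maxDeliveryDelayInMs;
    }

    long getExceedTtlDelayedMessages() {
        return exceedTtlDelayedMessages.sum();
    }
}
```

With this shape, `ServerCnx.handleSend` would not need its own metadata parse or a helper on `PulsarService`.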
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -2252,4 +2252,17 @@ public HealthChecker getHealthChecker() {
}
return healthChecker;
}
+
+ /**
+ * Check if message delay time exceeds TTL
+ *
+ * @param topic
+ * @param deliverAtTime
+ * @return true if message delay time exceeds TTL, false otherwise
+ */
+ public boolean isMessageDelayTimeExceedTTL(Topic topic, long
deliverAtTime) {
Review Comment:
Delay-related constraints and statistics are already grouped in
`PersistentTopic`, so this check would fit better there than in `PulsarService`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -2252,4 +2252,17 @@ public HealthChecker getHealthChecker() {
}
return healthChecker;
}
+
+ /**
+ * Check if message delay time exceeds TTL
+ *
+ * @param topic
+ * @param deliverAtTime
+ * @return true if message delay time exceeds TTL, false otherwise
+ */
+ public boolean isMessageDelayTimeExceedTTL(Topic topic, long
deliverAtTime) {
+ return deliverAtTime
+ >=
topic.getHierarchyTopicPolicies().getMessageTTLInSeconds().get() * 1000 +
System.currentTimeMillis();
Review Comment:
When TTL is not enabled (i.e., TTL <= 0), the condition here effectively becomes
`deliverAtTime >= now`. That is true for any normal delayed message
(`deliverAtTime > now`), so all delayed messages would be counted as exceeding
the TTL.
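A minimal sketch of a guarded check, assuming the TTL arrives as a nullable `Integer` of seconds and that `null` or a value <= 0 means TTL is disabled. The names `DelayedTtlCheck` and `exceedsTtl` are hypothetical and not part of the PR; the current time is passed in so the logic can be tested deterministically:

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper illustrating the guard; not code from the PR. */
final class DelayedTtlCheck {
    private DelayedTtlCheck() {
    }

    /**
     * Returns true only when TTL is enabled and the delivery time is at or
     * beyond the point where the message would expire (now + TTL).
     */
    static boolean exceedsTtl(long deliverAtTimeMillis, Integer ttlSeconds, long nowMillis) {
        // Assumption: null or non-positive TTL means "TTL disabled".
        if (ttlSeconds == null || ttlSeconds <= 0) {
            return false;
        }
        // Convert on a long so large TTLs do not overflow int arithmetic.
        long ttlMillis = TimeUnit.SECONDS.toMillis(ttlSeconds.longValue());
        return deliverAtTimeMillis >= nowMillis + ttlMillis;
    }
}
```

Doing the conversion on a `long` also sidesteps the overflow that an `int` multiplication by 1000 would hit for TTLs above roughly 24 days.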
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -297,6 +295,8 @@ protected TopicStatsHelper initialValue() {
@Getter
private final PersistentTopicMetrics persistentTopicMetrics = new
PersistentTopicMetrics();
+ private final Rate exceedTTLDelayMessage = new Rate();
Review Comment:
In this PR, all parameter and method names that say "delay message" should be
changed to "delayed messages".
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -4913,4 +4872,14 @@ public void readEntryFailed(ManagedLedgerException
exception, Object ctx) {
return future;
}
+
+ @Override
+ public void incrementExceedTTLDelayMessages() {
+ this.exceedTTLDelayMessage.recordEvent();
+ }
+
+ @Override
+ public long getExceedTTLDelayMessages() {
+ return this.exceedTTLDelayMessage.getCount();
Review Comment:
`getCount()` returns the number of events in the most recent period (i.e., the
events accumulated since the last call to `calculateRate()`), not a running
total.
If the intent is to count how many delayed messages exceeded the TTL overall,
this should probably be:
```java
exceedTTLDelayMessage.getTotalCount()
```
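The difference can be illustrated with a simplified stand-in. `PeriodCounter` below is hypothetical: it only mimics the split between a per-period value that `calculateRate()` resets and a cumulative total, and is not Pulsar's actual `Rate` implementation:

```java
import java.util.concurrent.atomic.LongAdder;

/** Simplified, hypothetical stand-in for a rate tracker; NOT Pulsar's Rate class. */
final class PeriodCounter {
    private final LongAdder periodAdder = new LongAdder();
    private final LongAdder totalAdder = new LongAdder();
    private volatile long lastPeriodCount;

    void recordEvent() {
        periodAdder.increment();
        totalAdder.increment();
    }

    /** Closes the current period: snapshots its count and resets the period adder. */
    void calculateRate() {
        lastPeriodCount = periodAdder.sumThenReset();
    }

    /** Events seen in the most recently closed period only. */
    long getCount() {
        return lastPeriodCount;
    }

    /** Events seen since the counter was created. */
    long getTotalCount() {
        return totalAdder.sum();
    }
}
```

After three events, a `calculateRate()`, one more event, and another `calculateRate()`, this stand-in reports a per-period count of 1 but a total of 4, which is why a cumulative statistic would want the total.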