michaeljmarshall commented on a change in pull request #11564:
URL: https://github.com/apache/pulsar/pull/11564#discussion_r685346289
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1809,9 +1811,25 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa
stats.lastOffloadLedgerId = ledger.getLastOffloadedLedgerId();
stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp();
+ stats.compactedTopicSize = getCompactorMXBean().map(stat -> stat.getCompactedTopicSize(topic)).orElse(0D);
+ stats.compactedTopicMsgCount = getCompactorMXBean().map(stat ->
+ stat.getCompactedTopicMsgCount(topic)).orElse(0L);
+ stats.compactedTopicRemovedEventCount = getCompactorMXBean().map(stat ->
+ stat.getCompactedTopicRemovedEventCount(topic)).orElse(0L);
+ log.info("stats : {}", stats);
return stats;
}
+ private Optional<CompactorMXBean> getCompactorMXBean() {
+ Compactor compactor = null;
+ try {
+ compactor = brokerService.pulsar().getCompactor();
Review comment:
It looks like the call to `getCompactor` instantiates a `TwoPhaseCompactor`
if one hasn't been created yet. Currently, the compactor is only created for a
broker when a topic hosted by that broker has compaction turned on, so calling
`getCompactor` from `getStats` would create it on brokers that never compact.
Wouldn't we want to avoid that, if possible? One way to do this would be to add
a method to the `PulsarService` class that returns the compactor wrapped in an
`Optional`, or an empty `Optional` if it hasn't been initialized.
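A minimal sketch of that idea, using hypothetical stand-in classes rather than
the real `PulsarService` and `Compactor` (the names `SketchPulsarService`,
`SketchCompactor`, and `getCompactorIfInitialized` are assumptions made for
illustration, not existing Pulsar APIs):

```java
import java.util.Optional;

// Hypothetical stand-in for the real compactor; it only counts how many
// instances have been created so the lazy behavior is observable.
class SketchCompactor {
    static int instancesCreated = 0;

    SketchCompactor() {
        instancesCreated++;
    }
}

// Hypothetical stand-in for PulsarService, reduced to the compactor field.
class SketchPulsarService {
    // Stays null until something explicitly asks for compaction.
    private volatile SketchCompactor compactor;

    // Lazily creating accessor: the first call instantiates the compactor.
    synchronized SketchCompactor getCompactor() {
        if (compactor == null) {
            compactor = new SketchCompactor();
        }
        return compactor;
    }

    // Non-initializing accessor (assumed name): returns the compactor only
    // if it already exists, and never creates one as a side effect.
    Optional<SketchCompactor> getCompactorIfInitialized() {
        return Optional.ofNullable(compactor);
    }
}
```

With something along these lines, `getStats` could read compaction stats
through the non-initializing accessor and fall back to the existing defaults
(`0D`, `0L`) when it returns an empty `Optional`.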
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1809,9 +1811,25 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa
stats.lastOffloadLedgerId = ledger.getLastOffloadedLedgerId();
stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp();
+ stats.compactedTopicSize = getCompactorMXBean().map(stat -> stat.getCompactedTopicSize(topic)).orElse(0D);
+ stats.compactedTopicMsgCount = getCompactorMXBean().map(stat ->
+ stat.getCompactedTopicMsgCount(topic)).orElse(0L);
+ stats.compactedTopicRemovedEventCount = getCompactorMXBean().map(stat ->
+ stat.getCompactedTopicRemovedEventCount(topic)).orElse(0L);
+ log.info("stats : {}", stats);
Review comment:
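This `log.info` line looks like leftover debugging output; as written it would
log the whole stats object on every `getStats` call. It seems like it could be
removed.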