Jason918 commented on a change in pull request #13710:
URL: https://github.com/apache/pulsar/pull/13710#discussion_r782767023
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1438,45 +1438,15 @@ public void checkMessageDeduplicationInfo() {
}
public CompletableFuture<Boolean> isCompactionEnabled() {
- Optional<Long> topicCompactionThreshold = getTopicPolicies()
Review comment:
We can change this method to return bool since it's not blocking any
more.
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
##########
@@ -1868,14 +1868,8 @@ public void
testCompactionTriggeredAfterThresholdFirstInvocation() throws Except
Compactor compactor = pulsar.getCompactor();
doReturn(compactPromise).when(compactor).compact(anyString());
- Policies policies = new Policies();
- policies.compaction_threshold = 1L;
-
- NamespaceResources nsr =
pulsar.getPulsarResources().getNamespaceResources();
- NamespaceName ns =
TopicName.get(successTopicName).getNamespaceObject();
- doReturn(Optional.of(policies)).when(nsr).getPolicies(ns);
-
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
+ topic.topicPolicies.getCompactionThreshold().updateNamespaceValue(1L);
Review comment:
You can use `topic.initialize()` to trigger the init.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1438,45 +1438,15 @@ public void checkMessageDeduplicationInfo() {
}
public CompletableFuture<Boolean> isCompactionEnabled() {
- Optional<Long> topicCompactionThreshold = getTopicPolicies()
- .map(TopicPolicies::getCompactionThreshold);
- if (topicCompactionThreshold.isPresent() &&
topicCompactionThreshold.get() > 0) {
- return CompletableFuture.completedFuture(true);
- }
-
- TopicName topicName = TopicName.get(topic);
- return
brokerService.getPulsar().getPulsarResources().getNamespaceResources()
- .getPoliciesAsync(topicName.getNamespaceObject())
- .thenApply(policies -> {
- if (policies.isPresent()) {
- return policies.get().compaction_threshold != null
- && policies.get().compaction_threshold > 0;
- } else {
- // Check broker default
- return brokerService.pulsar().getConfiguration()
- .getBrokerServiceCompactionThresholdInBytes()
> 0;
- }
- });
+ Long topicCompactionThreshold =
topicPolicies.getCompactionThreshold().get();
+ return CompletableFuture.completedFuture(topicCompactionThreshold !=
null && topicCompactionThreshold > 0);
}
public void checkCompaction() {
TopicName name = TopicName.get(topic);
try {
- Long compactionThreshold = getTopicPolicies()
- .map(TopicPolicies::getCompactionThreshold)
- .orElse(null);
- if (compactionThreshold == null) {
- Policies policies =
brokerService.pulsar().getPulsarResources().getNamespaceResources()
- .getPolicies(name.getNamespaceObject())
- .orElseThrow(() -> new
MetadataStoreException.NotFoundException());
- compactionThreshold = policies.compaction_threshold;
- }
- if (compactionThreshold == null) {
- compactionThreshold = brokerService.pulsar().getConfiguration()
- .getBrokerServiceCompactionThresholdInBytes();
- }
-
- if (isSystemTopic() || compactionThreshold != 0
+ Long compactionThreshold =
topicPolicies.getCompactionThreshold().get();
Review comment:
`compactionThreshold` can't be null since broker level value is `int`
type.
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
##########
@@ -1868,14 +1868,8 @@ public void
testCompactionTriggeredAfterThresholdFirstInvocation() throws Except
Compactor compactor = pulsar.getCompactor();
doReturn(compactPromise).when(compactor).compact(anyString());
- Policies policies = new Policies();
- policies.compaction_threshold = 1L;
-
- NamespaceResources nsr =
pulsar.getPulsarResources().getNamespaceResources();
- NamespaceName ns =
TopicName.get(successTopicName).getNamespaceObject();
- doReturn(Optional.of(policies)).when(nsr).getPolicies(ns);
-
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
+ topic.topicPolicies.getCompactionThreshold().updateNamespaceValue(1L);
Review comment:
This does not cover the init procedure of
`topicPolicies.getCompactionThreshold()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]