BewareMyPower commented on code in PR #20718:
URL: https://github.com/apache/pulsar/pull/20718#discussion_r1255403980
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -434,6 +420,37 @@ public AtomicLong getPendingWriteOps() {
return pendingWriteOps;
}
+ protected CompletableFuture<Void> newTopicCompactionService() {
+ CompactionServiceFactory compactionServiceFactory =
brokerService.pulsar().getCompactionServiceFactory();
+ return
compactionServiceFactory.newTopicCompactionService(topic).thenAccept(topicCompactionService
-> {
+ PersistentTopic.this.topicCompactionService =
topicCompactionService;
+ });
+ }
+
+ private CompletableFuture<Void> createPersistentSubscriptions() {
Review Comment:
We should not return a future here. The method itself is not asynchronous.
If you want to propagate the exception to `initialize`, you should just call:
```java
public CompletableFuture<Void> initialize() {
try {
createPersistentSubscriptions();
} catch (Throwable throwable) {
return FutureUtil.failedFuture(throwable);
}
```
I have another question that why did you move this logic out of the
`PersistentTopic`'s constructor?
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -400,6 +400,16 @@ private static Pair<String, Integer>
extractKeyAndSize(RawMessage m) {
}
}
+ protected List<ImmutableTriple<MessageId, String, Integer>>
extractIdsAndKeysAndSizeFromBatch(RawMessage msg)
+ throws IOException {
+ return RawBatchConverter.extractIdsAndKeysAndSize(msg);
+ }
+
+ protected Optional<RawMessage> rebatchMessage(RawMessage msg,
BiPredicate<String, MessageId> filter)
+ throws IOException {
+ return RawBatchConverter.rebatchMessage(msg, filter);
+ }
Review Comment:
Why did you make them protected? They are only used in the
`TwoPhaseCompactor`, we should make them private.
And these new methods don't make sense. They are non-static methods but they
never access any field of the class itself.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -626,4 +633,57 @@ public void recycle() {
recyclerHandle.recycle(this);
}
}
+
+ public static void readCompactedEntries(TopicCompactionService
topicCompactionService, ManagedCursor cursor,
Review Comment:
Why did you make it public? It should be private.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3188,7 +3209,7 @@ public synchronized LongRunningProcessStatus
compactionStatus() {
return
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
} else {
try {
- if (current.join() == COMPACTION_NEVER_RUN) {
+ if (Objects.equals(current.join(), COMPACTION_NEVER_RUN)) {
Review Comment:
There is no need to use `Objects.equals` here. Even if `current.join()` is
`null`, `null == COMPACTION_NEVER_RUN` is still a valid expression. The
`Objects.equals` method is only used to replace the `x.equals(y)` call in case
when `x` is `null`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3167,7 +3188,7 @@ public synchronized void triggerCompaction()
currentCompaction =
brokerService.pulsar().getStrategicCompactor()
.compact(topic, strategicCompactionMap.get(topic));
} else {
- currentCompaction =
brokerService.pulsar().getCompactor().compact(topic);
+ currentCompaction =
topicCompactionService.compact().thenApply(x -> null);
Review Comment:
Why did you add the `thenApply` here? I think `compact()` always completes
with `null`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -434,6 +420,37 @@ public AtomicLong getPendingWriteOps() {
return pendingWriteOps;
}
+ protected CompletableFuture<Void> newTopicCompactionService() {
Review Comment:
The same reason as I mentioned, it should be private.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -347,7 +354,7 @@ protected void readMoreEntries(Consumer consumer) {
}
havePendingRead = true;
if (consumer.readCompacted()) {
- topic.getCompactedTopic().asyncReadEntriesOrWait(cursor,
messagesToRead, isFirstRead,
Review Comment:
I think `CompactedTopic#asyncReadEntriesOrWait` can be marked as deprecated
after this modification.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]