BewareMyPower commented on code in PR #20718:
URL: https://github.com/apache/pulsar/pull/20718#discussion_r1255403980


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -434,6 +420,37 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    protected CompletableFuture<Void> newTopicCompactionService() {
+        CompactionServiceFactory compactionServiceFactory = 
brokerService.pulsar().getCompactionServiceFactory();
+        return 
compactionServiceFactory.newTopicCompactionService(topic).thenAccept(topicCompactionService
 -> {
+            PersistentTopic.this.topicCompactionService = 
topicCompactionService;
+        });
+    }
+
+    private CompletableFuture<Void> createPersistentSubscriptions() {

Review Comment:
   We should not return a future here. The method itself is not asynchronous. 
If you want to propagate the exception to `initialize`, you should just call:
   
   ```java
       public CompletableFuture<Void> initialize() {
           try {
               createPersistentSubscriptions();
           } catch (Throwable throwable) {
               return FutureUtil.failedFuture(throwable);
           }
   ```
   
   I have another question that why did you move this logic out of the 
`PersistentTopic`'s constructor?



##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java:
##########
@@ -400,6 +400,16 @@ private static Pair<String, Integer> 
extractKeyAndSize(RawMessage m) {
         }
     }
 
+    protected List<ImmutableTriple<MessageId, String, Integer>> 
extractIdsAndKeysAndSizeFromBatch(RawMessage msg)
+            throws IOException {
+        return RawBatchConverter.extractIdsAndKeysAndSize(msg);
+    }
+
+    protected Optional<RawMessage> rebatchMessage(RawMessage msg, 
BiPredicate<String, MessageId> filter)
+            throws IOException {
+        return RawBatchConverter.rebatchMessage(msg, filter);
+    }

Review Comment:
   Why did you make them protected? They are only used in the 
`TwoPhaseCompactor`, we should make them private.
   
   And these new methods don't make sense. They are non-static methods but they 
never access any field of the class itself.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -626,4 +633,57 @@ public void recycle() {
             recyclerHandle.recycle(this);
         }
     }
+
+    public static void readCompactedEntries(TopicCompactionService 
topicCompactionService, ManagedCursor cursor,

Review Comment:
   Why did you make it public? It should be private.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3188,7 +3209,7 @@ public synchronized LongRunningProcessStatus 
compactionStatus() {
             return 
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
         } else {
             try {
-                if (current.join() == COMPACTION_NEVER_RUN) {
+                if (Objects.equals(current.join(), COMPACTION_NEVER_RUN)) {

Review Comment:
   There is no need to use `Objects.equals` here. Even if `current.join()` is 
`null`, `null == COMPACTION_NEVER_RUN` is still a valid expression. The 
`Objects.equals` method is only used to replace the `x.equals(y)` call in case 
when `x` is `null`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3167,7 +3188,7 @@ public synchronized void triggerCompaction()
                 currentCompaction = 
brokerService.pulsar().getStrategicCompactor()
                         .compact(topic, strategicCompactionMap.get(topic));
             } else {
-                currentCompaction = 
brokerService.pulsar().getCompactor().compact(topic);
+                currentCompaction = 
topicCompactionService.compact().thenApply(x -> null);

Review Comment:
   Why did you add the `thenApply` here? I think `compact()` always completes 
with `null`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -434,6 +420,37 @@ public AtomicLong getPendingWriteOps() {
         return pendingWriteOps;
     }
 
+    protected CompletableFuture<Void> newTopicCompactionService() {

Review Comment:
   The same reason as I mentioned, it should be private.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -347,7 +354,7 @@ protected void readMoreEntries(Consumer consumer) {
                 }
                 havePendingRead = true;
                 if (consumer.readCompacted()) {
-                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, 
messagesToRead, isFirstRead,

Review Comment:
   I think `CompactedTopic#asyncReadEntriesOrWait` can be marked as deprecated 
after this modification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to