BewareMyPower commented on code in PR #21091:
URL: https://github.com/apache/pulsar/pull/21091#discussion_r1312777867


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -443,35 +421,50 @@ private <T> void phaseTwoLoop(String topic, 
Iterator<Message<T>> reader,
 
     <T> CompletableFuture<Boolean> addToCompactedLedger(
             LedgerHandle lh, Message<T> m, String topic, Semaphore 
outstanding) {
-        CompletableFuture<Boolean> bkf = new CompletableFuture<>();
-        if (m == null || batchMessageContainer.add((MessageImpl<?>) m, null)) {
-            if (batchMessageContainer.getNumMessagesInBatch() > 0) {
-                try {
-                    ByteBuf serialized = batchMessageContainer.toByteBuf();
-                    outstanding.acquire();
-                    mxBean.addCompactionWriteOp(topic, 
serialized.readableBytes());
-                    long start = System.nanoTime();
-                    lh.asyncAddEntry(serialized,
-                            (rc, ledger, eid, ctx) -> {
-                                outstanding.release();
-                                mxBean.addCompactionLatencyOp(topic, 
System.nanoTime() - start, TimeUnit.NANOSECONDS);
-                                if (rc != BKException.Code.OK) {
-                                    
bkf.completeExceptionally(BKException.create(rc));
-                                } else {
-                                    bkf.complete(true);
-                                }
-                            }, null);
-
-                } catch (Throwable t) {
-                    log.error("Failed to add entry", t);
-                    batchMessageContainer.discard((Exception) t);
-                    return FutureUtil.failedFuture(t);
-                }
-            } else {
-                bkf.complete(false);
+        if (m == null) {
+            return flushBatchMessage(lh, topic, outstanding);
+        }
+        if (batchMessageContainer.haveEnoughSpace((MessageImpl<?>) m)) {
+            if (batchMessageContainer.add((MessageImpl<?>) m, null)) {
+                return flushBatchMessage(lh, topic, outstanding);
             }

Review Comment:
   Should we remove this check? With the current implementation 
(`maxNumMessagesInBatch` and `maxBytesInBatch` are both `Integer.MAX_VALUE`), 
`add` never returns true.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to