heesung-sn commented on code in PR #21091:
URL: https://github.com/apache/pulsar/pull/21091#discussion_r1311129324
##########
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java:
##########
@@ -90,6 +91,26 @@ public void setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
this.cryptoKeyReader = cryptoKeyReader;
}
+ @Override
+ public boolean add(MessageImpl<?> msg, SendCallback callback) {
+ this.lastAddedMessageId = (MessageIdImpl) msg.getMessageId();
Review Comment:
We need to reset this id when clear() is called. I wonder how the added test
passed without resetting it.
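A minimal sketch of the reset, using a hypothetical stand-in class rather than the real RawBatchMessageContainerImpl. The name `SketchBatchContainer`, its fields, and the use of a plain `long` id are assumptions made only for illustration:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the batch container; not the real Pulsar class.
class SketchBatchContainer {
    private final List<String> payloads = new ArrayList<>();
    // null means "nothing has been added since the last clear()".
    private Long lastAddedMessageId;

    boolean add(long messageId, String payload) {
        lastAddedMessageId = messageId;
        payloads.add(payload);
        return false; // this sketch never reports the batch as full
    }

    void clear() {
        payloads.clear();
        // Reset the id so a stale value cannot leak into the next batch.
        lastAddedMessageId = null;
    }

    Long getLastAddedMessageId() {
        return lastAddedMessageId;
    }

    int size() {
        return payloads.size();
    }
}
```

A test that reads the id after clear() and before the next add() would catch the missing reset.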
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -443,32 +442,50 @@ private <T> void phaseTwoLoop(String topic, Iterator<Message<T>> reader,
<T> CompletableFuture<Boolean> addToCompactedLedger(
LedgerHandle lh, Message<T> m, String topic, Semaphore outstanding) {
+ if (m == null) {
+ return flushBatchMessage(lh, topic, outstanding);
+ }
+ if (batchMessageContainer.haveEnoughSpace((MessageImpl<?>) m)) {
+ if (batchMessageContainer.add((MessageImpl<?>) m, null)) {
+ return flushBatchMessage(lh, topic, outstanding);
+ }
+ return CompletableFuture.completedFuture(false);
+ }
+ return flushBatchMessage(lh, topic, outstanding)
+ .thenCompose(__ -> {
+ if (batchMessageContainer.add((MessageImpl<?>) m, null)) {
Review Comment:
Nit: call this function recursively here.
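A rough sketch of the recursive shape, on a simplified, hypothetical batcher. `SketchBatcher`, its byte and count limits, and the meaning of the returned Boolean (true when the last step wrote a batch) are assumptions, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the compactor's batching path.
class SketchBatcher {
    private final int maxBytes;
    private final int maxMessages;
    private final List<String> batch = new ArrayList<>();
    private int bytes;
    final List<List<String>> flushed = new ArrayList<>();

    SketchBatcher(int maxBytes, int maxMessages) {
        this.maxBytes = maxBytes;
        this.maxMessages = maxMessages;
    }

    // An empty batch always accepts one message, so the recursion terminates.
    private boolean haveEnoughSpace(String m) {
        return batch.isEmpty() || bytes + m.length() <= maxBytes;
    }

    // Returns true when the batch is full after adding m.
    private boolean add(String m) {
        batch.add(m);
        bytes += m.length();
        return batch.size() >= maxMessages;
    }

    // Writes the current batch, if any; true means something was written.
    private CompletableFuture<Boolean> flush() {
        if (batch.isEmpty()) {
            return CompletableFuture.completedFuture(false);
        }
        flushed.add(new ArrayList<>(batch));
        batch.clear();
        bytes = 0;
        return CompletableFuture.completedFuture(true);
    }

    CompletableFuture<Boolean> addToLedger(String m) {
        if (m == null) {
            return flush();
        }
        if (haveEnoughSpace(m)) {
            if (add(m)) {
                return flush();
            }
            return CompletableFuture.completedFuture(false);
        }
        // No space: flush first, then re-enter this method instead of
        // repeating the add-and-maybe-flush logic inline.
        return flush().thenCompose(ignored -> addToLedger(m));
    }
}
```

The recursive call reuses the branch above it, so the add logic lives in one place.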
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -418,10 +418,9 @@ private <T> void phaseTwoLoop(String topic, Iterator<Message<T>> reader,
.whenComplete((res, exception2) -> {
if (exception2 != null) {
promise.completeExceptionally(exception2);
- return;
}
+ phaseTwoLoop(topic, reader, lh, outstanding, promise);
});
- phaseTwoLoop(topic, reader, lh, outstanding, promise);
Review Comment:
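We want to add entries in parallel. Calling phaseTwoLoop only from
whenComplete makes each add wait for the previous one, and if that were the
intent we wouldn't need the outstanding message semaphore.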
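To illustrate the pattern I have in mind, here is a small sketch with hypothetical names (`SketchParallelWriter`, `asyncWrite`); the sleep stands in for an asynchronous ledger add and is not Pulsar or BookKeeper code. The loop takes a permit, starts the write, and moves on; the permit is released when the write completes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of bounded parallel writes.
class SketchParallelWriter {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    // Stand-in for an asynchronous ledger add.
    private CompletableFuture<Void> asyncWrite(int entry, ExecutorService pool) {
        return CompletableFuture.runAsync(() -> {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            inFlight.decrementAndGet();
        }, pool);
    }

    // Returns the highest number of writes observed in flight at once.
    int writeAll(int entries, int maxOutstanding) {
        Semaphore outstanding = new Semaphore(maxOutstanding);
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<CompletableFuture<Void>> writes = new ArrayList<>();
        try {
            for (int i = 0; i < entries; i++) {
                // Blocks only when maxOutstanding writes are still pending.
                outstanding.acquireUninterruptibly();
                writes.add(asyncWrite(i, pool)
                        .whenComplete((res, ex) -> outstanding.release()));
                // The loop continues without waiting for this write.
            }
            CompletableFuture.allOf(writes.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return maxInFlight.get();
    }
}
```

If the next iteration were started only from the completion callback, at most one write would ever be in flight and the semaphore would never block.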