BewareMyPower commented on code in PR #21091:
URL: https://github.com/apache/pulsar/pull/21091#discussion_r1312618835
##########
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java:
##########
@@ -44,16 +44,17 @@
* [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2,
v3), (k3, v3)]
*/
public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl {
- MessageCrypto msgCrypto;
- Set<String> encryptionKeys;
- CryptoKeyReader cryptoKeyReader;
+ private MessageCrypto<MessageMetadata, MessageMetadata> msgCrypto;
+ private Set<String> encryptionKeys;
+ private CryptoKeyReader cryptoKeyReader;
+ private MessageIdImpl lastAddedMessageId;
- public RawBatchMessageContainerImpl(int maxNumMessagesInBatch, int maxBytesInBatch) {
+ public RawBatchMessageContainerImpl() {
super();
this.compressionType = CompressionType.NONE;
this.compressor = new CompressionCodecNone();
- this.maxNumMessagesInBatch = maxNumMessagesInBatch;
- this.maxBytesInBatch = maxBytesInBatch;
+ this.maxNumMessagesInBatch = Integer.MAX_VALUE;
+ this.maxBytesInBatch = Integer.MAX_VALUE;
Review Comment:
It seems that these two fields only affect `haveEnoughSpace` and `isFull`.
If your intention is to customize the behavior of these two methods, I think
it's better to just override them. Since `haveEnoughSpace` is already
overridden, you can override `isFull` with a trivial implementation that
returns false.
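A minimal sketch of what that could look like. `BaseContainer` and `RawContainer` are hypothetical stand-ins, not the real Pulsar classes, and the base `isFull` logic shown here is an assumption made only so the example is self-contained:

```java
// Hypothetical stand-in for the base batch container (NOT the real Pulsar class).
class BaseContainer {
    protected int maxNumMessagesInBatch = 1000;
    protected int maxBytesInBatch = 128 * 1024;
    protected int numMessagesInBatch = 0;
    protected long currentBatchSizeBytes = 0;

    // Assumed base behavior: the batch is full once either limit is reached.
    public boolean isFull() {
        return numMessagesInBatch >= maxNumMessagesInBatch
                || currentBatchSizeBytes >= maxBytesInBatch;
    }

    public void add(int sizeBytes) {
        numMessagesInBatch++;
        currentBatchSizeBytes += sizeBytes;
    }
}

// Hypothetical stand-in for the raw container: it leaves the limit fields
// untouched and overrides the method whose behavior it wants to change.
class RawContainer extends BaseContainer {
    @Override
    public boolean isFull() {
        // Never report "full"; batch boundaries are decided elsewhere.
        return false;
    }
}
```

With this shape the constructor would not need to set the two limits to `Integer.MAX_VALUE`, and the intent is visible at the override itself.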
##########
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java:
##########
@@ -90,6 +91,23 @@ public void setCryptoKeyReader(CryptoKeyReader
cryptoKeyReader) {
this.cryptoKeyReader = cryptoKeyReader;
}
+ @Override
+ public boolean add(MessageImpl<?> msg, SendCallback callback) {
+ this.lastAddedMessageId = (MessageIdImpl) msg.getMessageId();
+ return super.add(msg, callback);
+ }
+
+ @Override
+ public boolean haveEnoughSpace(MessageImpl<?> msg) {
+ if (lastAddedMessageId == null) {
+ return true;
+ }
+ // Keep same batch compact to same batch.
+ MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+ return msgId.getLedgerId() == lastAddedMessageId.getLedgerId()
+ && msgId.getEntryId() == lastAddedMessageId.getEntryId();
Review Comment:
If you just want to check whether `msg.getMessageId()` equals
`lastAddedMessageId`, you can use `MessageIdAdvUtils#equals`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -443,32 +421,48 @@ private <T> void phaseTwoLoop(String topic,
Iterator<Message<T>> reader,
<T> CompletableFuture<Boolean> addToCompactedLedger(
LedgerHandle lh, Message<T> m, String topic, Semaphore
outstanding) {
+ if (m == null) {
+ return flushBatchMessage(lh, topic, outstanding);
+ }
+ if (batchMessageContainer.haveEnoughSpace((MessageImpl<?>) m)) {
+ if (batchMessageContainer.add((MessageImpl<?>) m, null)) {
+ return flushBatchMessage(lh, topic, outstanding);
+ }
+ return CompletableFuture.completedFuture(false);
+ }
+ CompletableFuture<Boolean> f = flushBatchMessage(lh, topic, outstanding);
+ if (batchMessageContainer.add((MessageImpl<?>) m, null)) {
+ return flushBatchMessage(lh, topic, outstanding).thenCombine(f, (a, b) -> a && b);
+ } else {
+ return f;
+ }
+ }
+
+ private CompletableFuture<Boolean> flushBatchMessage(LedgerHandle lh, String topic,
+ Semaphore outstanding) {
CompletableFuture<Boolean> bkf = new CompletableFuture<>();
- if (m == null || batchMessageContainer.add((MessageImpl<?>) m, null)) {
- if (batchMessageContainer.getNumMessagesInBatch() > 0) {
- try {
- ByteBuf serialized = batchMessageContainer.toByteBuf();
- outstanding.acquire();
- mxBean.addCompactionWriteOp(topic,
serialized.readableBytes());
- long start = System.nanoTime();
- lh.asyncAddEntry(serialized,
- (rc, ledger, eid, ctx) -> {
- outstanding.release();
- mxBean.addCompactionLatencyOp(topic,
System.nanoTime() - start, TimeUnit.NANOSECONDS);
- if (rc != BKException.Code.OK) {
-
bkf.completeExceptionally(BKException.create(rc));
- } else {
- bkf.complete(true);
- }
- }, null);
+ if (batchMessageContainer.getNumMessagesInBatch() > 0) {
Review Comment:
I see you have already made many changes to `addToCompactedLedger`. For the
refactored code, you can simply use early returns.
The previous code:
```java
if (m == null || batchMessageContainer.add((MessageImpl<?>) m, null)) {
    if (batchMessageContainer.getNumMessagesInBatch() > 0) {
        try {
            /* ... */
        } catch (Throwable t) {
            log.error("Failed to add entry", t);
            batchMessageContainer.discard((Exception) t);
            return FutureUtil.failedFuture(t);
        }
    } else {
        bkf.complete(false);
    }
} else {
    bkf.complete(false);
}
```
After applying the early-return idiom:
```java
if (m != null && !batchMessageContainer.add((MessageImpl<?>) m, null)) {
    return CompletableFuture.completedFuture(false);
}
if (batchMessageContainer.getNumMessagesInBatch() <= 0) {
    return CompletableFuture.completedFuture(false);
}
// try-catch...
```
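For reference, here is a self-contained sketch of the same control flow that compiles on its own. `EarlyReturnSketch`, its `Container` interface, and the `String` message type are hypothetical stand-ins for the real Pulsar types, and the ledger write is replaced by an already-completed future:

```java
import java.util.concurrent.CompletableFuture;

class EarlyReturnSketch {
    // Hypothetical stand-in for the batch container (NOT the real Pulsar API).
    interface Container {
        boolean add(String msg);      // true when the batch should be flushed
        int getNumMessagesInBatch();
    }

    static CompletableFuture<Boolean> addToCompactedLedger(Container container, String m) {
        // Guard 1: a message was added but the batch is not ready to flush.
        if (m != null && !container.add(m)) {
            return CompletableFuture.completedFuture(false);
        }
        // Guard 2: nothing buffered, so there is nothing to write.
        if (container.getNumMessagesInBatch() <= 0) {
            return CompletableFuture.completedFuture(false);
        }
        try {
            // Placeholder for the asynchronous ledger write in the real code.
            return CompletableFuture.completedFuture(true);
        } catch (Throwable t) {
            CompletableFuture<Boolean> failed = new CompletableFuture<>();
            failed.completeExceptionally(t);
            return failed;
        }
    }
}
```

Both guards return `false` in exactly the cases where the nested version called `bkf.complete(false)`, so the behavior should be unchanged while the happy path stays at one level of indentation.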
##########
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java:
##########
@@ -90,6 +91,23 @@ public void setCryptoKeyReader(CryptoKeyReader
cryptoKeyReader) {
this.cryptoKeyReader = cryptoKeyReader;
}
+ @Override
+ public boolean add(MessageImpl<?> msg, SendCallback callback) {
+ this.lastAddedMessageId = (MessageIdImpl) msg.getMessageId();
+ return super.add(msg, callback);
+ }
+
+ @Override
+ public boolean haveEnoughSpace(MessageImpl<?> msg) {
+ if (lastAddedMessageId == null) {
Review Comment:
In which case could `lastAddedMessageId` be `null`? The only case I can think
of is when no message has been added yet.