heesung-sn commented on code in PR #21091:
URL: https://github.com/apache/pulsar/pull/21091#discussion_r1311139619
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -443,32 +442,50 @@ private <T> void phaseTwoLoop(String topic,
Iterator<Message<T>> reader,
<T> CompletableFuture<Boolean> addToCompactedLedger(
LedgerHandle lh, Message<T> m, String topic, Semaphore
outstanding) {
+ if (m == null) {
+ return flushBatchMessage(lh, topic, outstanding);
+ }
+ if (batchMessageContainer.haveEnoughSpace((MessageImpl<?>) m)) {
+ if (batchMessageContainer.add((MessageImpl<?>) m, null)) {
+ return flushBatchMessage(lh, topic, outstanding);
+ }
+ return CompletableFuture.completedFuture(false);
+ }
+ return flushBatchMessage(lh, topic, outstanding)
+ .thenCompose(__ -> {
+ if (batchMessageContainer.add((MessageImpl<?>) m, null)) {
+ return flushBatchMessage(lh, topic, outstanding);
+ } else {
+ return CompletableFuture.completedFuture(false);
+ }
+ });
+ }
+
+ private CompletableFuture<Boolean> flushBatchMessage(LedgerHandle lh,
String topic,
+ Semaphore
outstanding) {
CompletableFuture<Boolean> bkf = new CompletableFuture<>();
- if (m == null || batchMessageContainer.add((MessageImpl<?>) m, null)) {
- if (batchMessageContainer.getNumMessagesInBatch() > 0) {
- try {
- ByteBuf serialized = batchMessageContainer.toByteBuf();
- outstanding.acquire();
- mxBean.addCompactionWriteOp(topic,
serialized.readableBytes());
- long start = System.nanoTime();
- lh.asyncAddEntry(serialized,
- (rc, ledger, eid, ctx) -> {
- outstanding.release();
- mxBean.addCompactionLatencyOp(topic,
System.nanoTime() - start, TimeUnit.NANOSECONDS);
- if (rc != BKException.Code.OK) {
-
bkf.completeExceptionally(BKException.create(rc));
- } else {
- bkf.complete(true);
- }
- }, null);
+ if (batchMessageContainer.getNumMessagesInBatch() > 0) {
+ try {
+ ByteBuf serialized = batchMessageContainer.toByteBuf();
+ outstanding.acquire();
Review Comment:
Nit: Let's move this `outstanding.acquire()` call to right before `lh.asyncAddEntry(...)`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java:
##########
@@ -443,32 +442,50 @@ private <T> void phaseTwoLoop(String topic,
Iterator<Message<T>> reader,
<T> CompletableFuture<Boolean> addToCompactedLedger(
LedgerHandle lh, Message<T> m, String topic, Semaphore
outstanding) {
+ if (m == null) {
+ return flushBatchMessage(lh, topic, outstanding);
+ }
+ if (batchMessageContainer.haveEnoughSpace((MessageImpl<?>) m)) {
+ if (batchMessageContainer.add((MessageImpl<?>) m, null)) {
+ return flushBatchMessage(lh, topic, outstanding);
+ }
+ return CompletableFuture.completedFuture(false);
+ }
+ return flushBatchMessage(lh, topic, outstanding)
+ .thenCompose(__ -> {
+ if (batchMessageContainer.add((MessageImpl<?>) m, null)) {
+ return flushBatchMessage(lh, topic, outstanding);
+ } else {
+ return CompletableFuture.completedFuture(false);
+ }
+ });
+ }
+
+ private CompletableFuture<Boolean> flushBatchMessage(LedgerHandle lh,
String topic,
+ Semaphore
outstanding) {
CompletableFuture<Boolean> bkf = new CompletableFuture<>();
- if (m == null || batchMessageContainer.add((MessageImpl<?>) m, null)) {
- if (batchMessageContainer.getNumMessagesInBatch() > 0) {
- try {
- ByteBuf serialized = batchMessageContainer.toByteBuf();
- outstanding.acquire();
- mxBean.addCompactionWriteOp(topic,
serialized.readableBytes());
- long start = System.nanoTime();
- lh.asyncAddEntry(serialized,
- (rc, ledger, eid, ctx) -> {
- outstanding.release();
- mxBean.addCompactionLatencyOp(topic,
System.nanoTime() - start, TimeUnit.NANOSECONDS);
- if (rc != BKException.Code.OK) {
-
bkf.completeExceptionally(BKException.create(rc));
- } else {
- bkf.complete(true);
- }
- }, null);
+ if (batchMessageContainer.getNumMessagesInBatch() > 0) {
+ try {
+ ByteBuf serialized = batchMessageContainer.toByteBuf();
+ outstanding.acquire();
+ mxBean.addCompactionWriteOp(topic, serialized.readableBytes());
+ long start = System.nanoTime();
+ lh.asyncAddEntry(serialized,
+ (rc, ledger, eid, ctx) -> {
+ outstanding.release();
+ mxBean.addCompactionLatencyOp(topic,
System.nanoTime() - start, TimeUnit.NANOSECONDS);
+ if (rc != BKException.Code.OK) {
+
bkf.completeExceptionally(BKException.create(rc));
+ } else {
+ bkf.complete(true);
+ }
+ }, null);
Review Comment:
Nit: Let's try to release the semaphore after completing the future.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]