rdhabalia commented on a change in pull request #2591: Fix: Compaction with last deleted keys not completing compaction
URL: https://github.com/apache/incubator-pulsar/pull/2591#discussion_r218200057
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 ##########
 @@ -153,40 +163,42 @@ private void scheduleTimeout(CompletableFuture<RawMessage> future) {
         });
     }
 
-    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, 
MessageId to,
-                                             Map<String,MessageId> 
latestForKey, BookKeeper bk) {
+    private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, 
MessageId to, MessageId lastReadId,
+            Map<String, MessageId> latestForKey, BookKeeper bk) {
         Map<String, byte[]> metadata = ImmutableMap.of("compactedTopic", 
reader.getTopic().getBytes(UTF_8),
-                                                       "compactedTo", 
to.toByteArray());
+                "compactedTo", to.toByteArray());
         return createLedger(bk, metadata).thenCompose((ledger) -> {
-                log.info("Commencing phase two of compaction for {}, from {} 
to {}, compacting {} keys to ledger {}",
-                         reader.getTopic(), from, to, latestForKey.size(), 
ledger.getId());
-                return phaseTwoSeekThenLoop(reader, from, to, latestForKey, 
bk, ledger);
-            });
+            log.info("Commencing phase two of compaction for {}, from {} to 
{}, compacting {} keys to ledger {}",
+                    reader.getTopic(), from, to, latestForKey.size(), 
ledger.getId());
+            return phaseTwoSeekThenLoop(reader, from, to, lastReadId, 
latestForKey, bk, ledger);
+        });
     }
 
     private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, 
MessageId from, MessageId to,
-                                                         Map<String, 
MessageId> latestForKey,
-                                                         BookKeeper bk, 
LedgerHandle ledger) {
+            MessageId lastReadId, Map<String, MessageId> latestForKey, 
BookKeeper bk, LedgerHandle ledger) {
         CompletableFuture<Long> promise = new CompletableFuture<>();
 
-        reader.seekAsync(from).thenCompose((v) -> {
-                Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
-                CompletableFuture<Void> loopPromise = new 
CompletableFuture<Void>();
-                phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, 
loopPromise);
-                return loopPromise;
-            }).thenCompose((v) -> closeLedger(ledger))
-            .thenCompose((v) -> reader.acknowledgeCumulativeAsync(
-                                 to, 
ImmutableMap.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId())))
-            .whenComplete((res, exception) -> {
+        boolean emptyCompactedLedger = to == null;
 
 Review comment:
   sure, will add it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to