merlimat closed pull request #1335: Log phases in two phase compaction
URL: https://github.com/apache/incubator-pulsar/pull/1335
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff once it has been merged, so the diff is reproduced below:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index 7f7cd4016..f9d297f48 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -37,6 +37,13 @@
         return future.thenCompose((consumer) -> 
r.seekAsync(MessageId.earliest)).thenApply((ignore) -> r);
     }
 
+    /**
+     * Get the topic for the reader
+     *
+     * @return topic for the reader
+     */
+    String getTopic();
+
     /**
      * Seek to a location in the topic. After the seek, the first message read 
will be the one with
      * with the specified message ID.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 75cbbbcb8..61a1fc267 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -59,6 +59,12 @@ public RawReaderImpl(PulsarClientImpl client, String topic, 
String subscription,
                                        consumerFuture);
     }
 
+    @Override
+    public String getTopic() {
+        return consumerConfiguration.getTopicNames().stream()
+            .findFirst().orElse(null);
+    }
+
     @Override
     public CompletableFuture<Void> seekAsync(MessageId messageId) {
         return consumer.seekAsync(messageId);
@@ -84,6 +90,11 @@ public RawReaderImpl(PulsarClientImpl client, String topic, 
String subscription,
         return consumer.getLastMessageIdAsync();
     }
 
+    @Override
+    public String toString() {
+        return "RawReader(topic=" + getTopic() + ")";
+    }
+
     static class RawConsumerImpl extends ConsumerImpl<byte[]> {
         final BlockingQueue<RawMessageAndCnx> incomingRawMessages;
         final Queue<CompletableFuture<RawMessage>> pendingRawReceives;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 7ae788c70..3e2d25969 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -83,6 +83,7 @@ public TwoPhaseCompactor(ServiceConfiguration conf,
                     if (exception != null) {
                         loopPromise.completeExceptionally(exception);
                     } else {
+                        log.info("Commencing phase one of compaction for {}, 
reading to {}", reader, lastMessageId);
                         phaseOneLoop(reader, Optional.empty(), lastMessageId, 
latestForKey, loopPromise);
                     }
                 });
@@ -136,8 +137,12 @@ private void scheduleTimeout(CompletableFuture<RawMessage> 
future) {
 
     private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, 
MessageId to,
                                              Map<String,MessageId> 
latestForKey, BookKeeper bk) {
-        return createLedger(bk).thenCompose(
-                (ledger) -> phaseTwoSeekThenLoop(reader, from, to, 
latestForKey, bk, ledger));
+
+        return createLedger(bk).thenCompose((ledger) -> {
+                log.info("Commencing phase two of compaction for {}, from {} 
to {}, compacting {} keys to ledger {}",
+                         reader, from, to, latestForKey.size(), 
ledger.getId());
+                return phaseTwoSeekThenLoop(reader, from, to, latestForKey, 
bk, ledger);
+            });
     }
 
     private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, 
MessageId from, MessageId to,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to