codelipenghui commented on code in PR #21187:
URL: https://github.com/apache/pulsar/pull/21187#discussion_r1338016772


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -349,7 +350,11 @@ protected void readMoreEntries(Consumer consumer) {
                 }
                 havePendingRead = true;
                 if (consumer.readCompacted()) {
-                    boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());
+                    boolean readFromEarliest = false;
+                    if (!cursor.isDurable() || ((ManagedCursorImpl) cursor).isCompactionCursor()
+                            || cursor.getPersistentMarkDeletedPosition() == null) {
+                        readFromEarliest =  isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());

Review Comment:
   ```suggestion
                           readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1985,4 +1985,55 @@ public void testCompactionDuplicate() throws Exception {
             }
         }
     }
+
+    @Test(timeOut = 100000)
+    public void testAcknowledge() throws Exception {
+        final String topicName = "persistent://my-property/use/my-ns/testAcknowledge" + UUID.randomUUID();
+        final String subName = "my-sub";
+        @Cleanup
+        PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topicName).create();
+
+        pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).readCompacted(true).subscribe().close();
+
+        Map<String, String> expected = new HashMap<>();
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
+            expected.put(String.valueOf(i), String.valueOf(i));
+        }
+        producer.flush();
+
+        admin.topics().triggerCompaction(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin.topics().compactionStatus(topicName).status,
+                    LongRunningProcessStatus.Status.SUCCESS);
+        });
+
+        ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
+                .topic(topicName).readCompacted(true).receiverQueueSize(1).subscriptionName(subName)
+                .isAckReceiptEnabled(true)
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+
+            consumer.acknowledge(message);
+            String remove = expected.remove(message.getKey());
+            Assert.assertEquals(remove, message.getValue());
+        }
+
+        // Make consumer reconnect to broker

Review Comment:
   It's better to verify that the broker has processed the acknowledgments
before the consumer reconnects. Otherwise, the test will be flaky. You can
check the internal stats of the topic, or the backlog of the subscription, to
make sure all the messages have been acked.
   
   For example:
   
   ```java
   Awaitility.await().untilAsserted(() ->
                   assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(), 0));
   ```
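
   The idea behind that check, independent of any test library, is to poll
the backlog until it reaches zero or a deadline passes. A minimal sketch in
plain Java follows. The class `BacklogWait`, the method `awaitDrained`, and
the fake backlog supplier are hypothetical names used only for illustration;
in the real test the supplier would wrap the `getMsgBacklog()` lookup shown
above.

   ```java
   import java.util.concurrent.atomic.AtomicLong;
   import java.util.function.LongSupplier;

   public class BacklogWait {

       /**
        * Polls the supplied backlog value until it is 0 or the timeout elapses.
        * Returns true if the backlog drained in time, false otherwise.
        */
       static boolean awaitDrained(LongSupplier backlog, long timeoutMillis, long pollMillis)
               throws InterruptedException {
           long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
           while (true) {
               if (backlog.getAsLong() == 0) {
                   return true;
               }
               if (System.nanoTime() >= deadline) {
                   return false;
               }
               Thread.sleep(pollMillis);
           }
       }

       public static void main(String[] args) throws InterruptedException {
           // Hypothetical stand-in for the broker: the backlog shrinks by one per poll.
           AtomicLong pending = new AtomicLong(3);
           LongSupplier fakeBacklog = () -> Math.max(0, pending.getAndDecrement());

           System.out.println(awaitDrained(fakeBacklog, 1000, 10));
       }
   }
   ```

   Only once such a wait succeeds is it safe to make the consumer reconnect
and assert that no messages are redelivered.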



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
