codelipenghui commented on code in PR #21187:
URL: https://github.com/apache/pulsar/pull/21187#discussion_r1338016772
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -349,7 +350,11 @@ protected void readMoreEntries(Consumer consumer) {
}
havePendingRead = true;
if (consumer.readCompacted()) {
- boolean readFromEarliest = isFirstRead &&
MessageId.earliest.equals(consumer.getStartMessageId());
+ boolean readFromEarliest = false;
+ if (!cursor.isDurable() || ((ManagedCursorImpl)
cursor).isCompactionCursor()
+ || cursor.getPersistentMarkDeletedPosition() ==
null) {
+ readFromEarliest = isFirstRead &&
MessageId.earliest.equals(consumer.getStartMessageId());
Review Comment:
```suggestion
readFromEarliest = isFirstRead &&
MessageId.earliest.equals(consumer.getStartMessageId());
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1985,4 +1985,55 @@ public void testCompactionDuplicate() throws Exception {
}
}
}
+
+ @Test(timeOut = 100000)
+ public void testAcknowledge() throws Exception {
+ final String topicName =
"persistent://my-property/use/my-ns/testAcknowledge" + UUID.randomUUID();
+ final String subName = "my-sub";
+ @Cleanup
+ PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topicName).create();
+
+
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).readCompacted(true).subscribe().close();
+
+ Map<String, String> expected = new HashMap<>();
+ for (int i = 0; i < 10; i++) {
+
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
+ expected.put(String.valueOf(i), String.valueOf(i));
+ }
+ producer.flush();
+
+ admin.topics().triggerCompaction(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ ConsumerImpl<String> consumer = (ConsumerImpl<String>)
client.newConsumer(Schema.STRING)
+
.topic(topicName).readCompacted(true).receiverQueueSize(1).subscriptionName(subName)
+ .isAckReceiptEnabled(true)
+ .subscribe();
+
+ for (int i = 0; i < 10; i++) {
+ Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+
+ consumer.acknowledge(message);
+ String remove = expected.remove(message.getKey());
+ Assert.assertEquals(remove, message.getValue());
+ }
+
+ // Make consumer reconnect to broker
Review Comment:
It's better to check if the acknowledgments have been processed by the
broker. Otherwise, it will introduce a flaky test. You can check the internal
stats of the topic or check the backlog of the topic to make sure all the
messages have been hacked.
For example:
```java
Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getStats(topicName,
true).getSubscriptions().get(subName).getMsgBacklog(), 0));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]