codelipenghui commented on code in PR #21081:
URL: https://github.com/apache/pulsar/pull/21081#discussion_r1312937794
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -1926,4 +1926,69 @@ public void testDispatcherMaxReadSizeBytes() throws
Exception {
consumer.close();
producer.close();
}
+
+ @Test
+ public void testCompactionDuplicate() throws Exception {
+ String topic =
"persistent://my-property/use/my-ns/testCompactionDuplicate";
+ final int numMessages = 1000;
+ final int maxKeys = 800;
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ // trigger compaction (create __compaction cursor)
+ admin.topics().triggerCompaction(topic);
+
+ Map<String, byte[]> expected = new HashMap<>();
+ Random r = new Random(0);
+
+
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+ for (int j = 0; j < numMessages; j++) {
+ int keyIndex = r.nextInt(maxKeys);
+ String key = "key" + keyIndex;
+ byte[] data = ("my-message-" + key + "-" + j).getBytes();
+ producer.newMessage().key(key).value(data).send();
+ expected.put(key, data);
+ }
+
+ producer.flush();
+
+ // trigger compaction
+ admin.topics().triggerCompaction(topic);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.topics().compactionStatus(topic).status,
+ LongRunningProcessStatus.Status.RUNNING);
+ });
+
+ // Wait for phase one to complete
+ Thread.sleep(500);
+
+ admin.topics().unload(topic);
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topic, false);
+ // Compacted topic ledger should have same number of entry equals
to number of unique key.
+ Assert.assertEquals(expected.size(),
internalStats.compactedLedger.entries);
Review Comment:
```suggestion
Assert.assertEquals(internalStats.compactedLedger.entries,
expected.size());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]