codelipenghui commented on a change in pull request #8552:
URL: https://github.com/apache/pulsar/pull/8552#discussion_r522927597



##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
##########
@@ -92,6 +92,98 @@ public void testDuplicationApi() throws Exception {
         assertNull(admin.topics().getDeduplicationEnabled(topicName));
     }
 
+    @Test(timeOut = 10000)
+    public void testDuplicationSnapshotApi() throws Exception {
+        final String topicName = testTopic + UUID.randomUUID().toString();
+        admin.topics().createPartitionedTopic(topicName, 3);
+        waitCacheInit(topicName);
+        Integer interval = 
admin.topics().getDeduplicationSnapshotInterval(topicName);
+        assertNull(interval);
+
+        admin.topics().setDeduplicationSnapshotInterval(topicName, 1024);
+        for (int i = 0; i < 50; i++) {
+            if (admin.topics().getDeduplicationSnapshotInterval(topicName) != 
null) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        
Assert.assertEquals(admin.topics().getDeduplicationSnapshotInterval(topicName).intValue(),
 1024);
+        admin.topics().removeDeduplicationSnapshotInterval(topicName);
+        for (int i = 0; i < 50; i++) {
+            if (admin.topics().getDeduplicationSnapshotInterval(topicName) == 
null) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        assertNull(admin.topics().getDeduplicationSnapshotInterval(topicName));
+    }
+
+    @Test(timeOut = 30000)
+    private void testTopicPolicyTakeSnapshot() throws Exception {
+        resetConfig();
+        conf.setSystemTopicEnabled(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setBrokerDeduplicationEnabled(true);
+        conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
+        conf.setBrokerDeduplicationSnapshotIntervalSeconds(4);
+        conf.setBrokerDeduplicationEntriesInterval(20000);
+        super.internalCleanup();
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        final String topicName = testTopic + UUID.randomUUID().toString();
+        final String producerName = "my-producer";
+        @Cleanup
+        Producer<String> producer = pulsarClient
+                
.newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName(producerName).create();
+        waitCacheInit(topicName);
+        admin.topics().setDeduplicationSnapshotInterval(topicName, 1);
+        admin.namespaces().setDeduplicationSnapshotInterval(myNamespace, 2);
+
+        int msgNum = 50;
+        CountDownLatch countDownLatch = new CountDownLatch(msgNum);
+        for (int i = 0; i < msgNum; i++) {
+            producer.newMessage().value("msg" + 
i).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
+        }
+        countDownLatch.await();
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
+        long seqId = 
persistentTopic.getMessageDeduplication().highestSequencedPersisted.get(producerName);
+        PositionImpl position = (PositionImpl) 
persistentTopic.getMessageDeduplication().getManagedCursor()
+                .getManagedLedger().getLastConfirmedEntry();
+        assertEquals(seqId, msgNum - 1);
+        assertEquals(position.getEntryId(), msgNum - 1);
+        //The first time, use topic-leve policies, 1 second delay + 1 second 
interval
+        Thread.sleep(2000);

Review comment:
       I have added awaitility dependency so that we can avoid use sleep in the 
test, for more details you can see https://github.com/apache/pulsar/pull/8557
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to