poorbarcode commented on code in PR #20597:
URL: https://github.com/apache/pulsar/pull/20597#discussion_r1244811010
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java:
##########
@@ -728,6 +728,40 @@ public void testReplicatorClearBacklog() throws Exception {
assertEquals(status.getReplicationBacklog(), 0);
}
+
+ @Test(timeOut = 30000)
+ public void testResetReplicatorSubscriptionPosition() throws Exception {
+ final TopicName dest = TopicName
+ .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetReplicatorSubscription"));
+
+ @Cleanup
+ MessageProducer producer1 = new MessageProducer(url1, dest);
+
+ @Cleanup
+ MessageConsumer consumer1 = new MessageConsumer(url3, dest);
Review Comment:
This consumer is unnecessary; we can remove it.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java:
##########
@@ -728,6 +728,40 @@ public void testReplicatorClearBacklog() throws Exception {
assertEquals(status.getReplicationBacklog(), 0);
}
+
+ @Test(timeOut = 30000)
+ public void testResetReplicatorSubscriptionPosition() throws Exception {
+ final TopicName dest = TopicName
+ .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetReplicatorSubscription"));
+
+ @Cleanup
+ MessageProducer producer1 = new MessageProducer(url1, dest);
+
+ @Cleanup
+ MessageConsumer consumer1 = new MessageConsumer(url3, dest);
+
+ // Produce from cluster1 and consume from the rest
+ for (int i = 0; i < 10; i++) {
+ producer1.produce(2);
+ }
+
+ PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
+
+ Position lastPosition = topic.getLastPosition();
+
+ PersistentReplicator replicator = (PersistentReplicator) spy(
+ topic.getReplicators().get(topic.getReplicators().keys().get(0)));
+
+ // make replicator backoff readMessage.
+ replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null);
+
+ replicator.updateRates(); // for code-coverage
+
+ replicator.expireMessages(lastPosition); // for code-coverage
Review Comment:
Could you add a new test that uses the admin API to reproduce this issue?
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java:
##########
@@ -728,6 +728,40 @@ public void testReplicatorClearBacklog() throws Exception {
assertEquals(status.getReplicationBacklog(), 0);
}
+
+ @Test(timeOut = 30000)
+ public void testResetReplicatorSubscriptionPosition() throws Exception {
+ final TopicName dest = TopicName
+ .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetReplicatorSubscription"));
+
+ @Cleanup
+ MessageProducer producer1 = new MessageProducer(url1, dest);
+
+ @Cleanup
+ MessageConsumer consumer1 = new MessageConsumer(url3, dest);
+
+ // Produce from cluster1 and consume from the rest
+ for (int i = 0; i < 10; i++) {
+ producer1.produce(2);
+ }
+
+ PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
+
+ Position lastPosition = topic.getLastPosition();
+
+ PersistentReplicator replicator = (PersistentReplicator) spy(
+ topic.getReplicators().get(topic.getReplicators().keys().get(0)));
+
+ // make replicator backoff readMessage.
+ replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null);
Review Comment:
I think this step is unnecessary. Can we remove it?