This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e06b0ac602d [fix][test] Fix `PersistentDispatcherFailoverConsumerTest#testAddRemoveConsumer` wrong logic. (#17216)
e06b0ac602d is described below
commit e06b0ac602d7864e7f70d3845c0aea90fa161107
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Aug 30 12:04:38 2022 +0800
[fix][test] Fix `PersistentDispatcherFailoverConsumerTest#testAddRemoveConsumer` wrong logic. (#17216)
---
.../PersistentDispatcherFailoverConsumerTest.java | 29 ++++++++++++----------
1 file changed, 16 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 0d2e5a9ee02..f0c61a1fc02 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -420,30 +420,33 @@ public class PersistentDispatcherFailoverConsumerTest {
verify(consumer2,
times(1)).notifyActiveConsumerChange(same(consumer1));
verify(consumer2,
times(1)).notifyActiveConsumerChange(same(consumer0));
- // 7. Remove last consumer
+ // 7. Remove last consumer to make active consumer change.
pdfc.removeConsumer(consumer2);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(),
consumer1.consumerName());
assertEquals(3, consumers.size());
- // not consumer group changes
- assertNull(consumerChanges.poll());
+
+ change = consumerChanges.poll(10, TimeUnit.SECONDS);
+ assertNotNull(change);
+ verifyActiveConsumerChange(change, 0, false);
+ change = consumerChanges.poll(10, TimeUnit.SECONDS);
+ assertNotNull(change);
+ verifyActiveConsumerChange(change, 1, true);
+ change = consumerChanges.poll(10, TimeUnit.SECONDS);
+ assertNotNull(change);
+ verifyActiveConsumerChange(change, 1, true);
// 8. Verify if we cannot unsubscribe when more than one consumer is connected
assertFalse(pdfc.canUnsubscribe(consumer0));
- // 9. Remove active consumer
+ // 9. Remove inactive consumer
pdfc.removeConsumer(consumer0);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(),
consumer1.consumerName());
assertEquals(2, consumers.size());
- // the remaining consumers will receive notifications
- change = consumerChanges.poll(10, TimeUnit.SECONDS);
- assertNotNull(change);
- verifyActiveConsumerChange(change, 1, true);
- change = consumerChanges.poll(10, TimeUnit.SECONDS);
- assertNotNull(change);
- verifyActiveConsumerChange(change, 1, true);
+ // not consumer group changes
+ assertNull(consumerChanges.poll(10, TimeUnit.SECONDS));
// 10. Attempt to remove already removed consumer
String cause = "";
@@ -454,13 +457,13 @@ public class PersistentDispatcherFailoverConsumerTest {
}
assertEquals(cause, "Consumer was not connected");
- // 11. Remove active consumer
+ // 11. Remove same consumer
pdfc.removeConsumer(consumer1);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(),
consumer1.consumerName());
assertEquals(1, consumers.size());
// not consumer group changes
- assertNull(consumerChanges.poll());
+ assertNull(consumerChanges.poll(10, TimeUnit.SECONDS));
// 11. With only one consumer, unsubscribe is allowed
assertTrue(pdfc.canUnsubscribe(consumer1));