This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5f55c7d Add consumer distribution test for Failover subscription when
update partitions. (#6699)
5f55c7d is described below
commit 5f55c7d1eccf40c5c3886b0ebc2898bc0da8b2e6
Author: lipenghui <[email protected]>
AuthorDate: Sun Apr 12 20:04:43 2020 +0800
Add consumer distribution test for Failover subscription when update
partitions. (#6699)
### Motivation
Add a consumer distribution test for Failover subscriptions when partitions
are updated. This test is related to #6610.
### Modifications
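Add a unit test, and update `allTopicPartitionsNumber` in MultiTopicsConsumerImpl when newly added partitions are detected.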
---
.../pulsar/client/impl/TopicsConsumerImplTest.java | 116 +++++++++++++++++++++
.../client/impl/MultiTopicsConsumerImpl.java | 1 +
2 files changed, 117 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 1bbd758..5fd2d5b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
import io.netty.util.Timeout;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -38,6 +39,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.pulsar.broker.service.Topic;
@@ -45,13 +47,19 @@ import
org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
@@ -769,6 +777,114 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
}
@Test(timeOut = testTimeout)
+ public void
testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions() throws
Exception { // Failover subscription: grow a partitioned topic from 2 to 4 partitions, then check how partitions and messages are split between two consumers
+ final String topicName =
"persistent://prop/use/ns-abc/testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions";
+ final String subName = "failover-test";
+ TenantInfo tenantInfo = createDefaultTenantInfo();
+ admin.tenants().createTenant("prop", tenantInfo);
+ admin.topics().createPartitionedTopic(topicName, 2);
+
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
2);
+ Consumer<String> consumer_1 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Failover)
+ .subscriptionName(subName)
+ .subscribe();
+ assertTrue(consumer_1 instanceof MultiTopicsConsumerImpl);
+
+ assertEquals(((MultiTopicsConsumerImpl)
consumer_1).allTopicPartitionsNumber.get(), 2);
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .messageRouter(new MessageRouter() { // routes each message to partition (numeric key % number of partitions)
+ @Override
+ public int choosePartition(Message<?> msg, TopicMetadata
metadata) {
+ return Integer.parseInt(msg.getKey()) %
metadata.numPartitions();
+ }
+ })
+ .create();
+
+ final int messages = 20;
+ for (int i = 0; i < messages; i++) {
+ producer.newMessage().key(String.valueOf(i)).value("message - " +
i).send();
+ }
+
+ int received = 0;
+ Message lastMessage = null;
+ for (int i = 0; i < messages; i++) {
+ lastMessage = consumer_1.receive();
+ received++;
+ }
+ assertEquals(received, messages);
+ consumer_1.acknowledgeCumulative(lastMessage);
+
+ // 1. Update the topic from 2 to 4 partitions, run the partitions auto-update task by hand, and check that all messages are still consumed
+ admin.topics().updatePartitionedTopic(topicName, 4);
+ log.info("trigger partitionsAutoUpdateTimerTask");
+ Timeout timeout = ((MultiTopicsConsumerImpl)
consumer_1).getPartitionsAutoUpdateTimeout();
+ timeout.task().run(timeout);
+ Thread.sleep(200); // NOTE(review): fixed wait, presumably so the triggered update task can finish subscribing to the new partitions - confirm
+
+ assertEquals(((MultiTopicsConsumerImpl)
consumer_1).allTopicPartitionsNumber.get(), 4);
+ for (int i = 0; i < messages; i++) {
+ producer.newMessage().key(String.valueOf(i)).value("message - " +
i).send();
+ }
+
+ received = 0;
+ lastMessage = null;
+ for (int i = 0; i < messages; i++) {
+ lastMessage = consumer_1.receive();
+ received++;
+ }
+ assertEquals(received, messages);
+ consumer_1.acknowledgeCumulative(lastMessage);
+
+ // 2. Create a second consumer on the same subscription and check that each consumer is the active consumer on 2 of the 4 partitions
+ Consumer<String> consumer_2 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Failover)
+ .subscriptionName(subName)
+ .subscribe();
+ assertTrue(consumer_2 instanceof MultiTopicsConsumerImpl);
+ assertEquals(((MultiTopicsConsumerImpl)
consumer_1).allTopicPartitionsNumber.get(), 4);
+
+ for (int i = 0; i < messages; i++) {
+ producer.newMessage().key(String.valueOf(i)).value("message - " +
i).send();
+ }
+
+ Map<String, AtomicInteger> activeConsumers = new HashMap<>();
+ PartitionedTopicStats stats =
admin.topics().getPartitionedStats(topicName, true);
+ for (TopicStats value : stats.partitions.values()) {
+ for (SubscriptionStats subscriptionStats :
value.subscriptions.values()) {
+
assertTrue(subscriptionStats.activeConsumerName.equals(consumer_1.getConsumerName())
+ ||
subscriptionStats.activeConsumerName.equals(consumer_2.getConsumerName()));
+
activeConsumers.putIfAbsent(subscriptionStats.activeConsumerName, new
AtomicInteger(0));
+
activeConsumers.get(subscriptionStats.activeConsumerName).incrementAndGet();
+ }
+ }
+ assertEquals(activeConsumers.get(consumer_1.getConsumerName()).get(),
2);
+ assertEquals(activeConsumers.get(consumer_2.getConsumerName()).get(),
2);
+
+ // 3. Check that each of the two consumers receives half of the newly produced messages
+ received = 0;
+ lastMessage = null;
+ for (int i = 0; i < messages / 2; i++) {
+ lastMessage = consumer_1.receive();
+ received++;
+ }
+ assertEquals(received, messages / 2);
+ consumer_1.acknowledgeCumulative(lastMessage);
+
+ received = 0;
+ lastMessage = null;
+ for (int i = 0; i < messages / 2; i++) {
+ lastMessage = consumer_2.receive();
+ received++;
+ }
+ assertEquals(received, messages / 2);
+ consumer_2.acknowledgeCumulative(lastMessage);
+ }
+
+ @Test(timeOut = testTimeout)
public void testDefaultBacklogTTL() throws Exception {
int defaultTTLSec = 5;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index e2016a9..18ada34 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1135,6 +1135,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
future.complete(null);
return future;
} else if (oldPartitionNumber < currentPartitionNumber) {
+ allTopicPartitionsNumber.compareAndSet(oldPartitionNumber,
currentPartitionNumber);
List<String> newPartitions = list.subList(oldPartitionNumber,
currentPartitionNumber);
// subscribe new added partitions
List<CompletableFuture<Consumer<T>>> futureList = newPartitions