This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f55c7d  Add consumer distribution test for Failover subscription when update partitions. (#6699)
5f55c7d is described below

commit 5f55c7d1eccf40c5c3886b0ebc2898bc0da8b2e6
Author: lipenghui <[email protected]>
AuthorDate: Sun Apr 12 20:04:43 2020 +0800

    Add consumer distribution test for Failover subscription when update partitions. (#6699)
    
    ### Motivation
    
    Add a consumer distribution test for a Failover subscription when the
    partitions of a topic are updated. This test is related to #6610.
    
    ### Modifications
    
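    Add a unit test, testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions,
    to TopicsConsumerImplTest, and update allTopicPartitionsNumber in
    MultiTopicsConsumerImpl when the partition count has increased.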
---
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 116 +++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |   1 +
 2 files changed, 117 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 1bbd758..5fd2d5b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
 
 import io.netty.util.Timeout;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
 import org.apache.pulsar.broker.service.Topic;
@@ -45,13 +47,19 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -769,6 +777,114 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
     }
 
     @Test(timeOut = testTimeout)
+    public void 
testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions() throws 
Exception {
+        final String topicName = 
"persistent://prop/use/ns-abc/testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions";
+        final String subName = "failover-test";
+        TenantInfo tenantInfo = createDefaultTenantInfo();
+        admin.tenants().createTenant("prop", tenantInfo);
+        admin.topics().createPartitionedTopic(topicName, 2);
+        
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 
2);
+        Consumer<String> consumer_1 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Failover)
+                .subscriptionName(subName)
+                .subscribe();
+        assertTrue(consumer_1 instanceof MultiTopicsConsumerImpl);
+
+        assertEquals(((MultiTopicsConsumerImpl) 
consumer_1).allTopicPartitionsNumber.get(), 2);
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .messageRouter(new MessageRouter() {
+                    @Override
+                    public int choosePartition(Message<?> msg, TopicMetadata 
metadata) {
+                        return Integer.parseInt(msg.getKey()) % 
metadata.numPartitions();
+                    }
+                })
+                .create();
+
+        final int messages = 20;
+        for (int i = 0; i < messages; i++) {
+            producer.newMessage().key(String.valueOf(i)).value("message - " + 
i).send();
+        }
+
+        int received = 0;
+        Message lastMessage = null;
+        for (int i = 0; i < messages; i++) {
+            lastMessage = consumer_1.receive();
+            received++;
+        }
+        assertEquals(received, messages);
+        consumer_1.acknowledgeCumulative(lastMessage);
+
+        // 1.Update partition and check message consumption
+        admin.topics().updatePartitionedTopic(topicName, 4);
+        log.info("trigger partitionsAutoUpdateTimerTask");
+        Timeout timeout = ((MultiTopicsConsumerImpl) 
consumer_1).getPartitionsAutoUpdateTimeout();
+        timeout.task().run(timeout);
+        Thread.sleep(200);
+
+        assertEquals(((MultiTopicsConsumerImpl) 
consumer_1).allTopicPartitionsNumber.get(), 4);
+        for (int i = 0; i < messages; i++) {
+            producer.newMessage().key(String.valueOf(i)).value("message - " + 
i).send();
+        }
+
+        received = 0;
+        lastMessage = null;
+        for (int i = 0; i < messages; i++) {
+            lastMessage = consumer_1.receive();
+            received++;
+        }
+        assertEquals(received, messages);
+        consumer_1.acknowledgeCumulative(lastMessage);
+
+        // 2.Create a new consumer and check active consumer changed
+        Consumer<String> consumer_2 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Failover)
+                .subscriptionName(subName)
+                .subscribe();
+        assertTrue(consumer_2 instanceof MultiTopicsConsumerImpl);
+        assertEquals(((MultiTopicsConsumerImpl) 
consumer_1).allTopicPartitionsNumber.get(), 4);
+
+        for (int i = 0; i < messages; i++) {
+            producer.newMessage().key(String.valueOf(i)).value("message - " + 
i).send();
+        }
+
+        Map<String, AtomicInteger> activeConsumers = new HashMap<>();
+        PartitionedTopicStats stats = 
admin.topics().getPartitionedStats(topicName, true);
+        for (TopicStats value : stats.partitions.values()) {
+            for (SubscriptionStats subscriptionStats : 
value.subscriptions.values()) {
+                
assertTrue(subscriptionStats.activeConsumerName.equals(consumer_1.getConsumerName())
+                        || 
subscriptionStats.activeConsumerName.equals(consumer_2.getConsumerName()));
+                
activeConsumers.putIfAbsent(subscriptionStats.activeConsumerName, new 
AtomicInteger(0));
+                
activeConsumers.get(subscriptionStats.activeConsumerName).incrementAndGet();
+            }
+        }
+        assertEquals(activeConsumers.get(consumer_1.getConsumerName()).get(), 
2);
+        assertEquals(activeConsumers.get(consumer_2.getConsumerName()).get(), 
2);
+
+        // 3.Check that each consumer receives half of the total messages
+        received = 0;
+        lastMessage = null;
+        for (int i = 0; i < messages / 2; i++) {
+            lastMessage = consumer_1.receive();
+            received++;
+        }
+        assertEquals(received, messages / 2);
+        consumer_1.acknowledgeCumulative(lastMessage);
+
+        received = 0;
+        lastMessage = null;
+        for (int i = 0; i < messages / 2; i++) {
+            lastMessage = consumer_2.receive();
+            received++;
+        }
+        assertEquals(received, messages / 2);
+        consumer_2.acknowledgeCumulative(lastMessage);
+    }
+
+    @Test(timeOut = testTimeout)
     public void testDefaultBacklogTTL() throws Exception {
 
         int defaultTTLSec = 5;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index e2016a9..18ada34 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1135,6 +1135,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 future.complete(null);
                 return future;
             } else if (oldPartitionNumber < currentPartitionNumber) {
+                allTopicPartitionsNumber.compareAndSet(oldPartitionNumber, 
currentPartitionNumber);
                 List<String> newPartitions = list.subList(oldPartitionNumber, 
currentPartitionNumber);
                 // subscribe new added partitions
                 List<CompletableFuture<Consumer<T>>> futureList = newPartitions

Reply via email to