codelipenghui commented on code in PR #20767:
URL: https://github.com/apache/pulsar/pull/20767#discussion_r1257962402


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java:
##########
@@ -154,6 +158,102 @@ public void testReplicatedSubscriptionAcrossTwoRegions() 
throws Exception {
                 "messages don't match.");
     }
 
+    @Test
+    public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws 
Exception {
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace + "/tp_");
+        final String subscriptionName = "s1";
+        final boolean isReplicatedSubscription = true;
+        final int messagesCount = 20;
+        final LinkedHashSet<String> sentMessages = new LinkedHashSet<>();
+        final Set<String> receivedMessages = Collections.synchronizedSet(new 
LinkedHashSet<>());
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));
+        admin1.topics().createNonPartitionedTopic(topicName);
+        admin1.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest, isReplicatedSubscription);
+        final PersistentTopic topic1 =
+                (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+
+        // Send messages
+        // Wait for the topic to be created on cluster2.
+        // Wait for the snapshot to be created.
+        final PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).build();
+        Producer<String> producer1 = 
client1.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+        Consumer<String> consumer1 = 
client1.newConsumer(Schema.STRING).topic(topicName)
+                
.subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();
+        for (int i = 0; i < messagesCount / 2; i++) {
+            String msg = i + "";
+            producer1.send(msg);
+            sentMessages.add(msg);
+        }
+        Awaitility.await().untilAsserted(() -> {
+            ConcurrentOpenHashMap<String, ? extends Replicator> replicators = 
topic1.getReplicators();
+            assertTrue(replicators != null && replicators.size() == 1, 
"Replicator should started");
+            assertTrue(replicators.values().iterator().next().isConnected(), 
"Replicator should be connected");
+            
assertTrue(topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(),
+                    "One snapshot should be finished");
+        });
+        final PersistentTopic topic2 =
+                (PersistentTopic) 
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+        Awaitility.await().untilAsserted(() -> {
+            
assertTrue(topic2.getReplicatedSubscriptionController().isPresent(),
+                    "Replicated subscription controller should created");
+        });
+        for (int i = messagesCount / 2; i < messagesCount; i++) {
+            String msg = i + "";
+            producer1.send(msg);
+            sentMessages.add(msg);
+        }
+
+        // Consume half of the messages and wait for the subscription to be created on 
cluster2.
+        for (int i = 0; i < messagesCount / 2; i++){
+            Message<String> message = consumer1.receive(2, TimeUnit.SECONDS);
+            if (message == null) {
+                fail("Should not receive null.");
+            }
+            receivedMessages.add(message.getValue());
+            consumer1.acknowledge(message);
+        }
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(topic2.getSubscriptions().get(subscriptionName), 
"Subscription should created");
+        });
+
+        // Switch client to cluster2.
+        // Since cluster1 did not crash, all messages will be replicated 
to cluster2.
+        consumer1.close();
+        final PulsarClient client2 = 
PulsarClient.builder().serviceUrl(url2.toString()).build();
+        final Consumer consumer2 = 
client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName)
+                
.subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();
+
+        // Consume the messages that have not been acked yet.
+        Thread consumeTask2 = new Thread(() -> {
+            while (true) {
+                try {
+                    PersistentTopic t2 = topic2;
+                    Message message = consumer2.receive(2, TimeUnit.SECONDS);
+                    if (message != null) {
+                        receivedMessages.add(message.getValue().toString());
+                        consumer2.acknowledge(message);
+                    }
+                } catch (PulsarClientException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+        consumeTask2.start();

Review Comment:
   Why does it need to be executed in a separate thread?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to