This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 465fac523da [fix] [broker] Can not receive any messages after switch 
to standby cluster (#20767)
465fac523da is described below

commit 465fac523da946553b09298e13dc7dfcecfb6c78
Author: fengyubiao <[email protected]>
AuthorDate: Thu Jul 13 09:51:45 2023 +0800

    [fix] [broker] Can not receive any messages after switch to standby cluster 
(#20767)
---
 .../ReplicatedSubscriptionsController.java         | 10 ++-
 .../broker/service/ReplicatorSubscriptionTest.java | 91 ++++++++++++++++++++++
 2 files changed, 98 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index 335f2cf8eec..e011ed8d660 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -192,10 +192,14 @@ public class ReplicatedSubscriptionsController implements 
AutoCloseable, Topic.P
             sub.acknowledgeMessage(Collections.singletonList(pos), 
AckType.Cumulative, Collections.emptyMap());
         } else {
             // Subscription doesn't exist. We need to force the creation of 
the subscription in this cluster, because
-            log.info("[{}][{}] Creating subscription at {}:{} after receiving 
update from replicated subcription",
+            log.info("[{}][{}] Creating subscription at {}:{} after receiving 
update from replicated subscription",
                     topic, update.getSubscriptionName(), 
updatedMessageId.getLedgerId(), pos);
-            topic.createSubscription(update.getSubscriptionName(),
-                    InitialPosition.Latest, true /* replicateSubscriptionState 
*/, null);
+            topic.createSubscription(update.getSubscriptionName(), 
InitialPosition.Earliest,
+                            true /* replicateSubscriptionState */, 
Collections.emptyMap())
+                    .thenAccept(subscriptionCreated -> {
+                        
subscriptionCreated.acknowledgeMessage(Collections.singletonList(pos),
+                                AckType.Cumulative, Collections.emptyMap());
+                    });
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index 3982f44905d..2816a973c92 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -24,10 +24,12 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.Map;
@@ -41,6 +43,7 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -48,6 +51,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,6 +158,93 @@ public class ReplicatorSubscriptionTest extends 
ReplicatorTestBase {
                 "messages don't match.");
     }
 
+    @Test
+    public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws 
Exception {
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace + "/tp_");
+        final String subscriptionName = "s1";
+        final boolean isReplicatedSubscription = true;
+        final int messagesCount = 20;
+        final LinkedHashSet<String> sentMessages = new LinkedHashSet<>();
+        final Set<String> receivedMessages = Collections.synchronizedSet(new 
LinkedHashSet<>());
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));
+        admin1.topics().createNonPartitionedTopic(topicName);
+        admin1.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest, isReplicatedSubscription);
+        final PersistentTopic topic1 =
+                (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+
+        // Send messages
+        // Wait for the topic created on the cluster2.
+        // Wait for the snapshot created.
+        final PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).build();
+        Producer<String> producer1 = 
client1.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+        Consumer<String> consumer1 = 
client1.newConsumer(Schema.STRING).topic(topicName)
+                
.subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();
+        for (int i = 0; i < messagesCount / 2; i++) {
+            String msg = i + "";
+            producer1.send(msg);
+            sentMessages.add(msg);
+        }
+        Awaitility.await().untilAsserted(() -> {
+            ConcurrentOpenHashMap<String, ? extends Replicator> replicators = 
topic1.getReplicators();
+            assertTrue(replicators != null && replicators.size() == 1, 
"Replicator should started");
+            assertTrue(replicators.values().iterator().next().isConnected(), 
"Replicator should be connected");
+            
assertTrue(topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(),
+                    "One snapshot should be finished");
+        });
+        final PersistentTopic topic2 =
+                (PersistentTopic) 
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+        Awaitility.await().untilAsserted(() -> {
+            
assertTrue(topic2.getReplicatedSubscriptionController().isPresent(),
+                    "Replicated subscription controller should created");
+        });
+        for (int i = messagesCount / 2; i < messagesCount; i++) {
+            String msg = i + "";
+            producer1.send(msg);
+            sentMessages.add(msg);
+        }
+
+        // Consume half messages and wait the subscription created on the 
cluster2.
+        for (int i = 0; i < messagesCount / 2; i++){
+            Message<String> message = consumer1.receive(2, TimeUnit.SECONDS);
+            if (message == null) {
+                fail("Should not receive null.");
+            }
+            receivedMessages.add(message.getValue());
+            consumer1.acknowledge(message);
+        }
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(topic2.getSubscriptions().get(subscriptionName), 
"Subscription should created");
+        });
+
+        // Switch client to cluster2.
+        // Since the cluster1 was not crash, all messages will be replicated 
to the cluster2.
+        consumer1.close();
+        final PulsarClient client2 = 
PulsarClient.builder().serviceUrl(url2.toString()).build();
+        final Consumer consumer2 = 
client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName)
+                
.subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();
+
+        // Verify all messages will be consumed.
+        Awaitility.await().untilAsserted(() -> {
+            while (true) {
+                Message message = consumer2.receive(2, TimeUnit.SECONDS);
+                if (message != null) {
+                    receivedMessages.add(message.getValue().toString());
+                    consumer2.acknowledge(message);
+                } else {
+                    break;
+                }
+            }
+            assertEquals(receivedMessages.size(), sentMessages.size());
+        });
+
+        consumer2.close();
+        producer1.close();
+        client1.close();
+        client2.close();
+    }
+
     /**
      * If there's no traffic, the snapshot creation should stop and then 
resume when traffic comes back
      */

Reply via email to