This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 465fac523da [fix] [broker] Can not receive any messages after switch
to standby cluster (#20767)
465fac523da is described below
commit 465fac523da946553b09298e13dc7dfcecfb6c78
Author: fengyubiao <[email protected]>
AuthorDate: Thu Jul 13 09:51:45 2023 +0800
[fix] [broker] Can not receive any messages after switch to standby cluster
(#20767)
---
.../ReplicatedSubscriptionsController.java | 10 ++-
.../broker/service/ReplicatorSubscriptionTest.java | 91 ++++++++++++++++++++++
2 files changed, 98 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index 335f2cf8eec..e011ed8d660 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -192,10 +192,14 @@ public class ReplicatedSubscriptionsController implements
AutoCloseable, Topic.P
sub.acknowledgeMessage(Collections.singletonList(pos),
AckType.Cumulative, Collections.emptyMap());
} else {
// Subscription doesn't exist. We need to force the creation of
the subscription in this cluster, because
- log.info("[{}][{}] Creating subscription at {}:{} after receiving
update from replicated subcription",
+ log.info("[{}][{}] Creating subscription at {}:{} after receiving
update from replicated subscription",
topic, update.getSubscriptionName(),
updatedMessageId.getLedgerId(), pos);
- topic.createSubscription(update.getSubscriptionName(),
- InitialPosition.Latest, true /* replicateSubscriptionState
*/, null);
+ topic.createSubscription(update.getSubscriptionName(),
InitialPosition.Earliest,
+ true /* replicateSubscriptionState */,
Collections.emptyMap())
+ .thenAccept(subscriptionCreated -> {
+
subscriptionCreated.acknowledgeMessage(Collections.singletonList(pos),
+ AckType.Cumulative, Collections.emptyMap());
+ });
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index 3982f44905d..2816a973c92 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -24,10 +24,12 @@ import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
@@ -41,6 +43,7 @@ import
org.apache.pulsar.broker.service.persistent.PersistentTopic;
import
org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -48,6 +51,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -154,6 +158,93 @@ public class ReplicatorSubscriptionTest extends
ReplicatorTestBase {
"messages don't match.");
}
+ @Test
+ public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws
Exception {
+ final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_");
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ namespace + "/tp_");
+ final String subscriptionName = "s1";
+ final boolean isReplicatedSubscription = true;
+ final int messagesCount = 20;
+ final LinkedHashSet<String> sentMessages = new LinkedHashSet<>();
+ final Set<String> receivedMessages = Collections.synchronizedSet(new
LinkedHashSet<>());
+ admin1.namespaces().createNamespace(namespace);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2"));
+ admin1.topics().createNonPartitionedTopic(topicName);
+ admin1.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest, isReplicatedSubscription);
+ final PersistentTopic topic1 =
+ (PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+
+ // Send messages
+ // Wait for the topic created on the cluster2.
+ // Wait for the snapshot created.
+ final PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString()).build();
+ Producer<String> producer1 =
client1.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+ Consumer<String> consumer1 =
client1.newConsumer(Schema.STRING).topic(topicName)
+
.subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();
+ for (int i = 0; i < messagesCount / 2; i++) {
+ String msg = i + "";
+ producer1.send(msg);
+ sentMessages.add(msg);
+ }
+ Awaitility.await().untilAsserted(() -> {
+ ConcurrentOpenHashMap<String, ? extends Replicator> replicators =
topic1.getReplicators();
+ assertTrue(replicators != null && replicators.size() == 1,
"Replicator should started");
+ assertTrue(replicators.values().iterator().next().isConnected(),
"Replicator should be connected");
+
assertTrue(topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(),
+ "One snapshot should be finished");
+ });
+ final PersistentTopic topic2 =
+ (PersistentTopic)
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+ Awaitility.await().untilAsserted(() -> {
+
assertTrue(topic2.getReplicatedSubscriptionController().isPresent(),
+ "Replicated subscription controller should created");
+ });
+ for (int i = messagesCount / 2; i < messagesCount; i++) {
+ String msg = i + "";
+ producer1.send(msg);
+ sentMessages.add(msg);
+ }
+
+ // Consume half messages and wait the subscription created on the
cluster2.
+ for (int i = 0; i < messagesCount / 2; i++){
+ Message<String> message = consumer1.receive(2, TimeUnit.SECONDS);
+ if (message == null) {
+ fail("Should not receive null.");
+ }
+ receivedMessages.add(message.getValue());
+ consumer1.acknowledge(message);
+ }
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(topic2.getSubscriptions().get(subscriptionName),
"Subscription should created");
+ });
+
+ // Switch client to cluster2.
+ // Since the cluster1 was not crash, all messages will be replicated
to the cluster2.
+ consumer1.close();
+ final PulsarClient client2 =
PulsarClient.builder().serviceUrl(url2.toString()).build();
+ final Consumer consumer2 =
client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName)
+
.subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();
+
+ // Verify all messages will be consumed.
+ Awaitility.await().untilAsserted(() -> {
+ while (true) {
+ Message message = consumer2.receive(2, TimeUnit.SECONDS);
+ if (message != null) {
+ receivedMessages.add(message.getValue().toString());
+ consumer2.acknowledge(message);
+ } else {
+ break;
+ }
+ }
+ assertEquals(receivedMessages.size(), sentMessages.size());
+ });
+
+ consumer2.close();
+ producer1.close();
+ client1.close();
+ client2.close();
+ }
+
/**
* If there's no traffic, the snapshot creation should stop and then
resume when traffic comes back
*/