This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6afb550c9bf9197ad1de100289627b4d67980592 Author: hrzzzz <[email protected]> AuthorDate: Fri Dec 20 20:05:09 2024 +0800 [fix][client] Fix reader message filtering issue during blue-green cluster switch (#23693) Co-authored-by: ruihongzhou <[email protected]> (cherry picked from commit 34c2f30d7838a1d50484985ee8bcfb1d573c50ed) --- .../broker/service/ClusterMigrationTest.java | 111 +++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 7 ++ 2 files changed, 118 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index 380cb710baf..09a2339075a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; @@ -912,6 +913,116 @@ public class ClusterMigrationTest { client2.close(); } + public void testMigrationWithReader() throws Exception { + final String topicName = BrokerTestUtil + .newUniqueName("persistent://" + namespace + "/migrationTopic"); + + @Cleanup + PulsarClient client1 = PulsarClient.builder() + .serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + // cluster-1 producer/reader + Producer<byte[]> producer1 = client1.newProducer() + .topic(topicName) + .enableBatching(false) + .producerName("cluster1-1") + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + Reader<byte[]> reader1 =client1.newReader() + .topic(topicName) + .startMessageId(MessageId.earliest) + .subscriptionRolePrefix("s1") + .create(); + + AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get(); + retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500); + retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5, 500); + assertFalse(topic1.getProducers().isEmpty()); + assertFalse(topic1.getSubscriptions().isEmpty()); + + // build backlog + reader1.close(); + int n = 8; + for (int i = 0; i < n; i++) { + producer1.send("test1".getBytes()); + } + + @Cleanup + PulsarClient client2 = PulsarClient.builder() + .serviceUrl(url2.toString()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + // cluster-2 producer + Producer<byte[]> producer2 = client2.newProducer() + .topic(topicName) + .enableBatching(false) + .producerName("cluster2-1") + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + AbstractTopic topic2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get(); + assertFalse(topic2.getProducers().isEmpty()); + assertTrue(topic2.getSubscriptions().isEmpty()); + + // migrate topic to cluster-2 + ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(), + pulsar2.getBrokerServiceUrl(), null); + admin1.clusters().updateClusterMigration("r1", true, migratedUrl); + assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(), migratedUrl); + retryStrategically((test) -> { + try { + topic1.checkClusterMigration().get(); + return true; + } catch (Exception e) { + // ok + } + return false; + }, 10, 500); + topic1.checkClusterMigration().get(); + + sleep(1000); + producer1.sendAsync("test1".getBytes()); + + // producer is disconnected from cluster-1 + retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500); + assertTrue(topic1.getProducers().isEmpty()); + + // producer is connected with cluster-2 + retryStrategically((test) -> topic2.getProducers().size() == 2, 10, 500); + assertEquals(topic2.getProducers().size(), 2); + + // try to consume backlog messages from cluster-1 + reader1 = client1.newReader() + .topic(topicName) + .startMessageId(MessageId.earliest) + .subscriptionRolePrefix("s1") + .create(); + for (int i = 0; i < n; i++) { + Message<byte[]> msg = reader1.readNext(); + assertEquals(msg.getData(), "test1".getBytes()); + } + + // after consuming all messages, reader should have disconnected from cluster-1 and reconnect with cluster-2 + retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 500); + assertFalse(topic2.getSubscriptions().isEmpty()); + assertTrue(topic1.getSubscriptions().isEmpty()); + + n = 4; + // publish messages to cluster-2 and consume them + for (int i = 0; i < n; i++) { + producer1.send("test2".getBytes()); + } + + for (int i = 0; i < n; i++) { + assertEquals(reader1.readNext(2, TimeUnit.SECONDS).getData(), "test2".getBytes()); + } + + client1.close(); + client2.close(); + } + + @Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes") public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception { log.info("--- Starting ReplicatorTest::testNamespaceMigrationWithReplicationBacklog ---"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index e68441f0a19..df5d2f7184f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -37,6 +37,7 @@ import io.netty.util.concurrent.FastThreadLocal; import io.opentelemetry.api.common.Attributes; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -3088,6 +3089,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle && Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion()); } + @Override + protected void setRedirectedClusterURI(String serviceUrl, String serviceUrlTls) throws URISyntaxException { + super.setRedirectedClusterURI(serviceUrl, serviceUrlTls); + acknowledgmentsGroupingTracker.flushAndClean(); + } + private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); @VisibleForTesting
