This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6afb550c9bf9197ad1de100289627b4d67980592
Author: hrzzzz <[email protected]>
AuthorDate: Fri Dec 20 20:05:09 2024 +0800

    [fix][client] Fix reader message filtering issue during blue-green cluster 
switch (#23693)
    
    Co-authored-by: ruihongzhou <[email protected]>
    (cherry picked from commit 34c2f30d7838a1d50484985ee8bcfb1d573c50ed)
---
 .../broker/service/ClusterMigrationTest.java       | 111 +++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   7 ++
 2 files changed, 118 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index 380cb710baf..09a2339075a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
@@ -912,6 +913,116 @@ public class ClusterMigrationTest {
         client2.close();
     }
 
+    public void testMigrationWithReader() throws Exception {
+        final String topicName = BrokerTestUtil
+                .newUniqueName("persistent://" + namespace + 
"/migrationTopic");
+
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder()
+                .serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+        // cluster-1 producer/reader
+        Producer<byte[]> producer1 = client1.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .producerName("cluster1-1")
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        Reader<byte[]> reader1 =client1.newReader()
+                .topic(topicName)
+                .startMessageId(MessageId.earliest)
+                .subscriptionRolePrefix("s1")
+                .create();
+
+        AbstractTopic topic1 = (AbstractTopic) 
pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
+        retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
+        retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5, 
500);
+        assertFalse(topic1.getProducers().isEmpty());
+        assertFalse(topic1.getSubscriptions().isEmpty());
+
+        // build backlog
+        reader1.close();
+        int n = 8;
+        for (int i = 0; i < n; i++) {
+            producer1.send("test1".getBytes());
+        }
+
+        @Cleanup
+        PulsarClient client2 = PulsarClient.builder()
+                .serviceUrl(url2.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+        // cluster-2 producer
+        Producer<byte[]> producer2 = client2.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .producerName("cluster2-1")
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        AbstractTopic topic2 = (AbstractTopic) 
pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get();
+        assertFalse(topic2.getProducers().isEmpty());
+        assertTrue(topic2.getSubscriptions().isEmpty());
+
+        // migrate topic to cluster-2
+        ClusterUrl migratedUrl = new 
ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
+                pulsar2.getBrokerServiceUrl(), null);
+        admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
+        
assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(),
 migratedUrl);
+        retryStrategically((test) -> {
+            try {
+                topic1.checkClusterMigration().get();
+                return true;
+            } catch (Exception e) {
+                // ok
+            }
+            return false;
+        }, 10, 500);
+        topic1.checkClusterMigration().get();
+
+        sleep(1000);
+        producer1.sendAsync("test1".getBytes());
+
+        // producer is disconnected from cluster-1
+        retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500);
+        assertTrue(topic1.getProducers().isEmpty());
+
+        // producer is connected with cluster-2
+        retryStrategically((test) -> topic2.getProducers().size() == 2, 10, 
500);
+        assertEquals(topic2.getProducers().size(), 2);
+
+        // try to consume backlog messages from cluster-1
+        reader1 = client1.newReader()
+                .topic(topicName)
+                .startMessageId(MessageId.earliest)
+                .subscriptionRolePrefix("s1")
+                .create();
+        for (int i = 0; i < n; i++) {
+            Message<byte[]> msg = reader1.readNext();
+            assertEquals(msg.getData(), "test1".getBytes());
+        }
+
+        // after consuming all messages, reader should have disconnected from 
cluster-1 and reconnect with cluster-2
+        retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 
500);
+        assertFalse(topic2.getSubscriptions().isEmpty());
+        assertTrue(topic1.getSubscriptions().isEmpty());
+
+        n = 4;
+        // publish messages to cluster-2 and consume them
+        for (int i = 0; i < n; i++) {
+            producer1.send("test2".getBytes());
+        }
+
+        for (int i = 0; i < n; i++) {
+            assertEquals(reader1.readNext(2, TimeUnit.SECONDS).getData(), 
"test2".getBytes());
+        }
+
+        client1.close();
+        client2.close();
+    }
+
+
     @Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes")
     public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType 
subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception 
{
         log.info("--- Starting 
ReplicatorTest::testNamespaceMigrationWithReplicationBacklog ---");
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e68441f0a19..df5d2f7184f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -37,6 +37,7 @@ import io.netty.util.concurrent.FastThreadLocal;
 import io.opentelemetry.api.common.Attributes;
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -3088,6 +3089,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 && 
Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
     }
 
+    @Override
+    protected void setRedirectedClusterURI(String serviceUrl, String 
serviceUrlTls) throws URISyntaxException {
+        super.setRedirectedClusterURI(serviceUrl, serviceUrlTls);
+        acknowledgmentsGroupingTracker.flushAndClean();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerImpl.class);
 
     @VisibleForTesting

Reply via email to