This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 7cb32a2e3ef [fix] [broker] Make the new exclusive consumer instead the 
inactive one faster (#21183)
7cb32a2e3ef is described below

commit 7cb32a2e3ef9976693cc35663261e163bf47d624
Author: fengyubiao <[email protected]>
AuthorDate: Mon Oct 30 18:57:49 2023 +0800

    [fix] [broker] Make the new exclusive consumer instead the inactive one 
faster (#21183)
    
    There is an issue similar to the 
https://github.com/apache/pulsar/pull/21155 fixed one.
    
    The client assumed the connection was inactive, but the Broker assumed the 
connection was fine. The Client tried to  use a new connection to reconnect an 
exclusive consumer, then got an error `Exclusive consumer is already connected`
    
    - Check the connection of the old consumer is available when the new one 
tries to subscribe
    
    (cherry picked from commit 29db8f84e5f0f99b110d62090ab670c59bf4638a)
---
 .../AbstractDispatcherSingleActiveConsumer.java    |  4 +
 .../pulsar/broker/service/PersistentTopicTest.java | 11 ++-
 .../pulsar/broker/service/ServerCnxTest.java       | 90 ++++++++++++++++++++--
 .../broker/service/utils/ClientChannelHelper.java  | 32 ++++++++
 4 files changed, 128 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 17a6d1dbfb3..9b4b250b577 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -144,6 +144,10 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
         }
 
         if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
+            Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            if (actConsumer != null) {
+                actConsumer.cnx().checkConnectionLiveness();
+            }
             throw new ConsumerBusyException("Exclusive consumer is already 
connected");
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 67ef1bbfef7..0e7a26f0cf9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -228,6 +228,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doReturn(spy(DefaultEventLoop.class)).when(channel).eventLoop();
         doReturn(channel).when(ctx).channel();
         doReturn(ctx).when(serverCnx).ctx();
+        
doReturn(CompletableFuture.completedFuture(true)).when(serverCnx).checkConnectionLiveness();
 
         NamespaceService nsSvc = mock(NamespaceService.class);
         NamespaceBundle bundle = mock(NamespaceBundle.class);
@@ -741,7 +742,15 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         f1.get();
 
         // 2. duplicate subscribe
-        Future<Consumer> f2 = topic.subscribe(getSubscriptionOption(cmd));
+        CommandSubscribe cmd2 = new CommandSubscribe()
+                .setConsumerId(2)
+                .setTopic(successTopicName)
+                .setSubscription(successSubName)
+                .setConsumerName("consumer-name")
+                .setReadCompacted(false)
+                .setRequestId(2)
+                .setSubType(SubType.Exclusive);
+        Future<Consumer> f2 = topic.subscribe(getSubscriptionOption(cmd2));
         try {
             f2.get();
             fail("should fail with exception");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 5457a6088f2..d20ae4798f4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -60,11 +60,15 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -102,6 +106,7 @@ import 
org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
 import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.AuthMethod;
 import org.apache.pulsar.common.api.proto.BaseCommand;
@@ -923,6 +928,76 @@ public class ServerCnxTest {
         channel2.close();
     }
 
+    @Test
+    public void testHandleConsumerAfterClientChannelInactive() throws 
Exception {
+        final String tName = successTopicName;
+        final long consumerId = 1;
+        final MutableInt requestId = new MutableInt(1);
+        final String sName = successSubName;
+        final String cName1 = ConsumerName.generateRandomName();
+        final String cName2 = ConsumerName.generateRandomName();
+        resetChannel();
+        setChannelConnected();
+
+        // The producer register using the first connection.
+        ByteBuf cmdSubscribe1 = Commands.newSubscribe(tName, sName, 
consumerId, requestId.incrementAndGet(),
+                SubType.Exclusive, 0, cName1, 0);
+        channel.writeInbound(cmdSubscribe1);
+        assertTrue(getResponse() instanceof CommandSuccess);
+        PersistentTopic topicRef = (PersistentTopic) 
brokerService.getTopicReference(tName).get();
+        assertNotNull(topicRef);
+        assertNotNull(topicRef.getSubscription(sName).getConsumers());
+        assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
+        
assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(),
 cName1);
+
+        // Verify the second producer using a new connection will override the 
producer who using a stopped channel.
+        channelsStoppedAnswerHealthCheck.add(channel);
+        ClientChannel channel2 = new ClientChannel();
+        setChannelConnected(channel2.serverCnx);
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            channel.runPendingTasks();
+            ByteBuf cmdSubscribe2 = Commands.newSubscribe(tName, sName, 
consumerId, requestId.incrementAndGet(),
+                    CommandSubscribe.SubType.Exclusive, 0, cName2, 0);
+            channel2.channel.writeInbound(cmdSubscribe2);
+            assertTrue(getResponse(channel2.channel, 
channel2.clientChannelHelper) instanceof CommandSuccess);
+            
assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
+            
assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(),
 cName2);
+        });
+
+        // cleanup.
+        channel.finish();
+        channel2.close();
+    }
+
+    /**
+     * When a channel typed "EmbeddedChannel", once we call 
channel.execute(runnable), there is no background thread
+     * to run it.
+     * So starting a background thread to trigger the tasks in the queue.
+     */
+    private BackGroundExecutor startBackgroundExecutorForEmbeddedChannel(final 
EmbeddedChannel channel) {
+        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+        ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(() 
-> {
+            channel.runPendingTasks();
+        }, 100, 100, TimeUnit.MILLISECONDS);
+        return new BackGroundExecutor(executor, scheduledFuture);
+    }
+
+    @AllArgsConstructor
+    private static class BackGroundExecutor implements Closeable {
+
+        private ScheduledExecutorService executor;
+
+        private ScheduledFuture scheduledFuture;
+
+        @Override
+        public void close() throws IOException {
+            if (scheduledFuture != null) {
+                scheduledFuture.cancel(true);
+            }
+            executor.shutdown();
+        }
+    }
+
     private class ClientChannel implements Closeable {
         private ClientChannelHelper clientChannelHelper = new 
ClientChannelHelper();
         private ServerCnx serverCnx = new ServerCnx(pulsar);
@@ -1660,9 +1735,11 @@ public class ServerCnxTest {
                 "test" /* consumer name */, 0 /* avoid reseting cursor */);
         channel.writeInbound(clientCommand);
 
+        BackGroundExecutor backGroundExecutor = 
startBackgroundExecutorForEmbeddedChannel(channel);
+
         // Create producer second time
         clientCommand = Commands.newSubscribe(successTopicName, //
-                successSubName, 2 /* consumer id */, 1 /* request id */, 
SubType.Exclusive, 0,
+                successSubName, 2 /* consumer id */, 2 /* request id */, 
SubType.Exclusive, 0,
                 "test" /* consumer name */, 0 /* avoid reseting cursor */);
         channel.writeInbound(clientCommand);
 
@@ -1672,6 +1749,9 @@ public class ServerCnxTest {
             CommandError error = (CommandError) response;
             assertEquals(error.getError(), ServerError.ConsumerBusy);
         });
+
+        // cleanup.
+        backGroundExecutor.close();
         channel.finish();
     }
 
@@ -2510,13 +2590,7 @@ public class ServerCnxTest {
                     if (channelsStoppedAnswerHealthCheck.contains(channel)) {
                         continue;
                     }
-                    
channel.writeAndFlush(Commands.newPong()).addListener(future -> {
-                        if (!future.isSuccess()) {
-                            log.warn("[{}] Forcing connection to close since 
cannot send a pong message.",
-                                    channel, future.cause());
-                            channel.close();
-                        }
-                    });
+                    channel.writeInbound(Commands.newPong());
                     continue;
                 }
                 return cmd;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index 779a0bdba08..6a8c95ca76a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -21,6 +21,11 @@ package org.apache.pulsar.broker.service.utils;
 import java.util.Queue;
 import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandPing;
+import org.apache.pulsar.common.api.proto.CommandPong;
 import org.apache.pulsar.common.protocol.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
@@ -162,6 +167,33 @@ public class ClientChannelHelper {
                 CommandAddPartitionToTxnResponse 
commandAddPartitionToTxnResponse) {
             queue.offer(new 
CommandAddPartitionToTxnResponse().copyFrom(commandAddPartitionToTxnResponse));
         }
+
+        @Override
+        protected void handleEndTxnResponse(CommandEndTxnResponse 
commandEndTxnResponse) {
+            queue.offer(new 
CommandEndTxnResponse().copyFrom(commandEndTxnResponse));
+        }
+
+        @Override
+        protected void handleEndTxnOnPartitionResponse(
+                CommandEndTxnOnPartitionResponse 
commandEndTxnOnPartitionResponse) {
+            queue.offer(new 
CommandEndTxnOnPartitionResponse().copyFrom(commandEndTxnOnPartitionResponse));
+        }
+
+        @Override
+        protected void handleEndTxnOnSubscriptionResponse(
+                CommandEndTxnOnSubscriptionResponse 
commandEndTxnOnSubscriptionResponse) {
+            queue.offer(new 
CommandEndTxnOnSubscriptionResponse().copyFrom(commandEndTxnOnSubscriptionResponse));
+        }
+
+        @Override
+        protected void handlePing(CommandPing ping) {
+            queue.offer(new CommandPing().copyFrom(ping));
+        }
+
+        @Override
+        protected void handlePong(CommandPong pong) {
+            return;
+        }
     };
 
 }

Reply via email to