This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 7cb32a2e3ef [fix] [broker] Make the new exclusive consumer instead the
inactive one faster (#21183)
7cb32a2e3ef is described below
commit 7cb32a2e3ef9976693cc35663261e163bf47d624
Author: fengyubiao <[email protected]>
AuthorDate: Mon Oct 30 18:57:49 2023 +0800
[fix] [broker] Make the new exclusive consumer instead the inactive one
faster (#21183)
There is an issue similar to the one fixed by
https://github.com/apache/pulsar/pull/21155.
The client assumed the connection was inactive, but the broker assumed the
connection was fine. The client tried to use a new connection to reconnect an
exclusive consumer, then got the error `Exclusive consumer is already connected`.
- Check whether the connection of the old consumer is still alive when the new
one tries to subscribe.
(cherry picked from commit 29db8f84e5f0f99b110d62090ab670c59bf4638a)
---
.../AbstractDispatcherSingleActiveConsumer.java | 4 +
.../pulsar/broker/service/PersistentTopicTest.java | 11 ++-
.../pulsar/broker/service/ServerCnxTest.java | 90 ++++++++++++++++++++--
.../broker/service/utils/ClientChannelHelper.java | 32 ++++++++
4 files changed, 128 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 17a6d1dbfb3..9b4b250b577 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -144,6 +144,10 @@ public abstract class
AbstractDispatcherSingleActiveConsumer extends AbstractBas
}
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
+ Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ if (actConsumer != null) {
+ actConsumer.cnx().checkConnectionLiveness();
+ }
throw new ConsumerBusyException("Exclusive consumer is already
connected");
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 67ef1bbfef7..0e7a26f0cf9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -228,6 +228,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doReturn(spy(DefaultEventLoop.class)).when(channel).eventLoop();
doReturn(channel).when(ctx).channel();
doReturn(ctx).when(serverCnx).ctx();
+
doReturn(CompletableFuture.completedFuture(true)).when(serverCnx).checkConnectionLiveness();
NamespaceService nsSvc = mock(NamespaceService.class);
NamespaceBundle bundle = mock(NamespaceBundle.class);
@@ -741,7 +742,15 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
f1.get();
// 2. duplicate subscribe
- Future<Consumer> f2 = topic.subscribe(getSubscriptionOption(cmd));
+ CommandSubscribe cmd2 = new CommandSubscribe()
+ .setConsumerId(2)
+ .setTopic(successTopicName)
+ .setSubscription(successSubName)
+ .setConsumerName("consumer-name")
+ .setReadCompacted(false)
+ .setRequestId(2)
+ .setSubType(SubType.Exclusive);
+ Future<Consumer> f2 = topic.subscribe(getSubscriptionOption(cmd2));
try {
f2.get();
fail("should fail with exception");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 5457a6088f2..d20ae4798f4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -60,11 +60,15 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.bookkeeper.common.util.OrderedExecutor;
+import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -102,6 +106,7 @@ import
org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.AuthMethod;
import org.apache.pulsar.common.api.proto.BaseCommand;
@@ -923,6 +928,76 @@ public class ServerCnxTest {
channel2.close();
}
+ @Test
+ public void testHandleConsumerAfterClientChannelInactive() throws
Exception {
+ final String tName = successTopicName;
+ final long consumerId = 1;
+ final MutableInt requestId = new MutableInt(1);
+ final String sName = successSubName;
+ final String cName1 = ConsumerName.generateRandomName();
+ final String cName2 = ConsumerName.generateRandomName();
+ resetChannel();
+ setChannelConnected();
+
+ // The producer register using the first connection.
+ ByteBuf cmdSubscribe1 = Commands.newSubscribe(tName, sName,
consumerId, requestId.incrementAndGet(),
+ SubType.Exclusive, 0, cName1, 0);
+ channel.writeInbound(cmdSubscribe1);
+ assertTrue(getResponse() instanceof CommandSuccess);
+ PersistentTopic topicRef = (PersistentTopic)
brokerService.getTopicReference(tName).get();
+ assertNotNull(topicRef);
+ assertNotNull(topicRef.getSubscription(sName).getConsumers());
+ assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
+
assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(),
cName1);
+
+ // Verify the second producer using a new connection will override the
producer who using a stopped channel.
+ channelsStoppedAnswerHealthCheck.add(channel);
+ ClientChannel channel2 = new ClientChannel();
+ setChannelConnected(channel2.serverCnx);
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+ channel.runPendingTasks();
+ ByteBuf cmdSubscribe2 = Commands.newSubscribe(tName, sName,
consumerId, requestId.incrementAndGet(),
+ CommandSubscribe.SubType.Exclusive, 0, cName2, 0);
+ channel2.channel.writeInbound(cmdSubscribe2);
+ assertTrue(getResponse(channel2.channel,
channel2.clientChannelHelper) instanceof CommandSuccess);
+
assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
+
assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(),
cName2);
+ });
+
+ // cleanup.
+ channel.finish();
+ channel2.close();
+ }
+
+ /**
+ * When a channel typed "EmbeddedChannel", once we call
channel.execute(runnable), there is no background thread
+ * to run it.
+ * So starting a background thread to trigger the tasks in the queue.
+ */
+ private BackGroundExecutor startBackgroundExecutorForEmbeddedChannel(final
EmbeddedChannel channel) {
+ ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+ ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(()
-> {
+ channel.runPendingTasks();
+ }, 100, 100, TimeUnit.MILLISECONDS);
+ return new BackGroundExecutor(executor, scheduledFuture);
+ }
+
+ @AllArgsConstructor
+ private static class BackGroundExecutor implements Closeable {
+
+ private ScheduledExecutorService executor;
+
+ private ScheduledFuture scheduledFuture;
+
+ @Override
+ public void close() throws IOException {
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(true);
+ }
+ executor.shutdown();
+ }
+ }
+
private class ClientChannel implements Closeable {
private ClientChannelHelper clientChannelHelper = new
ClientChannelHelper();
private ServerCnx serverCnx = new ServerCnx(pulsar);
@@ -1660,9 +1735,11 @@ public class ServerCnxTest {
"test" /* consumer name */, 0 /* avoid reseting cursor */);
channel.writeInbound(clientCommand);
+ BackGroundExecutor backGroundExecutor =
startBackgroundExecutorForEmbeddedChannel(channel);
+
// Create producer second time
clientCommand = Commands.newSubscribe(successTopicName, //
- successSubName, 2 /* consumer id */, 1 /* request id */,
SubType.Exclusive, 0,
+ successSubName, 2 /* consumer id */, 2 /* request id */,
SubType.Exclusive, 0,
"test" /* consumer name */, 0 /* avoid reseting cursor */);
channel.writeInbound(clientCommand);
@@ -1672,6 +1749,9 @@ public class ServerCnxTest {
CommandError error = (CommandError) response;
assertEquals(error.getError(), ServerError.ConsumerBusy);
});
+
+ // cleanup.
+ backGroundExecutor.close();
channel.finish();
}
@@ -2510,13 +2590,7 @@ public class ServerCnxTest {
if (channelsStoppedAnswerHealthCheck.contains(channel)) {
continue;
}
-
channel.writeAndFlush(Commands.newPong()).addListener(future -> {
- if (!future.isSuccess()) {
- log.warn("[{}] Forcing connection to close since
cannot send a pong message.",
- channel, future.cause());
- channel.close();
- }
- });
+ channel.writeInbound(Commands.newPong());
continue;
}
return cmd;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index 779a0bdba08..6a8c95ca76a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -21,6 +21,11 @@ package org.apache.pulsar.broker.service.utils;
import java.util.Queue;
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandPing;
+import org.apache.pulsar.common.api.proto.CommandPong;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
@@ -162,6 +167,33 @@ public class ClientChannelHelper {
CommandAddPartitionToTxnResponse
commandAddPartitionToTxnResponse) {
queue.offer(new
CommandAddPartitionToTxnResponse().copyFrom(commandAddPartitionToTxnResponse));
}
+
+ @Override
+ protected void handleEndTxnResponse(CommandEndTxnResponse
commandEndTxnResponse) {
+ queue.offer(new
CommandEndTxnResponse().copyFrom(commandEndTxnResponse));
+ }
+
+ @Override
+ protected void handleEndTxnOnPartitionResponse(
+ CommandEndTxnOnPartitionResponse
commandEndTxnOnPartitionResponse) {
+ queue.offer(new
CommandEndTxnOnPartitionResponse().copyFrom(commandEndTxnOnPartitionResponse));
+ }
+
+ @Override
+ protected void handleEndTxnOnSubscriptionResponse(
+ CommandEndTxnOnSubscriptionResponse
commandEndTxnOnSubscriptionResponse) {
+ queue.offer(new
CommandEndTxnOnSubscriptionResponse().copyFrom(commandEndTxnOnSubscriptionResponse));
+ }
+
+ @Override
+ protected void handlePing(CommandPing ping) {
+ queue.offer(new CommandPing().copyFrom(ping));
+ }
+
+ @Override
+ protected void handlePong(CommandPong pong) {
+ return;
+ }
};
}