This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b796f561db4 [fix] [broker] Make the new exclusive consumer instead the
inactive one faster (#21183)
b796f561db4 is described below
commit b796f561db4e55973a8d38f1e8e3e1d92a416bf5
Author: fengyubiao <[email protected]>
AuthorDate: Mon Oct 30 18:57:49 2023 +0800
[fix] [broker] Make the new exclusive consumer instead the inactive one
faster (#21183)
### Motivation
There is an issue similar to the one fixed by
https://github.com/apache/pulsar/pull/21155.
The client considered the connection inactive, but the broker still considered
it healthy. The client then tried to reconnect an exclusive consumer over a new
connection and received the error `Exclusive consumer is already connected`.
### Modifications
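- When a new exclusive consumer tries to subscribe while another consumer is
already registered, check whether the existing consumer's connection is still
alive. If the liveness check reports it as dead, retry adding the new consumer
instead of failing immediately with `Exclusive consumer is already connected`.
If the connection liveness check is disabled, the subscription still fails as
before.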
(cherry picked from commit 29db8f84e5f0f99b110d62090ab670c59bf4638a)
---
.../AbstractDispatcherSingleActiveConsumer.java | 17 ++-
.../pulsar/broker/service/PersistentTopicTest.java | 23 ++--
.../pulsar/broker/service/ServerCnxTest.java | 150 +++++++++++++++++++--
.../broker/service/utils/ClientChannelHelper.java | 12 ++
4 files changed, 180 insertions(+), 22 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 5098890242b..310354dcd3b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -166,7 +166,22 @@ public abstract class
AbstractDispatcherSingleActiveConsumer extends AbstractBas
}
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
- return FutureUtil.failedFuture(new
ConsumerBusyException("Exclusive consumer is already connected"));
+ Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ if (actConsumer != null) {
+ return
actConsumer.cnx().checkConnectionLiveness().thenCompose(actConsumerStillAlive
-> {
+ if (actConsumerStillAlive == null ||
actConsumerStillAlive) {
+ return FutureUtil.failedFuture(new
ConsumerBusyException("Exclusive consumer is already"
+ + " connected"));
+ } else {
+ return addConsumer(consumer);
+ }
+ });
+ } else {
+ // It should never happen.
+
+ return FutureUtil.failedFuture(new
ConsumerBusyException("Active consumer is in a strange state."
+ + " Active consumer is null, but there are " +
consumers.size() + " registered."));
+ }
}
if (subscriptionType == SubType.Failover &&
isConsumersExceededOnSubscription()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 22b9949b7ee..d8b81a51b30 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -204,6 +204,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doReturn(spy(DefaultEventLoop.class)).when(channel).eventLoop();
doReturn(channel).when(ctx).channel();
doReturn(ctx).when(serverCnx).ctx();
+
doReturn(CompletableFuture.completedFuture(true)).when(serverCnx).checkConnectionLiveness();
NamespaceService nsSvc = mock(NamespaceService.class);
NamespaceBundle bundle = mock(NamespaceBundle.class);
@@ -682,7 +683,15 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
f1.get();
// 2. duplicate subscribe
- Future<Consumer> f2 = topic.subscribe(getSubscriptionOption(cmd));
+ CommandSubscribe cmd2 = new CommandSubscribe()
+ .setConsumerId(2)
+ .setTopic(successTopicName)
+ .setSubscription(successSubName)
+ .setConsumerName("consumer-name")
+ .setReadCompacted(false)
+ .setRequestId(2)
+ .setSubType(SubType.Exclusive);
+ Future<Consumer> f2 = topic.subscribe(getSubscriptionOption(cmd2));
try {
f2.get();
fail("should fail with exception");
@@ -747,19 +756,11 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
sub.addConsumer(consumer);
assertTrue(sub.getDispatcher().isConsumerConnected());
- // 2. duplicate add consumer
- try {
- sub.addConsumer(consumer).get();
- fail("Should fail with ConsumerBusyException");
- } catch (Exception e) {
- assertTrue(e.getCause() instanceof
BrokerServiceException.ConsumerBusyException);
- }
-
- // 3. simple remove consumer
+ // 2. simple remove consumer
sub.removeConsumer(consumer);
assertFalse(sub.getDispatcher().isConsumerConnected());
- // 4. duplicate remove consumer
+ // 3. duplicate remove consumer
try {
sub.removeConsumer(consumer);
fail("Should fail with ServerMetadataException");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 5fd48819813..79178ec491f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -45,6 +45,7 @@ import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
+import io.netty.channel.DefaultChannelId;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.vertx.core.impl.ConcurrentHashSet;
@@ -52,6 +53,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -62,10 +64,15 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -99,6 +106,7 @@ import
org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.AuthMethod;
import org.apache.pulsar.common.api.proto.BaseCommand;
@@ -997,7 +1005,8 @@ public class ServerCnxTest {
channelsStoppedAnswerHealthCheck.add(channel);
ClientChannel channel2 = new ClientChannel();
setChannelConnected(channel2.serverCnx);
- Awaitility.await().untilAsserted(() -> {
+ Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
+ channel.runPendingTasks();
ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId,
requestId.incrementAndGet(),
pName, false, metadata, null, epoch.incrementAndGet(),
false,
ProducerAccessMode.Shared, Optional.empty(), false);
@@ -1011,10 +1020,132 @@ public class ServerCnxTest {
channel2.close();
}
+ @Test
+ public void testHandleConsumerAfterClientChannelInactive() throws
Exception {
+ final String tName = successTopicName;
+ final long consumerId = 1;
+ final MutableInt requestId = new MutableInt(1);
+ final String sName = successSubName;
+ final String cName1 = ConsumerName.generateRandomName();
+ final String cName2 = ConsumerName.generateRandomName();
+ resetChannel();
+ setChannelConnected();
+
+ // The producer register using the first connection.
+ ByteBuf cmdSubscribe1 = Commands.newSubscribe(tName, sName,
consumerId, requestId.incrementAndGet(),
+ SubType.Exclusive, 0, cName1, 0);
+ channel.writeInbound(cmdSubscribe1);
+ assertTrue(getResponse() instanceof CommandSuccess);
+ PersistentTopic topicRef = (PersistentTopic)
brokerService.getTopicReference(tName).get();
+ assertNotNull(topicRef);
+ assertNotNull(topicRef.getSubscription(sName).getConsumers());
+ assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
+
assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(),
cName1);
+
+ // Verify the second producer using a new connection will override the
consumer who using a stopped channel.
+ channelsStoppedAnswerHealthCheck.add(channel);
+ ClientChannel channel2 = new ClientChannel();
+ setChannelConnected(channel2.serverCnx);
+ ByteBuf cmdSubscribe2 = Commands.newSubscribe(tName, sName,
consumerId, requestId.incrementAndGet(),
+ CommandSubscribe.SubType.Exclusive, 0, cName2, 0);
+ channel2.channel.writeInbound(cmdSubscribe2);
+ BackGroundExecutor backGroundExecutor =
startBackgroundExecutorForEmbeddedChannel(channel);
+
+ assertTrue(getResponse(channel2.channel, channel2.clientChannelHelper)
instanceof CommandSuccess);
+ assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
+
assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(),
cName2);
+ backGroundExecutor.close();
+
+ // cleanup.
+ channel.finish();
+ channel2.close();
+ }
+
+ @Test
+ public void test2ndSubFailedIfDisabledConCheck()
+ throws Exception {
+ final String tName = successTopicName;
+ final long consumerId = 1;
+ final MutableInt requestId = new MutableInt(1);
+ final String sName = successSubName;
+ final String cName1 = ConsumerName.generateRandomName();
+ final String cName2 = ConsumerName.generateRandomName();
+ // Disabled connection check.
+ pulsar.getConfig().setConnectionLivenessCheckTimeoutMillis(-1);
+ resetChannel();
+ setChannelConnected();
+
+ // The consumer register using the first connection.
+ ByteBuf cmdSubscribe1 = Commands.newSubscribe(tName, sName,
consumerId, requestId.incrementAndGet(),
+ SubType.Exclusive, 0, cName1, 0);
+ channel.writeInbound(cmdSubscribe1);
+ assertTrue(getResponse() instanceof CommandSuccess);
+ PersistentTopic topicRef = (PersistentTopic)
brokerService.getTopicReference(tName).orElse(null);
+ assertNotNull(topicRef);
+ assertNotNull(topicRef.getSubscription(sName).getConsumers());
+
assertEquals(topicRef.getSubscription(sName).getConsumers().stream().map(Consumer::consumerName)
+ .collect(Collectors.toList()),
Collections.singletonList(cName1));
+
+ // Verify the consumer using a new connection will override the
consumer who using a stopped channel.
+ channelsStoppedAnswerHealthCheck.add(channel);
+ ClientChannel channel2 = new ClientChannel();
+ setChannelConnected(channel2.serverCnx);
+ ByteBuf cmdSubscribe2 = Commands.newSubscribe(tName, sName,
consumerId, requestId.incrementAndGet(),
+ CommandSubscribe.SubType.Exclusive, 0, cName2, 0);
+ channel2.channel.writeInbound(cmdSubscribe2);
+ BackGroundExecutor backGroundExecutor =
startBackgroundExecutorForEmbeddedChannel(channel);
+
+ // Since the feature "ConnectionLiveness" has been disabled, the fix
+ // by https://github.com/apache/pulsar/pull/21183 will not be
affected, so the client will still get an error.
+ Object responseOfConnection2 = getResponse(channel2.channel,
channel2.clientChannelHelper);
+ assertTrue(responseOfConnection2 instanceof CommandError);
+ assertTrue(((CommandError) responseOfConnection2).getMessage()
+ .contains("Exclusive consumer is already connected"));
+ assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1);
+
assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(),
cName1);
+ backGroundExecutor.close();
+
+ // cleanup.
+ channel.finish();
+ channel2.close();
+ // Reset configuration.
+ pulsar.getConfig().setConnectionLivenessCheckTimeoutMillis(5000);
+ }
+
+ /**
+ * When a channel typed "EmbeddedChannel", once we call
channel.execute(runnable), there is no background thread
+ * to run it.
+ * So starting a background thread to trigger the tasks in the queue.
+ */
+ private BackGroundExecutor startBackgroundExecutorForEmbeddedChannel(final
EmbeddedChannel channel) {
+ ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+ ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(()
-> {
+ channel.runPendingTasks();
+ }, 100, 100, TimeUnit.MILLISECONDS);
+ return new BackGroundExecutor(executor, scheduledFuture);
+ }
+
+ @AllArgsConstructor
+ private static class BackGroundExecutor implements Closeable {
+
+ private ScheduledExecutorService executor;
+
+ private ScheduledFuture scheduledFuture;
+
+ @Override
+ public void close() throws IOException {
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(true);
+ }
+ executor.shutdown();
+ }
+ }
+
private class ClientChannel implements Closeable {
private ClientChannelHelper clientChannelHelper = new
ClientChannelHelper();
private ServerCnx serverCnx = new ServerCnx(pulsar);
- private EmbeddedChannel channel = new EmbeddedChannel(new
LengthFieldBasedFrameDecoder(
+ private EmbeddedChannel channel = new
EmbeddedChannel(DefaultChannelId.newInstance(),
+ new LengthFieldBasedFrameDecoder(
5 * 1024 * 1024,
0,
4,
@@ -1810,9 +1941,11 @@ public class ServerCnxTest {
"test" /* consumer name */, 0 /* avoid reseting cursor */);
channel.writeInbound(clientCommand);
+ BackGroundExecutor backGroundExecutor =
startBackgroundExecutorForEmbeddedChannel(channel);
+
// Create producer second time
clientCommand = Commands.newSubscribe(successTopicName, //
- successSubName, 2 /* consumer id */, 1 /* request id */,
SubType.Exclusive, 0,
+ successSubName, 2 /* consumer id */, 2 /* request id */,
SubType.Exclusive, 0,
"test" /* consumer name */, 0 /* avoid reseting cursor */);
channel.writeInbound(clientCommand);
@@ -1822,6 +1955,9 @@ public class ServerCnxTest {
CommandError error = (CommandError) response;
assertEquals(error.getError(), ServerError.ConsumerBusy);
});
+
+ // cleanup.
+ backGroundExecutor.close();
channel.finish();
}
@@ -2676,13 +2812,7 @@ public class ServerCnxTest {
if (channelsStoppedAnswerHealthCheck.contains(channel)) {
continue;
}
-
channel.writeAndFlush(Commands.newPong()).addListener(future -> {
- if (!future.isSuccess()) {
- log.warn("[{}] Forcing connection to close since
cannot send a pong message.",
- channel, future.cause());
- channel.close();
- }
- });
+ channel.writeInbound(Commands.newPong());
continue;
}
return cmd;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index bf0dd3aa9c1..c8fce32efc5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -27,6 +27,8 @@ import
org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
import
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
+import org.apache.pulsar.common.api.proto.CommandPing;
+import org.apache.pulsar.common.api.proto.CommandPong;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.api.proto.CommandAck;
@@ -207,6 +209,16 @@ public class ClientChannelHelper {
CommandEndTxnOnSubscriptionResponse
commandEndTxnOnSubscriptionResponse) {
queue.offer(new
CommandEndTxnOnSubscriptionResponse().copyFrom(commandEndTxnOnSubscriptionResponse));
}
+
+ @Override
+ protected void handlePing(CommandPing ping) {
+ queue.offer(new CommandPing().copyFrom(ping));
+ }
+
+ @Override
+ protected void handlePong(CommandPong pong) {
+ return;
+ }
};
}