This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 9c45082224f [fix][broker]A failed consumer/producer future in
ServerCnx can never be removed (#23123)
9c45082224f is described below
commit 9c45082224fb3974aecf5f9aae4d61b4ccdf557f
Author: fengyubiao <[email protected]>
AuthorDate: Tue Aug 6 10:16:33 2024 +0800
[fix][broker]A failed consumer/producer future in ServerCnx can never be
removed (#23123)
(cherry picked from commit 114880b1428ac1f6bbd97c43a26d4fa313a87b96)
---
.../apache/pulsar/broker/service/ServerCnx.java | 24 ++++++++-
.../broker/service/ServerCnxNonInjectionTest.java | 62 ++++++++++++++++++++++
.../client/impl/BrokerClientIntegrationTest.java | 50 ++++++++++-------
3 files changed, 115 insertions(+), 21 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index b7cd8e4eab6..e846d144d58 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3604,8 +3604,14 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
@Override
public CompletableFuture<Optional<Boolean>> checkConnectionLiveness() {
+ if (!isActive()) {
+ return CompletableFuture.completedFuture(Optional.of(false));
+ }
if (connectionLivenessCheckTimeoutMillis > 0) {
return
NettyFutureUtil.toCompletableFuture(ctx.executor().submit(() -> {
+ if (!isActive()) {
+ return
CompletableFuture.completedFuture(Optional.of(false));
+ }
if (connectionCheckInProgress != null) {
return connectionCheckInProgress;
} else {
@@ -3613,10 +3619,24 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
new CompletableFuture<>();
connectionCheckInProgress = finalConnectionCheckInProgress;
ctx.executor().schedule(() -> {
- if (finalConnectionCheckInProgress ==
connectionCheckInProgress
- && !finalConnectionCheckInProgress.isDone()) {
+ if (!isActive()) {
+
finalConnectionCheckInProgress.complete(Optional.of(false));
+ return;
+ }
+ if (finalConnectionCheckInProgress.isDone()) {
+ return;
+ }
+ if (finalConnectionCheckInProgress ==
connectionCheckInProgress) {
+ /**
+ * {@link #connectionCheckInProgress} will be
completed when
+ * {@link #channelInactive(ChannelHandlerContext)}
event occurs, so skip completing it here.
+ */
log.warn("[{}] Connection check timed out. Closing
connection.", this.toString());
ctx.close();
+ } else {
+ log.error("[{}] Reached unexpected code block.
Completing connection check.",
+ this.toString());
+
finalConnectionCheckInProgress.complete(Optional.of(true));
}
}, connectionLivenessCheckTimeoutMillis,
TimeUnit.MILLISECONDS);
sendPing();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxNonInjectionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxNonInjectionTest.java
new file mode 100644
index 00000000000..3acc941a2c8
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxNonInjectionTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class ServerCnxNonInjectionTest extends ProducerConsumerBase {
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 60 * 1000)
+ public void testCheckConnectionLivenessAfterClosed() throws Exception {
+ // Create a ServerCnx
+ final String tp = BrokerTestUtil.newUniqueName("public/default/tp");
+ Producer<String> p =
pulsarClient.newProducer(Schema.STRING).topic(tp).create();
+ ServerCnx serverCnx = (ServerCnx)
pulsar.getBrokerService().getTopic(tp, false).join().get()
+ .getProducers().values().iterator().next().getCnx();
+ // Call "CheckConnectionLiveness" after serverCnx is closed. The
resulting future should complete eventually.
+ p.close();
+ serverCnx.close();
+ Thread.sleep(1000);
+ serverCnx.checkConnectionLiveness().join();
+ }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index c2715de986a..06c6069ebae 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -67,11 +67,11 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.resources.BaseResources;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
-import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -1008,28 +1008,36 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
int numMessages = 100;
final CountDownLatch latch = new CountDownLatch(numMessages);
- String topic = "persistent://my-property/my-ns/closed-cnx-topic";
+ String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/closed-cnx-topic");
+ admin.topics().createNonPartitionedTopic(topic);
String sub = "my-subscriber-name";
@Cleanup
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
-
pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1,
msg) -> {
- Assert.assertNotNull(msg, "Message cannot be null");
- String receivedMessage = new String(msg.getData());
- log.debug("Received message [{}] in the listener",
receivedMessage);
- c1.acknowledgeAsync(msg);
- latch.countDown();
- }).subscribe();
-
+ ConsumerImpl c =
+ (ConsumerImpl)
pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1,
msg) -> {
+ Assert.assertNotNull(msg, "Message cannot be null");
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message [{}] in the listener",
receivedMessage);
+ c1.acknowledgeAsync(msg);
+ latch.countDown();
+ }).subscribe();
PersistentTopic topicRef = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topic).get();
-
AbstractDispatcherSingleActiveConsumer dispatcher =
(AbstractDispatcherSingleActiveConsumer) topicRef
.getSubscription(sub).getDispatcher();
- ServerCnx cnx = (ServerCnx) dispatcher.getActiveConsumer().cnx();
- Field field = ServerCnx.class.getDeclaredField("isActive");
- field.setAccessible(true);
- field.set(cnx, false);
-
assertNotNull(dispatcher.getActiveConsumer());
+
+ // Inject a blocker so that the "ping & pong" does not work.
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ ConnectionHandler connectionHandler = c.getConnectionHandler();
+ ClientCnx clientCnx = connectionHandler.cnx();
+ clientCnx.ctx().executor().submit(() -> {
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
@Cleanup
PulsarClient pulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);
Consumer<byte[]> consumer = null;
@@ -1042,15 +1050,19 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
c1.acknowledgeAsync(msg);
latch.countDown();
}).subscribe();
- if (i == 0) {
- fail("Should failed with ConsumerBusyException!");
- }
} catch (PulsarClientException.ConsumerBusyException ignore) {
// It's ok.
}
}
assertNotNull(consumer);
log.info("-- Exiting {} test --", methodName);
+
+ // cleanup.
+ countDownLatch.countDown();
+ consumer.close();
+ pulsarClient.close();
+ pulsarClient2.close();
+ admin.topics().delete(topic, false);
}
@Test