This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new fbf87dfaf91 [fix] [broker] Make specified producer could override the
previous one (#21155)
fbf87dfaf91 is described below
commit fbf87dfaf91951b394bb93715a3fc3ac105c7905
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 14 00:17:10 2023 +0800
[fix] [broker] Make specified producer could override the previous one
(#21155)
Issue 1: The client assumed the connection was inactive, but the broker assumed
the connection was fine. The client tried to reconnect a producer over a new
connection and got the error `Producer with name 'st-0-5' is already connected
to topic`.
Issue 2:
- Within a connection, a second registration request for the same producer
waits for the first registration to complete. But there is a bug that causes
this mechanism to fail.
- If a producer uses a default name, the second registration will override
the first one. But it cannot override the first one if it uses a specified
producer name. I think this mechanism is meant to prevent a client from creating
two producers with the same name. However, the method `Producer.isSuccessorTo`
already checks the `producer-id`, and the `producer-id`s of multiple producers
created by the same client are different. So this mechanism can be deleted.
- For `issue 1`: If a producer with the same name tries to use a new
connection, asynchronously check whether the old connection is still available.
Producers that belong to a connection that is no longer available are
automatically cleaned up.
- For `issue 2`:
- Fix the bug that causes a completed producer future to be removed
from `ServerCnx`.
- Remove the mechanism that prevents a producer with a specified name
from overriding the previous producer.
(cherry picked from commit bda16b6f5b715942f7ed996052f6cbd8026fbbf0)
---
.../pulsar/broker/service/AbstractTopic.java | 8 +-
.../apache/pulsar/broker/service/ServerCnx.java | 84 +++++++---
.../apache/pulsar/broker/service/TransportCnx.java | 9 ++
.../broker/auth/MockedPulsarServiceBaseTest.java | 24 +++
.../pulsar/broker/service/ServerCnxTest.java | 177 +++++++++++++++++++--
.../client/impl/ProducerConsumerInternalTest.java | 147 +++++++++++++++++
.../pulsar/common/protocol/PulsarHandler.java | 23 +--
7 files changed, 433 insertions(+), 39 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 76e1df0d93d..e5950b35817 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -934,8 +934,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
private void tryOverwriteOldProducer(Producer oldProducer, Producer
newProducer)
throws BrokerServiceException {
- if (newProducer.isSuccessorTo(oldProducer) &&
!isUserProvidedProducerName(oldProducer)
- && !isUserProvidedProducerName(newProducer)) {
+ if (newProducer.isSuccessorTo(oldProducer)) {
oldProducer.close(false);
if (!producers.replace(newProducer.getProducerName(), oldProducer,
newProducer)) {
// Met concurrent update, throw exception here so that client
can try reconnect later.
@@ -945,6 +944,11 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
handleProducerRemoved(oldProducer);
}
} else {
+ // If a producer with the same name tries to use a new connection,
async check the old connection is
+ // available. The producers related the connection that not
available are automatically cleaned up.
+ if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) {
+ oldProducer.getCnx().checkConnectionLiveness();
+ }
throw new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() +
"' is already connected to topic");
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index ef7cfdf14d8..fe43a0a147b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -154,6 +154,7 @@ import
org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.pulsar.common.util.netty.NettyFutureUtil;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
@@ -216,6 +217,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
private final long maxPendingBytesPerThread;
private final long resumeThresholdPendingBytesPerThread;
+ private final long connectionLivenessCheckTimeoutMillis = 5000;
+
// Number of bytes pending to be published from a single specific IO
thread.
private static final FastThreadLocal<MutableLong> pendingBytesPerThread =
new FastThreadLocal<MutableLong>() {
@Override
@@ -343,6 +346,11 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
});
this.service.getPulsarStats().recordConnectionClose();
+
+ // complete possible pending connection check future
+ if (connectionCheckInProgress != null &&
!connectionCheckInProgress.isDone()) {
+ connectionCheckInProgress.complete(false);
+ }
}
@Override
@@ -1222,34 +1230,35 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
CompletableFuture<Producer> existingProducerFuture =
producers.putIfAbsent(producerId, producerFuture);
if (existingProducerFuture != null) {
- if (existingProducerFuture.isDone() &&
!existingProducerFuture.isCompletedExceptionally()) {
- Producer producer = existingProducerFuture.getNow(null);
- log.info("[{}] Producer with the same id is already
created:"
- + " producerId={}, producer={}", remoteAddress,
producerId, producer);
- commandSender.sendProducerSuccessResponse(requestId,
producer.getProducerName(),
- producer.getSchemaVersion());
- return null;
- } else {
+ if (!existingProducerFuture.isDone()) {
// There was an early request to create a producer with
same producerId.
// This can happen when client timeout is lower than the
broker timeouts.
// We need to wait until the previous producer creation
request
// either complete or fails.
- ServerError error = null;
- if (!existingProducerFuture.isDone()) {
- error = ServerError.ServiceNotReady;
- } else {
- error = getErrorCode(existingProducerFuture);
- // remove producer with producerId as it's already
completed with exception
- producers.remove(producerId, existingProducerFuture);
- }
log.warn("[{}][{}] Producer with id is already present on
the connection, producerId={}",
remoteAddress, topicName, producerId);
- commandSender.sendErrorResponse(requestId, error,
"Producer is already present on the connection");
- return null;
+ commandSender.sendErrorResponse(requestId,
ServerError.ServiceNotReady,
+ "Producer is already present on the connection");
+ } else if (existingProducerFuture.isCompletedExceptionally()) {
+ // remove producer with producerId as it's already
completed with exception
+ log.warn("[{}][{}] Producer with id is failed to register
present on the connection, producerId={}",
+ remoteAddress, topicName, producerId);
+ ServerError error = getErrorCode(existingProducerFuture);
+ producers.remove(producerId, existingProducerFuture);
+ commandSender.sendErrorResponse(requestId, error,
+ "Producer is already failed to register present on
the connection");
+ } else {
+ Producer producer = existingProducerFuture.getNow(null);
+ log.info("[{}] [{}] Producer with the same id is already
created:"
+ + " producerId={}, producer={}", remoteAddress,
topicName, producerId, producer);
+ commandSender.sendProducerSuccessResponse(requestId,
producer.getProducerName(),
+ producer.getSchemaVersion());
}
+ return null;
}
- log.info("[{}][{}] Creating producer. producerId={}",
remoteAddress, topicName, producerId);
+ log.info("[{}][{}] Creating producer. producerId={},
producerName={}, schema is {}", remoteAddress,
+ topicName, producerId, producerName, schema == null ?
"absent" : "present");
service.getOrCreateTopic(topicName.toString()).thenCompose((Topic
topic) -> {
// Before creating producer, check if backlog quota exceeded
@@ -3072,6 +3081,43 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
}
+ CompletableFuture<Boolean> connectionCheckInProgress;
+
+ @Override
+ public CompletableFuture<Boolean> checkConnectionLiveness() {
+ if (connectionLivenessCheckTimeoutMillis > 0) {
+ return
NettyFutureUtil.toCompletableFuture(ctx.executor().submit(() -> {
+ if (connectionCheckInProgress != null) {
+ return connectionCheckInProgress;
+ } else {
+ final CompletableFuture<Boolean>
finalConnectionCheckInProgress = new CompletableFuture<>();
+ connectionCheckInProgress = finalConnectionCheckInProgress;
+ ctx.executor().schedule(() -> {
+ if (finalConnectionCheckInProgress ==
connectionCheckInProgress
+ && !finalConnectionCheckInProgress.isDone()) {
+ log.warn("[{}] Connection check timed out. Closing
connection.", remoteAddress);
+ ctx.close();
+ }
+ }, connectionLivenessCheckTimeoutMillis,
TimeUnit.MILLISECONDS);
+ sendPing();
+ return finalConnectionCheckInProgress;
+ }
+ })).thenCompose(java.util.function.Function.identity());
+ } else {
+ // check is disabled
+ return CompletableFuture.completedFuture((Boolean) null);
+ }
+ }
+
+ @Override
+ protected void messageReceived() {
+ super.messageReceived();
+ if (connectionCheckInProgress != null &&
!connectionCheckInProgress.isDone()) {
+ connectionCheckInProgress.complete(true);
+ connectionCheckInProgress = null;
+ }
+ }
+
private static void logAuthException(SocketAddress remoteAddress, String
operation,
String principal, Optional<TopicName>
topic, Throwable ex) {
String topicString = topic.map(t -> ", topic=" +
t.toString()).orElse("");
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
index cc0c559eaf2..f28016c2de9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.util.concurrent.Promise;
import java.net.SocketAddress;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
public interface TransportCnx {
@@ -78,4 +79,12 @@ public interface TransportCnx {
String clientSourceAddress();
+ /***
+ * Check if the connection is still alive
+ * by actively sending a Ping message to the client.
+ *
+ * @return a completable future where the result is true if the connection
is alive, false otherwise. The result
+ * is null if the connection liveness check is disabled.
+ */
+ CompletableFuture<Boolean> checkConnectionLiveness();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c1f75ba141d..d88a411be34 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
@@ -49,17 +50,21 @@ import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.BookKeeperClientFactory;
+import lombok.AllArgsConstructor;
+import lombok.Data;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -71,6 +76,7 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.DataProvider;
@@ -563,5 +569,23 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
});
}
+ protected ServiceProducer getServiceProducer(ProducerImpl clientProducer,
String topicName) {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ org.apache.pulsar.broker.service.Producer serviceProducer =
+
persistentTopic.getProducers().get(clientProducer.getProducerName());
+ long clientProducerId = WhiteboxImpl.getInternalState(clientProducer,
"producerId");
+ assertEquals(serviceProducer.getProducerId(), clientProducerId);
+ assertEquals(serviceProducer.getEpoch(),
clientProducer.getConnectionHandler().getEpoch());
+ return new ServiceProducer(serviceProducer, persistentTopic);
+ }
+
+ @Data
+ @AllArgsConstructor
+ public static class ServiceProducer {
+ private org.apache.pulsar.broker.service.Producer serviceProducer;
+ private PersistentTopic persistentTopic;
+ }
+
private static final Logger log =
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index e2af5944ef0..5457a6088f2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -31,7 +31,6 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.matches;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -44,10 +43,13 @@ import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
+import io.netty.channel.DefaultChannelId;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.vertx.core.impl.ConcurrentHashSet;
+import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
@@ -63,6 +65,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.bookkeeper.common.util.OrderedExecutor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -74,6 +77,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
@@ -96,6 +100,7 @@ import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.AuthMethod;
@@ -109,6 +114,7 @@ import
org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
+import org.apache.pulsar.common.api.proto.CommandPing;
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
import org.apache.pulsar.common.api.proto.CommandSendError;
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
@@ -127,6 +133,7 @@ import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.protocol.PulsarHandler;
+import org.apache.pulsar.common.protocol.schema.EmptyVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -142,6 +149,7 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+@Slf4j
@SuppressWarnings("unchecked")
@Test(groups = "broker")
public class ServerCnxTest {
@@ -171,25 +179,24 @@ public class ServerCnxTest {
private ManagedCursor cursorMock;
private OrderedExecutor executor;
private EventLoopGroup eventLoopGroup;
+ private ConcurrentHashSet<EmbeddedChannel>
channelsStoppedAnswerHealthCheck = new ConcurrentHashSet<>();
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
+ channelsStoppedAnswerHealthCheck.clear();
eventLoopGroup = new NioEventLoopGroup();
executor = OrderedExecutor.newBuilder().numThreads(1).build();
- svcConfig = spy(ServiceConfiguration.class);
+ svcConfig = new ServiceConfiguration();
svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
- svcConfig.setClusterName("pulsar-cluster");
- pulsar = spyWithClassAndConstructorArgs(PulsarService.class,
svcConfig);
- doReturn(new
DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
-
svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS));
svcConfig.setBacklogQuotaCheckEnabled(false);
+ svcConfig.setClusterName("use");
+ pulsar = spyWithClassAndConstructorArgs(PulsarService.class,
svcConfig);
+ doReturn(new
DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
doReturn(svcConfig).when(pulsar).getConfiguration();
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
- doReturn("use").when(svcConfig).getClusterName();
-
mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
@@ -876,6 +883,136 @@ public class ServerCnxTest {
}));
}
+ @Test
+ public void testHandleProducerAfterClientChannelInactive() throws
Exception {
+ final String tName = successTopicName;
+ final long producerId = 1;
+ final MutableInt requestId = new MutableInt(1);
+ final MutableInt epoch = new MutableInt(1);
+ final Map<String, String> metadata = Collections.emptyMap();
+ final String pName = "p1";
+ resetChannel();
+ setChannelConnected();
+
+ // The producer register using the first connection.
+ ByteBuf cmdProducer1 = Commands.newProducer(tName, producerId,
requestId.incrementAndGet(),
+ pName, false, metadata, null, epoch.incrementAndGet(), false,
+ ProducerAccessMode.Shared, Optional.empty(), false);
+ channel.writeInbound(cmdProducer1);
+ assertTrue(getResponse() instanceof CommandProducerSuccess);
+ PersistentTopic topicRef = (PersistentTopic)
brokerService.getTopicReference(tName).get();
+ assertNotNull(topicRef);
+ assertEquals(topicRef.getProducers().size(), 1);
+
+ // Verify the second producer using a new connection will override the
producer who using a stopped channel.
+ channelsStoppedAnswerHealthCheck.add(channel);
+ ClientChannel channel2 = new ClientChannel();
+ setChannelConnected(channel2.serverCnx);
+ Awaitility.await().untilAsserted(() -> {
+ channel.runPendingTasks();
+ ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId,
requestId.incrementAndGet(),
+ pName, false, metadata, null, epoch.incrementAndGet(),
false,
+ ProducerAccessMode.Shared, Optional.empty(), false);
+ channel2.channel.writeInbound(cmdProducer2);
+ assertTrue(getResponse(channel2.channel,
channel2.clientChannelHelper) instanceof CommandProducerSuccess);
+ assertEquals(topicRef.getProducers().size(), 1);
+ });
+
+ // cleanup.
+ channel.finish();
+ channel2.close();
+ }
+
+ private class ClientChannel implements Closeable {
+ private ClientChannelHelper clientChannelHelper = new
ClientChannelHelper();
+ private ServerCnx serverCnx = new ServerCnx(pulsar);
+ private EmbeddedChannel channel = new
EmbeddedChannel(DefaultChannelId.newInstance(),
+ new LengthFieldBasedFrameDecoder(
+ 5 * 1024 * 1024,
+ 0,
+ 4,
+ 0,
+ 4),
+ serverCnx);
+ public ClientChannel() {
+ serverCnx.setAuthRole("");
+ }
+ public void close(){
+ if (channel != null && channel.isActive()) {
+ serverCnx.close();
+ channel.close();
+ }
+ }
+ }
+
+ @Test
+ public void testHandleProducer() throws Exception {
+ final String tName = "persistent://public/default/test-topic";
+ final long producerId = 1;
+ final MutableInt requestId = new MutableInt(1);
+ final MutableInt epoch = new MutableInt(1);
+ final Map<String, String> metadata = Collections.emptyMap();
+ final String pName = "p1";
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ // connect.
+ ByteBuf cConnect = Commands.newConnect("none", "", null);
+ channel.writeInbound(cConnect);
+ assertEquals(serverCnx.getState(), State.Connected);
+ assertTrue(getResponse() instanceof CommandConnected);
+
+ // There is an in-progress producer registration.
+ ByteBuf cProducer1 = Commands.newProducer(tName, producerId,
requestId.incrementAndGet(),
+ pName, false, metadata, null, epoch.incrementAndGet(), false,
+ ProducerAccessMode.Shared, Optional.empty(), false);
+ CompletableFuture existingFuture1 = new CompletableFuture();
+ serverCnx.getProducers().put(producerId, existingFuture1);
+ channel.writeInbound(cProducer1);
+ Object response1 = getResponse();
+ assertTrue(response1 instanceof CommandError);
+ CommandError error1 = (CommandError) response1;
+ assertEquals(error1.getError().toString(),
ServerError.ServiceNotReady.toString());
+ assertTrue(error1.getMessage().contains("already present on the
connection"));
+
+ // There is a failed registration.
+ ByteBuf cProducer2 = Commands.newProducer(tName, producerId,
requestId.incrementAndGet(),
+ pName, false, metadata, null, epoch.incrementAndGet(), false,
+ ProducerAccessMode.Shared, Optional.empty(), false);
+ CompletableFuture existingFuture2 = new CompletableFuture();
+ existingFuture2.completeExceptionally(new
BrokerServiceException.ProducerBusyException("123"));
+ serverCnx.getProducers().put(producerId, existingFuture2);
+
+ channel.writeInbound(cProducer2);
+ Object response2 = getResponse();
+ assertTrue(response2 instanceof CommandError);
+ CommandError error2 = (CommandError) response2;
+ assertEquals(error2.getError().toString(),
ServerError.ProducerBusy.toString());
+ assertTrue(error2.getMessage().contains("already failed to register
present on the connection"));
+
+ // There is an successful registration.
+ ByteBuf cProducer3 = Commands.newProducer(tName, producerId,
requestId.incrementAndGet(),
+ pName, false, metadata, null, epoch.incrementAndGet(), false,
+ ProducerAccessMode.Shared, Optional.empty(), false);
+ CompletableFuture existingFuture3 = new CompletableFuture();
+ org.apache.pulsar.broker.service.Producer serviceProducer =
+ mock(org.apache.pulsar.broker.service.Producer.class);
+ when(serviceProducer.getProducerName()).thenReturn(pName);
+ when(serviceProducer.getSchemaVersion()).thenReturn(new
EmptyVersion());
+ existingFuture3.complete(serviceProducer);
+ serverCnx.getProducers().put(producerId, existingFuture3);
+
+ channel.writeInbound(cProducer3);
+ Object response3 = getResponse();
+ assertTrue(response3 instanceof CommandProducerSuccess);
+ CommandProducerSuccess cProducerSuccess = (CommandProducerSuccess)
response3;
+ assertEquals(cProducerSuccess.getProducerName(), pName);
+
+ // cleanup.
+ channel.finish();
+ }
+
// This test used to be in the ServerCnxAuthorizationTest class, but it
was migrated here because the mocking
// in that class was too extensive. There is some overlap with this test
and other tests in this class. The primary
// role of this test is verifying that the correct role and
AuthenticationDataSource are passed to the
@@ -2341,6 +2478,10 @@ public class ServerCnxTest {
}
protected void setChannelConnected() throws Exception {
+ setChannelConnected(serverCnx);
+ }
+
+ protected void setChannelConnected(ServerCnx serverCnx) throws Exception {
Field channelState = ServerCnx.class.getDeclaredField("state");
channelState.setAccessible(true);
channelState.set(serverCnx, State.Connected);
@@ -2354,13 +2495,31 @@ public class ServerCnxTest {
}
protected Object getResponse() throws Exception {
+ return getResponse(channel, clientChannelHelper);
+ }
+
+ protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper
clientChannelHelper) throws Exception {
// Wait at most for 10s to get a response
final long sleepTimeMs = 10;
final long iterations = TimeUnit.SECONDS.toMillis(10) / sleepTimeMs;
for (int i = 0; i < iterations; i++) {
if (!channel.outboundMessages().isEmpty()) {
Object outObject = channel.outboundMessages().remove();
- return clientChannelHelper.getCommand(outObject);
+ Object cmd = clientChannelHelper.getCommand(outObject);
+ if (cmd instanceof CommandPing) {
+ if (channelsStoppedAnswerHealthCheck.contains(channel)) {
+ continue;
+ }
+
channel.writeAndFlush(Commands.newPong()).addListener(future -> {
+ if (!future.isSuccess()) {
+ log.warn("[{}] Forcing connection to close since
cannot send a pong message.",
+ channel, future.cause());
+ channel.close();
+ }
+ });
+ continue;
+ }
+ return cmd;
} else {
Thread.sleep(sleepTimeMs);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
new file mode 100644
index 00000000000..56b4609d66a
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import java.util.concurrent.CountDownLatch;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Different with {@link
org.apache.pulsar.client.api.SimpleProducerConsumerTest}, this class can visit
the variables
+ * of {@link ConsumerImpl} which are modified `protected`.
+ */
+@Test(groups = "broker-api")
+public class ProducerConsumerInternalTest extends ProducerConsumerBase {
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testSameProducerRegisterTwice() throws Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ // Create producer using default producerName.
+ ProducerImpl producer = (ProducerImpl)
pulsarClient.newProducer().topic(topicName).create();
+ ServiceProducer serviceProducer = getServiceProducer(producer,
topicName);
+
+ // Remove producer maintained by server cnx. To make it can register
the second time.
+ removeServiceProducerMaintainedByServerCnx(serviceProducer);
+
+ // Trigger the client producer reconnect.
+ CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
+ commandCloseProducer.setProducerId(producer.producerId);
+ producer.getClientCnx().handleCloseProducer(commandCloseProducer);
+
+ // Verify the reconnection will be success.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(producer.getState().toString(), "Ready");
+ });
+ }
+
+ @Test
+ public void testSameProducerRegisterTwiceWithSpecifiedProducerName()
throws Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+ final String pName = "p1";
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ // Create producer using default producerName.
+ ProducerImpl producer = (ProducerImpl)
pulsarClient.newProducer().producerName(pName).topic(topicName).create();
+ ServiceProducer serviceProducer = getServiceProducer(producer,
topicName);
+
+ // Remove producer maintained by server cnx. To make it can register
the second time.
+ removeServiceProducerMaintainedByServerCnx(serviceProducer);
+
+ // Trigger the client producer reconnect.
+ CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
+ commandCloseProducer.setProducerId(producer.producerId);
+ producer.getClientCnx().handleCloseProducer(commandCloseProducer);
+
+ // Verify the reconnection will be success.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(producer.getState().toString(), "Ready", "The
producer registration failed");
+ });
+ }
+
+ private void removeServiceProducerMaintainedByServerCnx(ServiceProducer
serviceProducer) {
+ ServerCnx serverCnx = (ServerCnx)
serviceProducer.getServiceProducer().getCnx();
+ serverCnx.removedProducer(serviceProducer.getServiceProducer());
+ Awaitility.await().untilAsserted(() -> {
+
assertFalse(serverCnx.getProducers().containsKey(serviceProducer.getServiceProducer().getProducerId()));
+ });
+ }
+
+ @Test
+ public void
testExclusiveConsumerWillAlwaysRetryEvenIfReceivedConsumerBusyError() throws
Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+ final String subscriptionName = "subscription1";
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ final ConsumerImpl consumer = (ConsumerImpl)
pulsarClient.newConsumer().topic(topicName.toString())
+
.subscriptionType(SubscriptionType.Exclusive).subscriptionName(subscriptionName).subscribe();
+
+ ClientCnx clientCnx = consumer.getClientCnx();
+ ServerCnx serverCnx = (ServerCnx) pulsar.getBrokerService()
+
.getTopic(topicName,false).join().get().getSubscription(subscriptionName)
+ .getDispatcher().getConsumers().get(0).cnx();
+
+ // Make a disconnect to trigger broker remove the consumer which
related this connection.
+ // Make the second subscribe runs after the broker removing the old
consumer, then it will receive
+ // an error: "Exclusive consumer is already connected"
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ serverCnx.execute(() -> {
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ clientCnx.close();
+ Thread.sleep(1000);
+ countDownLatch.countDown();
+
+ // Verify the consumer will always retry subscribe event received
ConsumerBusy error.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(consumer.getState(), HandlerState.State.Ready);
+ });
+
+ // cleanup.
+ consumer.close();
+ admin.topics().delete(topicName, false);
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index 3cd91809f52..8cab3742bac 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.common.protocol;
import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.SocketAddress;
@@ -53,7 +54,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
}
@Override
- protected final void messageReceived() {
+ protected void messageReceived() {
waitingForPingResponse = false;
}
@@ -117,14 +118,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
log.debug("[{}] Sending ping message", ctx.channel());
}
waitingForPingResponse = true;
- ctx.writeAndFlush(Commands.newPing())
- .addListener(future -> {
- if (!future.isSuccess()) {
- log.warn("[{}] Forcing connection to close since
cannot send a ping message.",
- ctx.channel(), future.cause());
- ctx.close();
- }
- });
+ sendPing();
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Peer doesn't support keep-alive",
ctx.channel());
@@ -132,6 +126,17 @@ public abstract class PulsarHandler extends PulsarDecoder {
}
}
+ protected ChannelFuture sendPing() {
+ return ctx.writeAndFlush(Commands.newPing())
+ .addListener(future -> {
+ if (!future.isSuccess()) {
+ log.warn("[{}] Forcing connection to close since
cannot send a ping message.",
+ ctx.channel(), future.cause());
+ ctx.close();
+ }
+ });
+ }
+
public void cancelKeepAliveTask() {
if (keepAliveTask != null) {
keepAliveTask.cancel(false);