This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bda16b6f5b7 [fix] [broker] Make specified producer could override the
previous one (#21155)
bda16b6f5b7 is described below
commit bda16b6f5b715942f7ed996052f6cbd8026fbbf0
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 14 00:17:10 2023 +0800
[fix] [broker] Make specified producer could override the previous one
(#21155)
#### Issue 1
The client considered the connection inactive, while the broker still
considered it healthy. When the client tried to reconnect the producer over a
new connection, it got the error `Producer with name 'st-0-5' is already connected
to topic`.
#### Issue 2
- Within a single connection, a second producer registration is supposed to
wait for the first registration to complete. But there is a bug that causes
this mechanism to fail.
- If a producer uses a default name, the second registration will override
the first one. But it cannot override the first one if it uses a specified
producer name. I think this mechanism is meant to prevent a client from creating
two producers with the same name. However, the method `Producer.isSuccessorTo`
already checks the `producer-id`, and the `producer-id`s of multiple producers
created by the same client are different. So this mechanism can be deleted.
### Modifications
- For `issue 1`: If a producer with the same name tries to use a new
connection, asynchronously check whether the old connection is still available.
Producers bound to a connection that is no longer available are cleaned up
automatically.
- For `issue 2`:
- Fix the bug that caused a completed producer future to be removed
from `ServerCnx`.
- Remove the mechanism that prevents a producer with a specified name
from overriding the previous producer.
---
.../pulsar/broker/service/AbstractTopic.java | 8 +-
.../apache/pulsar/broker/service/ServerCnx.java | 40 ++---
.../broker/auth/MockedPulsarServiceBaseTest.java | 24 +++
.../pulsar/broker/service/ServerCnxTest.java | 162 ++++++++++++++++++++-
.../client/impl/ProducerConsumerInternalTest.java | 57 ++++++++
5 files changed, 268 insertions(+), 23 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index cef2dd2080c..31e37d0f176 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -982,8 +982,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
private void tryOverwriteOldProducer(Producer oldProducer, Producer
newProducer)
throws BrokerServiceException {
- if (newProducer.isSuccessorTo(oldProducer) &&
!isUserProvidedProducerName(oldProducer)
- && !isUserProvidedProducerName(newProducer)) {
+ if (newProducer.isSuccessorTo(oldProducer)) {
oldProducer.close(false);
if (!producers.replace(newProducer.getProducerName(), oldProducer,
newProducer)) {
// Met concurrent update, throw exception here so that client
can try reconnect later.
@@ -993,6 +992,11 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
handleProducerRemoved(oldProducer);
}
} else {
+ // If a producer with the same name tries to use a new connection,
async check the old connection is
+ // available. The producers related the connection that not
available are automatically cleaned up.
+ if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) {
+ oldProducer.getCnx().checkConnectionLiveness();
+ }
throw new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() +
"' is already connected to topic");
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index cae16f3c761..5809e1297fc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1398,36 +1398,36 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
CompletableFuture<Producer> existingProducerFuture =
producers.putIfAbsent(producerId, producerFuture);
if (existingProducerFuture != null) {
- if (existingProducerFuture.isDone() &&
!existingProducerFuture.isCompletedExceptionally()) {
- Producer producer = existingProducerFuture.getNow(null);
- log.info("[{}] Producer with the same id is already
created:"
- + " producerId={}, producer={}", remoteAddress,
producerId, producer);
- commandSender.sendProducerSuccessResponse(requestId,
producer.getProducerName(),
- producer.getSchemaVersion());
- return null;
- } else {
+ if (!existingProducerFuture.isDone()) {
// There was an early request to create a producer with
same producerId.
// This can happen when client timeout is lower than the
broker timeouts.
// We need to wait until the previous producer creation
request
// either complete or fails.
- ServerError error = null;
- if (!existingProducerFuture.isDone()) {
- error = ServerError.ServiceNotReady;
- } else {
- error = getErrorCode(existingProducerFuture);
- // remove producer with producerId as it's already
completed with exception
- producers.remove(producerId, existingProducerFuture);
- }
log.warn("[{}][{}] Producer with id is already present on
the connection, producerId={}",
remoteAddress, topicName, producerId);
- commandSender.sendErrorResponse(requestId, error,
"Producer is already present on the connection");
- return null;
+ commandSender.sendErrorResponse(requestId,
ServerError.ServiceNotReady,
+ "Producer is already present on the connection");
+ } else if (existingProducerFuture.isCompletedExceptionally()) {
+ // remove producer with producerId as it's already
completed with exception
+ log.warn("[{}][{}] Producer with id is failed to register
present on the connection, producerId={}",
+ remoteAddress, topicName, producerId);
+ ServerError error = getErrorCode(existingProducerFuture);
+ producers.remove(producerId, existingProducerFuture);
+ commandSender.sendErrorResponse(requestId, error,
+ "Producer is already failed to register present on
the connection");
+ } else {
+ Producer producer = existingProducerFuture.getNow(null);
+ log.info("[{}] [{}] Producer with the same id is already
created:"
+ + " producerId={}, producer={}", remoteAddress,
topicName, producerId, producer);
+ commandSender.sendProducerSuccessResponse(requestId,
producer.getProducerName(),
+ producer.getSchemaVersion());
}
+ return null;
}
if (log.isDebugEnabled()) {
- log.debug("[{}][{}] Creating producer. producerId={}, schema
is {}", remoteAddress, topicName,
- producerId, schema == null ? "absent" : "present");
+ log.debug("[{}][{}] Creating producer. producerId={},
producerName={}, schema is {}", remoteAddress,
+ topicName, producerId, producerName, schema == null ?
"absent" : "present");
}
service.getOrCreateTopic(topicName.toString()).thenCompose((Topic
topic) -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c32d3fc3b0b..c34d98c7b9a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.auth;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithoutRecordingInvocations;
+import static org.testng.Assert.assertEquals;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
@@ -37,10 +38,13 @@ import java.util.function.Consumer;
import java.util.function.Predicate;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.TimeoutHandler;
+import lombok.AllArgsConstructor;
+import lombok.Data;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
@@ -48,6 +52,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -56,6 +61,7 @@ import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.utils.ResourceUtils;
import org.apache.zookeeper.MockZooKeeper;
import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.mockito.internal.util.MockUtil;
import org.slf4j.Logger;
@@ -644,5 +650,23 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
};
}
+ protected ServiceProducer getServiceProducer(ProducerImpl clientProducer,
String topicName) {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ org.apache.pulsar.broker.service.Producer serviceProducer =
+
persistentTopic.getProducers().get(clientProducer.getProducerName());
+ long clientProducerId = WhiteboxImpl.getInternalState(clientProducer,
"producerId");
+ assertEquals(serviceProducer.getProducerId(), clientProducerId);
+ assertEquals(serviceProducer.getEpoch(),
clientProducer.getConnectionHandler().getEpoch());
+ return new ServiceProducer(serviceProducer, persistentTopic);
+ }
+
+ @Data
+ @AllArgsConstructor
+ public static class ServiceProducer {
+ private org.apache.pulsar.broker.service.Producer serviceProducer;
+ private PersistentTopic persistentTopic;
+ }
+
private static final Logger log =
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index c3bab634a42..2ea5e28880b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -47,6 +47,8 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.vertx.core.impl.ConcurrentHashSet;
+import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
@@ -64,6 +66,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -74,6 +77,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider;
@@ -93,6 +97,7 @@ import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotRea
import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.AuthMethod;
@@ -113,6 +118,7 @@ import
org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
+import org.apache.pulsar.common.api.proto.CommandPing;
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
import org.apache.pulsar.common.api.proto.CommandSendError;
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
@@ -135,6 +141,7 @@ import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.protocol.PulsarHandler;
+import org.apache.pulsar.common.protocol.schema.EmptyVersion;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -149,6 +156,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+@Slf4j
@SuppressWarnings("unchecked")
@Test(groups = "broker")
public class ServerCnxTest {
@@ -184,10 +192,12 @@ public class ServerCnxTest {
private ManagedLedger ledgerMock;
private ManagedCursor cursorMock;
+ private ConcurrentHashSet<EmbeddedChannel>
channelsStoppedAnswerHealthCheck = new ConcurrentHashSet<>();
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
+ channelsStoppedAnswerHealthCheck.clear();
svcConfig = new ServiceConfiguration();
svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -927,6 +937,134 @@ public class ServerCnxTest {
}));
}
+ @Test
+ public void testHandleProducerAfterClientChannelInactive() throws
Exception {
+ final String tName = successTopicName;
+ final long producerId = 1;
+ final MutableInt requestId = new MutableInt(1);
+ final MutableInt epoch = new MutableInt(1);
+ final Map<String, String> metadata = Collections.emptyMap();
+ final String pName = "p1";
+ resetChannel();
+ setChannelConnected();
+
+ // The producer register using the first connection.
+ ByteBuf cmdProducer1 = Commands.newProducer(tName, producerId,
requestId.incrementAndGet(),
+ pName, false, metadata, null, epoch.incrementAndGet(), false,
+ ProducerAccessMode.Shared, Optional.empty(), false);
+ channel.writeInbound(cmdProducer1);
+ assertTrue(getResponse() instanceof CommandProducerSuccess);
+ PersistentTopic topicRef = (PersistentTopic)
brokerService.getTopicReference(tName).get();
+ assertNotNull(topicRef);
+ assertEquals(topicRef.getProducers().size(), 1);
+
+ // Verify the second producer using a new connection will override the
producer who using a stopped channel.
+ channelsStoppedAnswerHealthCheck.add(channel);
+ ClientChannel channel2 = new ClientChannel();
+ setChannelConnected(channel2.serverCnx);
+ Awaitility.await().untilAsserted(() -> {
+ ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId,
requestId.incrementAndGet(),
+ pName, false, metadata, null, epoch.incrementAndGet(),
false,
+ ProducerAccessMode.Shared, Optional.empty(), false);
+ channel2.channel.writeInbound(cmdProducer2);
+ assertTrue(getResponse(channel2.channel,
channel2.clientChannelHelper) instanceof CommandProducerSuccess);
+ assertEquals(topicRef.getProducers().size(), 1);
+ });
+
+ // cleanup.
+ channel.finish();
+ channel2.close();
+ }
+
+ private class ClientChannel implements Closeable {
+ private ClientChannelHelper clientChannelHelper = new
ClientChannelHelper();
+ private ServerCnx serverCnx = new ServerCnx(pulsar);
+ private EmbeddedChannel channel = new EmbeddedChannel(new
LengthFieldBasedFrameDecoder(
+ 5 * 1024 * 1024,
+ 0,
+ 4,
+ 0,
+ 4),
+ serverCnx);
+ public ClientChannel() {
+ serverCnx.setAuthRole("");
+ }
+ public void close(){
+ if (channel != null && channel.isActive()) {
+ serverCnx.close();
+ channel.close();
+ }
+ }
+ }
+
+ @Test
+ public void testHandleProducer() throws Exception {
+ final String tName = "persistent://public/default/test-topic";
+ final long producerId = 1;
+ final MutableInt requestId = new MutableInt(1);
+ final MutableInt epoch = new MutableInt(1);
+ final Map<String, String> metadata = Collections.emptyMap();
+ final String pName = "p1";
+ resetChannel();
+ assertTrue(channel.isActive());
+ assertEquals(serverCnx.getState(), State.Start);
+
+ // connect.
+ ByteBuf cConnect = Commands.newConnect("none", "", null);
+ channel.writeInbound(cConnect);
+ assertEquals(serverCnx.getState(), State.Connected);
+ assertTrue(getResponse() instanceof CommandConnected);
+
+ // There is an in-progress producer registration.
+ ByteBuf cProducer1 = Commands.newProducer(tName, producerId,
requestId.incrementAndGet(),
+ pName, false, metadata, null, epoch.incrementAndGet(), false,
+ ProducerAccessMode.Shared, Optional.empty(), false);
+ CompletableFuture existingFuture1 = new CompletableFuture();
+ serverCnx.getProducers().put(producerId, existingFuture1);
+ channel.writeInbound(cProducer1);
+ Object response1 = getResponse();
+ assertTrue(response1 instanceof CommandError);
+ CommandError error1 = (CommandError) response1;
+ assertEquals(error1.getError().toString(),
ServerError.ServiceNotReady.toString());
+ assertTrue(error1.getMessage().contains("already present on the
connection"));
+
+ // There is a failed registration.
+ ByteBuf cProducer2 = Commands.newProducer(tName, producerId,
requestId.incrementAndGet(),
+ pName, false, metadata, null, epoch.incrementAndGet(), false,
+ ProducerAccessMode.Shared, Optional.empty(), false);
+ CompletableFuture existingFuture2 = new CompletableFuture();
+ existingFuture2.completeExceptionally(new
BrokerServiceException.ProducerBusyException("123"));
+ serverCnx.getProducers().put(producerId, existingFuture2);
+
+ channel.writeInbound(cProducer2);
+ Object response2 = getResponse();
+ assertTrue(response2 instanceof CommandError);
+ CommandError error2 = (CommandError) response2;
+ assertEquals(error2.getError().toString(),
ServerError.ProducerBusy.toString());
+ assertTrue(error2.getMessage().contains("already failed to register
present on the connection"));
+
+ // There is an successful registration.
+ ByteBuf cProducer3 = Commands.newProducer(tName, producerId,
requestId.incrementAndGet(),
+ pName, false, metadata, null, epoch.incrementAndGet(), false,
+ ProducerAccessMode.Shared, Optional.empty(), false);
+ CompletableFuture existingFuture3 = new CompletableFuture();
+ org.apache.pulsar.broker.service.Producer serviceProducer =
+ mock(org.apache.pulsar.broker.service.Producer.class);
+ when(serviceProducer.getProducerName()).thenReturn(pName);
+ when(serviceProducer.getSchemaVersion()).thenReturn(new
EmptyVersion());
+ existingFuture3.complete(serviceProducer);
+ serverCnx.getProducers().put(producerId, existingFuture3);
+
+ channel.writeInbound(cProducer3);
+ Object response3 = getResponse();
+ assertTrue(response3 instanceof CommandProducerSuccess);
+ CommandProducerSuccess cProducerSuccess = (CommandProducerSuccess)
response3;
+ assertEquals(cProducerSuccess.getProducerName(), pName);
+
+ // cleanup.
+ channel.finish();
+ }
+
// This test used to be in the ServerCnxAuthorizationTest class, but it
was migrated here because the mocking
// in that class was too extensive. There is some overlap with this test
and other tests in this class. The primary
// role of this test is verifying that the correct role and
AuthenticationDataSource are passed to the
@@ -2471,6 +2609,10 @@ public class ServerCnxTest {
}
protected void setChannelConnected() throws Exception {
+ setChannelConnected(serverCnx);
+ }
+
+ protected void setChannelConnected(ServerCnx serverCnx) throws Exception {
Field channelState = ServerCnx.class.getDeclaredField("state");
channelState.setAccessible(true);
channelState.set(serverCnx, State.Connected);
@@ -2484,13 +2626,31 @@ public class ServerCnxTest {
}
protected Object getResponse() throws Exception {
+ return getResponse(channel, clientChannelHelper);
+ }
+
+ protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper
clientChannelHelper) throws Exception {
// Wait at most for 10s to get a response
final long sleepTimeMs = 10;
final long iterations = TimeUnit.SECONDS.toMillis(10) / sleepTimeMs;
for (int i = 0; i < iterations; i++) {
if (!channel.outboundMessages().isEmpty()) {
Object outObject = channel.outboundMessages().remove();
- return clientChannelHelper.getCommand(outObject);
+ Object cmd = clientChannelHelper.getCommand(outObject);
+ if (cmd instanceof CommandPing) {
+ if (channelsStoppedAnswerHealthCheck.contains(channel)) {
+ continue;
+ }
+
channel.writeAndFlush(Commands.newPong()).addListener(future -> {
+ if (!future.isSuccess()) {
+ log.warn("[{}] Forcing connection to close since
cannot send a pong message.",
+ channel, future.cause());
+ channel.close();
+ }
+ });
+ continue;
+ }
+ return cmd;
} else {
Thread.sleep(sleepTimeMs);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
index 5f242d44954..f05f7356357 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
@@ -19,11 +19,13 @@
package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import java.util.concurrent.CountDownLatch;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -49,6 +51,61 @@ public class ProducerConsumerInternalTest extends
ProducerConsumerBase {
super.internalCleanup();
}
+ @Test
+ public void testSameProducerRegisterTwice() throws Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ // Create producer using default producerName.
+ ProducerImpl producer = (ProducerImpl)
pulsarClient.newProducer().topic(topicName).create();
+ ServiceProducer serviceProducer = getServiceProducer(producer,
topicName);
+
+ // Remove producer maintained by server cnx. To make it can register
the second time.
+ removeServiceProducerMaintainedByServerCnx(serviceProducer);
+
+ // Trigger the client producer reconnect.
+ CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
+ commandCloseProducer.setProducerId(producer.producerId);
+ producer.getClientCnx().handleCloseProducer(commandCloseProducer);
+
+ // Verify the reconnection will be success.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(producer.getState().toString(), "Ready");
+ });
+ }
+
+ @Test
+ public void testSameProducerRegisterTwiceWithSpecifiedProducerName()
throws Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+ final String pName = "p1";
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ // Create producer using default producerName.
+ ProducerImpl producer = (ProducerImpl)
pulsarClient.newProducer().producerName(pName).topic(topicName).create();
+ ServiceProducer serviceProducer = getServiceProducer(producer,
topicName);
+
+ // Remove producer maintained by server cnx. To make it can register
the second time.
+ removeServiceProducerMaintainedByServerCnx(serviceProducer);
+
+ // Trigger the client producer reconnect.
+ CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
+ commandCloseProducer.setProducerId(producer.producerId);
+ producer.getClientCnx().handleCloseProducer(commandCloseProducer);
+
+ // Verify the reconnection will be success.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(producer.getState().toString(), "Ready", "The
producer registration failed");
+ });
+ }
+
+ private void removeServiceProducerMaintainedByServerCnx(ServiceProducer
serviceProducer) {
+ ServerCnx serverCnx = (ServerCnx)
serviceProducer.getServiceProducer().getCnx();
+ serverCnx.removedProducer(serviceProducer.getServiceProducer());
+ Awaitility.await().untilAsserted(() -> {
+
assertFalse(serverCnx.getProducers().containsKey(serviceProducer.getServiceProducer().getProducerId()));
+ });
+ }
+
@Test
public void
testExclusiveConsumerWillAlwaysRetryEvenIfReceivedConsumerBusyError() throws
Exception {
final String topicName =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");