This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bda16b6f5b7 [fix] [broker] Make specified producer could override the 
previous one (#21155)
bda16b6f5b7 is described below

commit bda16b6f5b715942f7ed996052f6cbd8026fbbf0
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 14 00:17:10 2023 +0800

    [fix] [broker] Make specified producer could override the previous one 
(#21155)
    
    #### Issue 1
    The client assumed the connection was inactive, but the broker assumed the
connection was fine. The client tried to use a new connection to reconnect a
producer, and then got the error `Producer with name 'st-0-5' is already
connected to topic`.
    
    #### Issue 2
    - Within a connection, a second producer registration request waits for the
first one to complete. But there is a bug that causes this mechanism to fail.
    - If a producer uses a default name, the second registration will override
the first one. But it cannot override the first one if it uses a specified
producer name. I think this mechanism is meant to prevent a client from creating
two producers with the same name. However, the method `Producer.isSuccessorTo`
already checks the `producer-id`, and the `producer-id`s of multiple producers
created by the same client are different. So this mechanism can be deleted.
    
    ### Modifications
    
    - For `issue 1`: If a producer with the same name tries to use a new
connection, asynchronously check whether the old connection is still available.
Producers related to a connection that is no longer available are automatically
cleaned up.
    
    - For `issue 2`:
      - Fix the bug that causes a completed producer future to be removed
from `ServerCnx`.
      - Remove the mechanism that prevents a producer with a specified name 
from overriding the previous producer.
---
 .../pulsar/broker/service/AbstractTopic.java       |   8 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  40 ++---
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  24 +++
 .../pulsar/broker/service/ServerCnxTest.java       | 162 ++++++++++++++++++++-
 .../client/impl/ProducerConsumerInternalTest.java  |  57 ++++++++
 5 files changed, 268 insertions(+), 23 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index cef2dd2080c..31e37d0f176 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -982,8 +982,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
 
     private void tryOverwriteOldProducer(Producer oldProducer, Producer 
newProducer)
             throws BrokerServiceException {
-        if (newProducer.isSuccessorTo(oldProducer) && 
!isUserProvidedProducerName(oldProducer)
-                && !isUserProvidedProducerName(newProducer)) {
+        if (newProducer.isSuccessorTo(oldProducer)) {
             oldProducer.close(false);
             if (!producers.replace(newProducer.getProducerName(), oldProducer, 
newProducer)) {
                 // Met concurrent update, throw exception here so that client 
can try reconnect later.
@@ -993,6 +992,11 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
                 handleProducerRemoved(oldProducer);
             }
         } else {
+            // If a producer with the same name tries to use a new connection, 
async check the old connection is
+            // available. The producers related the connection that not 
available are automatically cleaned up.
+            if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) {
+                oldProducer.getCnx().checkConnectionLiveness();
+            }
             throw new BrokerServiceException.NamingException(
                     "Producer with name '" + newProducer.getProducerName() + 
"' is already connected to topic");
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index cae16f3c761..5809e1297fc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1398,36 +1398,36 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             CompletableFuture<Producer> existingProducerFuture = 
producers.putIfAbsent(producerId, producerFuture);
 
             if (existingProducerFuture != null) {
-                if (existingProducerFuture.isDone() && 
!existingProducerFuture.isCompletedExceptionally()) {
-                    Producer producer = existingProducerFuture.getNow(null);
-                    log.info("[{}] Producer with the same id is already 
created:"
-                            + " producerId={}, producer={}", remoteAddress, 
producerId, producer);
-                    commandSender.sendProducerSuccessResponse(requestId, 
producer.getProducerName(),
-                            producer.getSchemaVersion());
-                    return null;
-                } else {
+                if (!existingProducerFuture.isDone()) {
                     // There was an early request to create a producer with 
same producerId.
                     // This can happen when client timeout is lower than the 
broker timeouts.
                     // We need to wait until the previous producer creation 
request
                     // either complete or fails.
-                    ServerError error = null;
-                    if (!existingProducerFuture.isDone()) {
-                        error = ServerError.ServiceNotReady;
-                    } else {
-                        error = getErrorCode(existingProducerFuture);
-                        // remove producer with producerId as it's already 
completed with exception
-                        producers.remove(producerId, existingProducerFuture);
-                    }
                     log.warn("[{}][{}] Producer with id is already present on 
the connection, producerId={}",
                             remoteAddress, topicName, producerId);
-                    commandSender.sendErrorResponse(requestId, error, 
"Producer is already present on the connection");
-                    return null;
+                    commandSender.sendErrorResponse(requestId, 
ServerError.ServiceNotReady,
+                            "Producer is already present on the connection");
+                } else if (existingProducerFuture.isCompletedExceptionally()) {
+                    // remove producer with producerId as it's already 
completed with exception
+                    log.warn("[{}][{}] Producer with id is failed to register 
present on the connection, producerId={}",
+                            remoteAddress, topicName, producerId);
+                    ServerError error = getErrorCode(existingProducerFuture);
+                    producers.remove(producerId, existingProducerFuture);
+                    commandSender.sendErrorResponse(requestId, error,
+                            "Producer is already failed to register present on 
the connection");
+                } else {
+                    Producer producer = existingProducerFuture.getNow(null);
+                    log.info("[{}] [{}] Producer with the same id is already 
created:"
+                            + " producerId={}, producer={}", remoteAddress, 
topicName, producerId, producer);
+                    commandSender.sendProducerSuccessResponse(requestId, 
producer.getProducerName(),
+                            producer.getSchemaVersion());
                 }
+                return null;
             }
 
             if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Creating producer. producerId={}, schema 
is {}", remoteAddress, topicName,
-                        producerId, schema == null ? "absent" : "present");
+                log.debug("[{}][{}] Creating producer. producerId={}, 
producerName={}, schema is {}", remoteAddress,
+                        topicName, producerId, producerName, schema == null ? 
"absent" : "present");
             }
 
             service.getOrCreateTopic(topicName.toString()).thenCompose((Topic 
topic) -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c32d3fc3b0b..c34d98c7b9a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.auth;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithoutRecordingInvocations;
+import static org.testng.Assert.assertEquals;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
@@ -37,10 +38,13 @@ import java.util.function.Consumer;
 import java.util.function.Predicate;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.TimeoutHandler;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
@@ -48,6 +52,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -56,6 +61,7 @@ import org.apache.pulsar.tests.TestRetrySupport;
 import org.apache.pulsar.utils.ResourceUtils;
 import org.apache.zookeeper.MockZooKeeper;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
 import org.mockito.internal.util.MockUtil;
 import org.slf4j.Logger;
@@ -644,5 +650,23 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
         };
     }
 
+    protected ServiceProducer getServiceProducer(ProducerImpl clientProducer, 
String topicName) {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        org.apache.pulsar.broker.service.Producer serviceProducer =
+                
persistentTopic.getProducers().get(clientProducer.getProducerName());
+        long clientProducerId = WhiteboxImpl.getInternalState(clientProducer, 
"producerId");
+        assertEquals(serviceProducer.getProducerId(), clientProducerId);
+        assertEquals(serviceProducer.getEpoch(), 
clientProducer.getConnectionHandler().getEpoch());
+        return new ServiceProducer(serviceProducer, persistentTopic);
+    }
+
+    @Data
+    @AllArgsConstructor
+    public static class ServiceProducer {
+        private org.apache.pulsar.broker.service.Producer serviceProducer;
+        private PersistentTopic persistentTopic;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index c3bab634a42..2ea5e28880b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -47,6 +47,8 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.vertx.core.impl.ConcurrentHashSet;
+import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
@@ -64,6 +66,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -74,6 +77,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider;
@@ -93,6 +97,7 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotRea
 import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.AuthMethod;
@@ -113,6 +118,7 @@ import 
org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
 import 
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
+import org.apache.pulsar.common.api.proto.CommandPing;
 import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
 import org.apache.pulsar.common.api.proto.CommandSendError;
 import org.apache.pulsar.common.api.proto.CommandSendReceipt;
@@ -135,6 +141,7 @@ import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Commands.ChecksumType;
 import org.apache.pulsar.common.protocol.PulsarHandler;
+import org.apache.pulsar.common.protocol.schema.EmptyVersion;
 import org.apache.pulsar.common.topics.TopicList;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -149,6 +156,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+@Slf4j
 @SuppressWarnings("unchecked")
 @Test(groups = "broker")
 public class ServerCnxTest {
@@ -184,10 +192,12 @@ public class ServerCnxTest {
 
     private ManagedLedger ledgerMock;
     private ManagedCursor cursorMock;
+    private ConcurrentHashSet<EmbeddedChannel> 
channelsStoppedAnswerHealthCheck = new ConcurrentHashSet<>();
 
 
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
+        channelsStoppedAnswerHealthCheck.clear();
         svcConfig = new ServiceConfiguration();
         svcConfig.setBrokerShutdownTimeoutMs(0L);
         svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -927,6 +937,134 @@ public class ServerCnxTest {
                 }));
     }
 
+    @Test
+    public void testHandleProducerAfterClientChannelInactive() throws 
Exception {
+        final String tName = successTopicName;
+        final long producerId = 1;
+        final MutableInt requestId = new MutableInt(1);
+        final MutableInt epoch = new MutableInt(1);
+        final Map<String, String> metadata = Collections.emptyMap();
+        final String pName = "p1";
+        resetChannel();
+        setChannelConnected();
+
+        // The producer register using the first connection.
+        ByteBuf cmdProducer1 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
+                pName, false, metadata, null, epoch.incrementAndGet(), false,
+                ProducerAccessMode.Shared, Optional.empty(), false);
+        channel.writeInbound(cmdProducer1);
+        assertTrue(getResponse() instanceof CommandProducerSuccess);
+        PersistentTopic topicRef = (PersistentTopic) 
brokerService.getTopicReference(tName).get();
+        assertNotNull(topicRef);
+        assertEquals(topicRef.getProducers().size(), 1);
+
+        // Verify the second producer using a new connection will override the 
producer who using a stopped channel.
+        channelsStoppedAnswerHealthCheck.add(channel);
+        ClientChannel channel2 = new ClientChannel();
+        setChannelConnected(channel2.serverCnx);
+        Awaitility.await().untilAsserted(() -> {
+            ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
+                    pName, false, metadata, null, epoch.incrementAndGet(), 
false,
+                    ProducerAccessMode.Shared, Optional.empty(), false);
+            channel2.channel.writeInbound(cmdProducer2);
+            assertTrue(getResponse(channel2.channel, 
channel2.clientChannelHelper) instanceof CommandProducerSuccess);
+            assertEquals(topicRef.getProducers().size(), 1);
+        });
+
+        // cleanup.
+        channel.finish();
+        channel2.close();
+    }
+
+    private class ClientChannel implements Closeable {
+        private ClientChannelHelper clientChannelHelper = new 
ClientChannelHelper();
+        private ServerCnx serverCnx = new ServerCnx(pulsar);
+        private EmbeddedChannel channel = new EmbeddedChannel(new 
LengthFieldBasedFrameDecoder(
+                5 * 1024 * 1024,
+                0,
+                4,
+                0,
+                4),
+                serverCnx);
+        public ClientChannel() {
+            serverCnx.setAuthRole("");
+        }
+        public void close(){
+            if (channel != null && channel.isActive()) {
+                serverCnx.close();
+                channel.close();
+            }
+        }
+    }
+
+    @Test
+    public void testHandleProducer() throws Exception {
+        final String tName = "persistent://public/default/test-topic";
+        final long producerId = 1;
+        final MutableInt requestId = new MutableInt(1);
+        final MutableInt epoch = new MutableInt(1);
+        final Map<String, String> metadata = Collections.emptyMap();
+        final String pName = "p1";
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        // connect.
+        ByteBuf cConnect = Commands.newConnect("none", "", null);
+        channel.writeInbound(cConnect);
+        assertEquals(serverCnx.getState(), State.Connected);
+        assertTrue(getResponse() instanceof CommandConnected);
+
+        // There is an in-progress producer registration.
+        ByteBuf cProducer1 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
+                pName, false, metadata, null, epoch.incrementAndGet(), false,
+                ProducerAccessMode.Shared, Optional.empty(), false);
+        CompletableFuture existingFuture1 = new CompletableFuture();
+        serverCnx.getProducers().put(producerId, existingFuture1);
+        channel.writeInbound(cProducer1);
+        Object response1 = getResponse();
+        assertTrue(response1 instanceof CommandError);
+        CommandError error1 = (CommandError) response1;
+        assertEquals(error1.getError().toString(), 
ServerError.ServiceNotReady.toString());
+        assertTrue(error1.getMessage().contains("already present on the 
connection"));
+
+        // There is a failed registration.
+        ByteBuf cProducer2 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
+                pName, false, metadata, null, epoch.incrementAndGet(), false,
+                ProducerAccessMode.Shared, Optional.empty(), false);
+        CompletableFuture existingFuture2 = new CompletableFuture();
+        existingFuture2.completeExceptionally(new 
BrokerServiceException.ProducerBusyException("123"));
+        serverCnx.getProducers().put(producerId, existingFuture2);
+
+        channel.writeInbound(cProducer2);
+        Object response2 = getResponse();
+        assertTrue(response2 instanceof CommandError);
+        CommandError error2 = (CommandError) response2;
+        assertEquals(error2.getError().toString(), 
ServerError.ProducerBusy.toString());
+        assertTrue(error2.getMessage().contains("already failed to register 
present on the connection"));
+
+        // There is an successful registration.
+        ByteBuf cProducer3 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
+                pName, false, metadata, null, epoch.incrementAndGet(), false,
+                ProducerAccessMode.Shared, Optional.empty(), false);
+        CompletableFuture existingFuture3 = new CompletableFuture();
+        org.apache.pulsar.broker.service.Producer serviceProducer =
+                mock(org.apache.pulsar.broker.service.Producer.class);
+        when(serviceProducer.getProducerName()).thenReturn(pName);
+        when(serviceProducer.getSchemaVersion()).thenReturn(new 
EmptyVersion());
+        existingFuture3.complete(serviceProducer);
+        serverCnx.getProducers().put(producerId, existingFuture3);
+
+        channel.writeInbound(cProducer3);
+        Object response3 = getResponse();
+        assertTrue(response3 instanceof CommandProducerSuccess);
+        CommandProducerSuccess cProducerSuccess = (CommandProducerSuccess) 
response3;
+        assertEquals(cProducerSuccess.getProducerName(), pName);
+
+        // cleanup.
+        channel.finish();
+    }
+
     // This test used to be in the ServerCnxAuthorizationTest class, but it 
was migrated here because the mocking
     // in that class was too extensive. There is some overlap with this test 
and other tests in this class. The primary
     // role of this test is verifying that the correct role and 
AuthenticationDataSource are passed to the
@@ -2471,6 +2609,10 @@ public class ServerCnxTest {
     }
 
     protected void setChannelConnected() throws Exception {
+        setChannelConnected(serverCnx);
+    }
+
+    protected void setChannelConnected(ServerCnx serverCnx) throws Exception {
         Field channelState = ServerCnx.class.getDeclaredField("state");
         channelState.setAccessible(true);
         channelState.set(serverCnx, State.Connected);
@@ -2484,13 +2626,31 @@ public class ServerCnxTest {
     }
 
     protected Object getResponse() throws Exception {
+        return getResponse(channel, clientChannelHelper);
+    }
+
+    protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper 
clientChannelHelper) throws Exception {
         // Wait at most for 10s to get a response
         final long sleepTimeMs = 10;
         final long iterations = TimeUnit.SECONDS.toMillis(10) / sleepTimeMs;
         for (int i = 0; i < iterations; i++) {
             if (!channel.outboundMessages().isEmpty()) {
                 Object outObject = channel.outboundMessages().remove();
-                return clientChannelHelper.getCommand(outObject);
+                Object cmd = clientChannelHelper.getCommand(outObject);
+                if (cmd instanceof CommandPing) {
+                    if (channelsStoppedAnswerHealthCheck.contains(channel)) {
+                        continue;
+                    }
+                    
channel.writeAndFlush(Commands.newPong()).addListener(future -> {
+                        if (!future.isSuccess()) {
+                            log.warn("[{}] Forcing connection to close since 
cannot send a pong message.",
+                                    channel, future.cause());
+                            channel.close();
+                        }
+                    });
+                    continue;
+                }
+                return cmd;
             } else {
                 Thread.sleep(sleepTimeMs);
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
index 5f242d44954..f05f7356357 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
@@ -19,11 +19,13 @@
 package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import java.util.concurrent.CountDownLatch;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -49,6 +51,61 @@ public class ProducerConsumerInternalTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @Test
+    public void testSameProducerRegisterTwice() throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        // Create producer using default producerName.
+        ProducerImpl producer = (ProducerImpl) 
pulsarClient.newProducer().topic(topicName).create();
+        ServiceProducer serviceProducer = getServiceProducer(producer, 
topicName);
+
+        // Remove producer maintained by server cnx. To make it can register 
the second time.
+        removeServiceProducerMaintainedByServerCnx(serviceProducer);
+
+        // Trigger the client producer reconnect.
+        CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
+        commandCloseProducer.setProducerId(producer.producerId);
+        producer.getClientCnx().handleCloseProducer(commandCloseProducer);
+
+        // Verify the reconnection will be success.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(producer.getState().toString(), "Ready");
+        });
+    }
+
+    @Test
+    public void testSameProducerRegisterTwiceWithSpecifiedProducerName() 
throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        final String pName = "p1";
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        // Create producer using default producerName.
+        ProducerImpl producer = (ProducerImpl) 
pulsarClient.newProducer().producerName(pName).topic(topicName).create();
+        ServiceProducer serviceProducer = getServiceProducer(producer, 
topicName);
+
+        // Remove producer maintained by server cnx. To make it can register 
the second time.
+        removeServiceProducerMaintainedByServerCnx(serviceProducer);
+
+        // Trigger the client producer reconnect.
+        CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
+        commandCloseProducer.setProducerId(producer.producerId);
+        producer.getClientCnx().handleCloseProducer(commandCloseProducer);
+
+        // Verify the reconnection will be success.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(producer.getState().toString(), "Ready", "The 
producer registration failed");
+        });
+    }
+
+    private void removeServiceProducerMaintainedByServerCnx(ServiceProducer 
serviceProducer) {
+        ServerCnx serverCnx = (ServerCnx) 
serviceProducer.getServiceProducer().getCnx();
+        serverCnx.removedProducer(serviceProducer.getServiceProducer());
+        Awaitility.await().untilAsserted(() -> {
+            
assertFalse(serverCnx.getProducers().containsKey(serviceProducer.getServiceProducer().getProducerId()));
+        });
+    }
+
     @Test
     public void 
testExclusiveConsumerWillAlwaysRetryEvenIfReceivedConsumerBusyError() throws 
Exception {
         final String topicName = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");

Reply via email to