This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ad5bb050cb1 [fix] [broker] Make specified producer could override the 
previous one (#21155)
ad5bb050cb1 is described below

commit ad5bb050cb131eee3f03baf396d216d2d56cabf9
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 14 00:17:10 2023 +0800

    [fix] [broker] Make specified producer could override the previous one 
(#21155)
    
    The client assumed the connection was inactive, but the Broker assumed the 
connection was fine. The Client tried to  use a new connection to reconnect a 
producer, then got an error `Producer with name 'st-0-5' is already connected 
to topic`.
    
    - In a connection, the second connection waits for the first connection to 
complete\. But there is a bug that causes this mechanism to fail\.
    - If a producer uses a default name, the second registration will override 
the first one. But it can not override the first one if it uses a specified 
producer name\. I think this mechanism is to prevent a client from creating two 
producers with the same name. However, method `Producer.isSuccessorTo` has 
checked the `producer-id`, and the `producer-id` of multiple producers created 
by the same client are different. So this mechanism can be deleted.
    
    - For `issue 1`: If a producer with the same name tries to use a new 
connection, async checks the old connection is available. The producers related 
to the connection that is not available are automatically cleaned up.
    
    - For `issue 2`:
      -  Fix the bug that causes a complete producer future will be removed 
from `ServerCnx`.
      - Remove the mechanism that prevents a producer with a specified name 
from overriding the previous producer.
    
    (cherry picked from commit bda16b6f5b715942f7ed996052f6cbd8026fbbf0)
---
 .../pulsar/broker/service/AbstractTopic.java       |   8 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  40 ++---
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  28 +++-
 .../pulsar/broker/service/ServerCnxTest.java       | 162 ++++++++++++++++++++-
 .../client/impl/ProducerConsumerInternalTest.java  | 147 +++++++++++++++++++
 5 files changed, 360 insertions(+), 25 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 97966828fb1..03c6233a925 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -982,8 +982,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
 
     private void tryOverwriteOldProducer(Producer oldProducer, Producer 
newProducer)
             throws BrokerServiceException {
-        if (newProducer.isSuccessorTo(oldProducer) && 
!isUserProvidedProducerName(oldProducer)
-                && !isUserProvidedProducerName(newProducer)) {
+        if (newProducer.isSuccessorTo(oldProducer)) {
             oldProducer.close(false);
             if (!producers.replace(newProducer.getProducerName(), oldProducer, 
newProducer)) {
                 // Met concurrent update, throw exception here so that client 
can try reconnect later.
@@ -993,6 +992,11 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
                 handleProducerRemoved(oldProducer);
             }
         } else {
+            // If a producer with the same name tries to use a new connection, 
async check the old connection is
+            // available. The producers related the connection that not 
available are automatically cleaned up.
+            if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) {
+                oldProducer.getCnx().checkConnectionLiveness();
+            }
             throw new BrokerServiceException.NamingException(
                     "Producer with name '" + newProducer.getProducerName() + 
"' is already connected to topic");
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index b566ed3a05f..3fc1e96a57d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1389,36 +1389,36 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             CompletableFuture<Producer> existingProducerFuture = 
producers.putIfAbsent(producerId, producerFuture);
 
             if (existingProducerFuture != null) {
-                if (existingProducerFuture.isDone() && 
!existingProducerFuture.isCompletedExceptionally()) {
-                    Producer producer = existingProducerFuture.getNow(null);
-                    log.info("[{}] Producer with the same id is already 
created:"
-                            + " producerId={}, producer={}", remoteAddress, 
producerId, producer);
-                    commandSender.sendProducerSuccessResponse(requestId, 
producer.getProducerName(),
-                            producer.getSchemaVersion());
-                    return null;
-                } else {
+                if (!existingProducerFuture.isDone()) {
                     // There was an early request to create a producer with 
same producerId.
                     // This can happen when client timeout is lower than the 
broker timeouts.
                     // We need to wait until the previous producer creation 
request
                     // either complete or fails.
-                    ServerError error = null;
-                    if (!existingProducerFuture.isDone()) {
-                        error = ServerError.ServiceNotReady;
-                    } else {
-                        error = getErrorCode(existingProducerFuture);
-                        // remove producer with producerId as it's already 
completed with exception
-                        producers.remove(producerId, existingProducerFuture);
-                    }
                     log.warn("[{}][{}] Producer with id is already present on 
the connection, producerId={}",
                             remoteAddress, topicName, producerId);
-                    commandSender.sendErrorResponse(requestId, error, 
"Producer is already present on the connection");
-                    return null;
+                    commandSender.sendErrorResponse(requestId, 
ServerError.ServiceNotReady,
+                            "Producer is already present on the connection");
+                } else if (existingProducerFuture.isCompletedExceptionally()) {
+                    // remove producer with producerId as it's already 
completed with exception
+                    log.warn("[{}][{}] Producer with id is failed to register 
present on the connection, producerId={}",
+                            remoteAddress, topicName, producerId);
+                    ServerError error = getErrorCode(existingProducerFuture);
+                    producers.remove(producerId, existingProducerFuture);
+                    commandSender.sendErrorResponse(requestId, error,
+                            "Producer is already failed to register present on 
the connection");
+                } else {
+                    Producer producer = existingProducerFuture.getNow(null);
+                    log.info("[{}] [{}] Producer with the same id is already 
created:"
+                            + " producerId={}, producer={}", remoteAddress, 
topicName, producerId, producer);
+                    commandSender.sendProducerSuccessResponse(requestId, 
producer.getProducerName(),
+                            producer.getSchemaVersion());
                 }
+                return null;
             }
 
             if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Creating producer. producerId={}, schema 
is {}", remoteAddress, topicName,
-                        producerId, schema == null ? "absent" : "present");
+                log.debug("[{}][{}] Creating producer. producerId={}, 
producerName={}, schema is {}", remoteAddress,
+                        topicName, producerId, producerName, schema == null ? 
"absent" : "present");
             }
 
             service.getOrCreateTopic(topicName.toString()).thenCompose((Topic 
topic) -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 3fe8c22b1c4..b16dc27e265 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -18,7 +18,8 @@
  */
 package org.apache.pulsar.broker.auth;
 
-import static org.apache.pulsar.broker.BrokerTestUtil.*;
+import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithoutRecordingInvocations;
+import static org.testng.Assert.assertEquals;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
@@ -37,10 +38,13 @@ import java.util.function.Consumer;
 import java.util.function.Predicate;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.TimeoutHandler;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
@@ -48,6 +52,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -55,11 +60,12 @@ import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.tests.TestRetrySupport;
 import org.apache.pulsar.utils.ResourceUtils;
 import org.apache.zookeeper.MockZooKeeper;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
 import org.mockito.internal.util.MockUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
 import org.testng.annotations.DataProvider;
 
 /**
@@ -623,5 +629,23 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
         };
     }
 
+    protected ServiceProducer getServiceProducer(ProducerImpl clientProducer, 
String topicName) {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        org.apache.pulsar.broker.service.Producer serviceProducer =
+                
persistentTopic.getProducers().get(clientProducer.getProducerName());
+        long clientProducerId = WhiteboxImpl.getInternalState(clientProducer, 
"producerId");
+        assertEquals(serviceProducer.getProducerId(), clientProducerId);
+        assertEquals(serviceProducer.getEpoch(), 
clientProducer.getConnectionHandler().getEpoch());
+        return new ServiceProducer(serviceProducer, persistentTopic);
+    }
+
+    @Data
+    @AllArgsConstructor
+    public static class ServiceProducer {
+        private org.apache.pulsar.broker.service.Producer serviceProducer;
+        private PersistentTopic persistentTopic;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index c3bab634a42..2ea5e28880b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -47,6 +47,8 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.vertx.core.impl.ConcurrentHashSet;
+import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
@@ -64,6 +66,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -74,6 +77,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider;
@@ -93,6 +97,7 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotRea
 import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.AuthMethod;
@@ -113,6 +118,7 @@ import 
org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
 import 
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
+import org.apache.pulsar.common.api.proto.CommandPing;
 import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
 import org.apache.pulsar.common.api.proto.CommandSendError;
 import org.apache.pulsar.common.api.proto.CommandSendReceipt;
@@ -135,6 +141,7 @@ import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Commands.ChecksumType;
 import org.apache.pulsar.common.protocol.PulsarHandler;
+import org.apache.pulsar.common.protocol.schema.EmptyVersion;
 import org.apache.pulsar.common.topics.TopicList;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -149,6 +156,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+@Slf4j
 @SuppressWarnings("unchecked")
 @Test(groups = "broker")
 public class ServerCnxTest {
@@ -184,10 +192,12 @@ public class ServerCnxTest {
 
     private ManagedLedger ledgerMock;
     private ManagedCursor cursorMock;
+    private ConcurrentHashSet<EmbeddedChannel> 
channelsStoppedAnswerHealthCheck = new ConcurrentHashSet<>();
 
 
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
+        channelsStoppedAnswerHealthCheck.clear();
         svcConfig = new ServiceConfiguration();
         svcConfig.setBrokerShutdownTimeoutMs(0L);
         svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -927,6 +937,134 @@ public class ServerCnxTest {
                 }));
     }
 
+    @Test
+    public void testHandleProducerAfterClientChannelInactive() throws 
Exception {
+        final String tName = successTopicName;
+        final long producerId = 1;
+        final MutableInt requestId = new MutableInt(1);
+        final MutableInt epoch = new MutableInt(1);
+        final Map<String, String> metadata = Collections.emptyMap();
+        final String pName = "p1";
+        resetChannel();
+        setChannelConnected();
+
+        // The producer register using the first connection.
+        ByteBuf cmdProducer1 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
+                pName, false, metadata, null, epoch.incrementAndGet(), false,
+                ProducerAccessMode.Shared, Optional.empty(), false);
+        channel.writeInbound(cmdProducer1);
+        assertTrue(getResponse() instanceof CommandProducerSuccess);
+        PersistentTopic topicRef = (PersistentTopic) 
brokerService.getTopicReference(tName).get();
+        assertNotNull(topicRef);
+        assertEquals(topicRef.getProducers().size(), 1);
+
+        // Verify the second producer using a new connection will override the 
producer who using a stopped channel.
+        channelsStoppedAnswerHealthCheck.add(channel);
+        ClientChannel channel2 = new ClientChannel();
+        setChannelConnected(channel2.serverCnx);
+        Awaitility.await().untilAsserted(() -> {
+            ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
+                    pName, false, metadata, null, epoch.incrementAndGet(), 
false,
+                    ProducerAccessMode.Shared, Optional.empty(), false);
+            channel2.channel.writeInbound(cmdProducer2);
+            assertTrue(getResponse(channel2.channel, 
channel2.clientChannelHelper) instanceof CommandProducerSuccess);
+            assertEquals(topicRef.getProducers().size(), 1);
+        });
+
+        // cleanup.
+        channel.finish();
+        channel2.close();
+    }
+
+    private class ClientChannel implements Closeable {
+        private ClientChannelHelper clientChannelHelper = new 
ClientChannelHelper();
+        private ServerCnx serverCnx = new ServerCnx(pulsar);
+        private EmbeddedChannel channel = new EmbeddedChannel(new 
LengthFieldBasedFrameDecoder(
+                5 * 1024 * 1024,
+                0,
+                4,
+                0,
+                4),
+                serverCnx);
+        public ClientChannel() {
+            serverCnx.setAuthRole("");
+        }
+        public void close(){
+            if (channel != null && channel.isActive()) {
+                serverCnx.close();
+                channel.close();
+            }
+        }
+    }
+
+    @Test
+    public void testHandleProducer() throws Exception {
+        final String tName = "persistent://public/default/test-topic";
+        final long producerId = 1;
+        final MutableInt requestId = new MutableInt(1);
+        final MutableInt epoch = new MutableInt(1);
+        final Map<String, String> metadata = Collections.emptyMap();
+        final String pName = "p1";
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        // connect.
+        ByteBuf cConnect = Commands.newConnect("none", "", null);
+        channel.writeInbound(cConnect);
+        assertEquals(serverCnx.getState(), State.Connected);
+        assertTrue(getResponse() instanceof CommandConnected);
+
+        // There is an in-progress producer registration.
+        ByteBuf cProducer1 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
+                pName, false, metadata, null, epoch.incrementAndGet(), false,
+                ProducerAccessMode.Shared, Optional.empty(), false);
+        CompletableFuture existingFuture1 = new CompletableFuture();
+        serverCnx.getProducers().put(producerId, existingFuture1);
+        channel.writeInbound(cProducer1);
+        Object response1 = getResponse();
+        assertTrue(response1 instanceof CommandError);
+        CommandError error1 = (CommandError) response1;
+        assertEquals(error1.getError().toString(), 
ServerError.ServiceNotReady.toString());
+        assertTrue(error1.getMessage().contains("already present on the 
connection"));
+
+        // There is a failed registration.
+        ByteBuf cProducer2 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
+                pName, false, metadata, null, epoch.incrementAndGet(), false,
+                ProducerAccessMode.Shared, Optional.empty(), false);
+        CompletableFuture existingFuture2 = new CompletableFuture();
+        existingFuture2.completeExceptionally(new 
BrokerServiceException.ProducerBusyException("123"));
+        serverCnx.getProducers().put(producerId, existingFuture2);
+
+        channel.writeInbound(cProducer2);
+        Object response2 = getResponse();
+        assertTrue(response2 instanceof CommandError);
+        CommandError error2 = (CommandError) response2;
+        assertEquals(error2.getError().toString(), 
ServerError.ProducerBusy.toString());
+        assertTrue(error2.getMessage().contains("already failed to register 
present on the connection"));
+
+        // There is an successful registration.
+        ByteBuf cProducer3 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
+                pName, false, metadata, null, epoch.incrementAndGet(), false,
+                ProducerAccessMode.Shared, Optional.empty(), false);
+        CompletableFuture existingFuture3 = new CompletableFuture();
+        org.apache.pulsar.broker.service.Producer serviceProducer =
+                mock(org.apache.pulsar.broker.service.Producer.class);
+        when(serviceProducer.getProducerName()).thenReturn(pName);
+        when(serviceProducer.getSchemaVersion()).thenReturn(new 
EmptyVersion());
+        existingFuture3.complete(serviceProducer);
+        serverCnx.getProducers().put(producerId, existingFuture3);
+
+        channel.writeInbound(cProducer3);
+        Object response3 = getResponse();
+        assertTrue(response3 instanceof CommandProducerSuccess);
+        CommandProducerSuccess cProducerSuccess = (CommandProducerSuccess) 
response3;
+        assertEquals(cProducerSuccess.getProducerName(), pName);
+
+        // cleanup.
+        channel.finish();
+    }
+
     // This test used to be in the ServerCnxAuthorizationTest class, but it 
was migrated here because the mocking
     // in that class was too extensive. There is some overlap with this test 
and other tests in this class. The primary
     // role of this test is verifying that the correct role and 
AuthenticationDataSource are passed to the
@@ -2471,6 +2609,10 @@ public class ServerCnxTest {
     }
 
     protected void setChannelConnected() throws Exception {
+        setChannelConnected(serverCnx);
+    }
+
+    protected void setChannelConnected(ServerCnx serverCnx) throws Exception {
         Field channelState = ServerCnx.class.getDeclaredField("state");
         channelState.setAccessible(true);
         channelState.set(serverCnx, State.Connected);
@@ -2484,13 +2626,31 @@ public class ServerCnxTest {
     }
 
     protected Object getResponse() throws Exception {
+        return getResponse(channel, clientChannelHelper);
+    }
+
+    protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper 
clientChannelHelper) throws Exception {
         // Wait at most for 10s to get a response
         final long sleepTimeMs = 10;
         final long iterations = TimeUnit.SECONDS.toMillis(10) / sleepTimeMs;
         for (int i = 0; i < iterations; i++) {
             if (!channel.outboundMessages().isEmpty()) {
                 Object outObject = channel.outboundMessages().remove();
-                return clientChannelHelper.getCommand(outObject);
+                Object cmd = clientChannelHelper.getCommand(outObject);
+                if (cmd instanceof CommandPing) {
+                    if (channelsStoppedAnswerHealthCheck.contains(channel)) {
+                        continue;
+                    }
+                    
channel.writeAndFlush(Commands.newPong()).addListener(future -> {
+                        if (!future.isSuccess()) {
+                            log.warn("[{}] Forcing connection to close since 
cannot send a pong message.",
+                                    channel, future.cause());
+                            channel.close();
+                        }
+                    });
+                    continue;
+                }
+                return cmd;
             } else {
                 Thread.sleep(sleepTimeMs);
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
new file mode 100644
index 00000000000..f05f7356357
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import java.util.concurrent.CountDownLatch;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Different with {@link 
org.apache.pulsar.client.api.SimpleProducerConsumerTest}, this class can visit 
the variables
+ * of {@link ConsumerImpl} which are modified `protected`.
+ */
+@Test(groups = "broker-api")
+public class ProducerConsumerInternalTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSameProducerRegisterTwice() throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        // Create producer using default producerName.
+        ProducerImpl producer = (ProducerImpl) 
pulsarClient.newProducer().topic(topicName).create();
+        ServiceProducer serviceProducer = getServiceProducer(producer, 
topicName);
+
+        // Remove producer maintained by server cnx. To make it can register 
the second time.
+        removeServiceProducerMaintainedByServerCnx(serviceProducer);
+
+        // Trigger the client producer reconnect.
+        CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
+        commandCloseProducer.setProducerId(producer.producerId);
+        producer.getClientCnx().handleCloseProducer(commandCloseProducer);
+
+        // Verify the reconnection will be success.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(producer.getState().toString(), "Ready");
+        });
+    }
+
+    @Test
+    public void testSameProducerRegisterTwiceWithSpecifiedProducerName() 
throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        final String pName = "p1";
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        // Create producer using default producerName.
+        ProducerImpl producer = (ProducerImpl) 
pulsarClient.newProducer().producerName(pName).topic(topicName).create();
+        ServiceProducer serviceProducer = getServiceProducer(producer, 
topicName);
+
+        // Remove producer maintained by server cnx. To make it can register 
the second time.
+        removeServiceProducerMaintainedByServerCnx(serviceProducer);
+
+        // Trigger the client producer reconnect.
+        CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
+        commandCloseProducer.setProducerId(producer.producerId);
+        producer.getClientCnx().handleCloseProducer(commandCloseProducer);
+
+        // Verify the reconnection will be success.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(producer.getState().toString(), "Ready", "The 
producer registration failed");
+        });
+    }
+
+    private void removeServiceProducerMaintainedByServerCnx(ServiceProducer 
serviceProducer) {
+        ServerCnx serverCnx = (ServerCnx) 
serviceProducer.getServiceProducer().getCnx();
+        serverCnx.removedProducer(serviceProducer.getServiceProducer());
+        Awaitility.await().untilAsserted(() -> {
+            
assertFalse(serverCnx.getProducers().containsKey(serviceProducer.getServiceProducer().getProducerId()));
+        });
+    }
+
+    @Test
+    public void 
testExclusiveConsumerWillAlwaysRetryEvenIfReceivedConsumerBusyError() throws 
Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        final String subscriptionName = "subscription1";
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        final ConsumerImpl consumer = (ConsumerImpl) 
pulsarClient.newConsumer().topic(topicName.toString())
+                
.subscriptionType(SubscriptionType.Exclusive).subscriptionName(subscriptionName).subscribe();
+
+        ClientCnx clientCnx = consumer.getClientCnx();
+        ServerCnx serverCnx = (ServerCnx) pulsar.getBrokerService()
+                
.getTopic(topicName,false).join().get().getSubscription(subscriptionName)
+                .getDispatcher().getConsumers().get(0).cnx();
+
+        // Make a disconnect to trigger broker remove the consumer which 
related this connection.
+        // Make the second subscribe runs after the broker removing the old 
consumer, then it will receive
+        // an error: "Exclusive consumer is already connected"
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        serverCnx.execute(() -> {
+            try {
+                countDownLatch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        clientCnx.close();
+        Thread.sleep(1000);
+        countDownLatch.countDown();
+
+        // Verify the consumer will always retry subscribe event received 
ConsumerBusy error.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(consumer.getState(), HandlerState.State.Ready);
+        });
+
+        // cleanup.
+        consumer.close();
+        admin.topics().delete(topicName, false);
+    }
+}

Reply via email to