This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new fbf87dfaf91 [fix] [broker] Make specified producer could override the 
previous one (#21155)
fbf87dfaf91 is described below

commit fbf87dfaf91951b394bb93715a3fc3ac105c7905
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 14 00:17:10 2023 +0800

    [fix] [broker] Make specified producer could override the previous one 
(#21155)
    
    The client assumed the connection was inactive, but the broker assumed the 
connection was fine. The client tried to use a new connection to reconnect a 
producer, then got the error `Producer with name 'st-0-5' is already connected 
to topic`.
    
    - Within a connection, the second producer registration waits for the first 
registration to complete. But there is a bug that causes this mechanism to fail.
    - If a producer uses a default name, the second registration will override 
the first one. But it cannot override the first one if it uses a specified 
producer name. I think this mechanism is to prevent a client from creating two 
producers with the same name. However, method `Producer.isSuccessorTo` has 
checked the `producer-id`, and the `producer-id` of each producer created 
by the same client is different. So this mechanism can be deleted.
    
    - For `issue 1`: If a producer with the same name tries to use a new 
connection, asynchronously check whether the old connection is still available. 
Producers associated with a connection that is no longer available are 
automatically cleaned up.
    
    - For `issue 2`:
      - Fix the bug that causes a completed producer future to be removed 
from `ServerCnx`.
      - Remove the mechanism that prevents a producer with a specified name 
from overriding the previous producer.
    
    (cherry picked from commit bda16b6f5b715942f7ed996052f6cbd8026fbbf0)
---
 .../pulsar/broker/service/AbstractTopic.java       |   8 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  84 +++++++---
 .../apache/pulsar/broker/service/TransportCnx.java |   9 ++
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  24 +++
 .../pulsar/broker/service/ServerCnxTest.java       | 177 +++++++++++++++++++--
 .../client/impl/ProducerConsumerInternalTest.java  | 147 +++++++++++++++++
 .../pulsar/common/protocol/PulsarHandler.java      |  23 +--
 7 files changed, 433 insertions(+), 39 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 76e1df0d93d..e5950b35817 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -934,8 +934,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
 
     private void tryOverwriteOldProducer(Producer oldProducer, Producer 
newProducer)
             throws BrokerServiceException {
-        if (newProducer.isSuccessorTo(oldProducer) && 
!isUserProvidedProducerName(oldProducer)
-                && !isUserProvidedProducerName(newProducer)) {
+        if (newProducer.isSuccessorTo(oldProducer)) {
             oldProducer.close(false);
             if (!producers.replace(newProducer.getProducerName(), oldProducer, 
newProducer)) {
                 // Met concurrent update, throw exception here so that client 
can try reconnect later.
@@ -945,6 +944,11 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
                 handleProducerRemoved(oldProducer);
             }
         } else {
+            // If a producer with the same name tries to use a new connection, 
async check the old connection is
+            // available. The producers related the connection that not 
available are automatically cleaned up.
+            if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) {
+                oldProducer.getCnx().checkConnectionLiveness();
+            }
             throw new BrokerServiceException.NamingException(
                     "Producer with name '" + newProducer.getProducerName() + 
"' is already connected to topic");
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index ef7cfdf14d8..fe43a0a147b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -154,6 +154,7 @@ import 
org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.pulsar.common.util.netty.NettyFutureUtil;
 import org.apache.pulsar.functions.utils.Exceptions;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
@@ -216,6 +217,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     private final long maxPendingBytesPerThread;
     private final long resumeThresholdPendingBytesPerThread;
 
+    private final long connectionLivenessCheckTimeoutMillis = 5000;
+
     // Number of bytes pending to be published from a single specific IO 
thread.
     private static final FastThreadLocal<MutableLong> pendingBytesPerThread = 
new FastThreadLocal<MutableLong>() {
         @Override
@@ -343,6 +346,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             }
         });
         this.service.getPulsarStats().recordConnectionClose();
+
+        // complete possible pending connection check future
+        if (connectionCheckInProgress != null && 
!connectionCheckInProgress.isDone()) {
+            connectionCheckInProgress.complete(false);
+        }
     }
 
     @Override
@@ -1222,34 +1230,35 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             CompletableFuture<Producer> existingProducerFuture = 
producers.putIfAbsent(producerId, producerFuture);
 
             if (existingProducerFuture != null) {
-                if (existingProducerFuture.isDone() && 
!existingProducerFuture.isCompletedExceptionally()) {
-                    Producer producer = existingProducerFuture.getNow(null);
-                    log.info("[{}] Producer with the same id is already 
created:"
-                            + " producerId={}, producer={}", remoteAddress, 
producerId, producer);
-                    commandSender.sendProducerSuccessResponse(requestId, 
producer.getProducerName(),
-                            producer.getSchemaVersion());
-                    return null;
-                } else {
+                if (!existingProducerFuture.isDone()) {
                     // There was an early request to create a producer with 
same producerId.
                     // This can happen when client timeout is lower than the 
broker timeouts.
                     // We need to wait until the previous producer creation 
request
                     // either complete or fails.
-                    ServerError error = null;
-                    if (!existingProducerFuture.isDone()) {
-                        error = ServerError.ServiceNotReady;
-                    } else {
-                        error = getErrorCode(existingProducerFuture);
-                        // remove producer with producerId as it's already 
completed with exception
-                        producers.remove(producerId, existingProducerFuture);
-                    }
                     log.warn("[{}][{}] Producer with id is already present on 
the connection, producerId={}",
                             remoteAddress, topicName, producerId);
-                    commandSender.sendErrorResponse(requestId, error, 
"Producer is already present on the connection");
-                    return null;
+                    commandSender.sendErrorResponse(requestId, 
ServerError.ServiceNotReady,
+                            "Producer is already present on the connection");
+                } else if (existingProducerFuture.isCompletedExceptionally()) {
+                    // remove producer with producerId as it's already 
completed with exception
+                    log.warn("[{}][{}] Producer with id is failed to register 
present on the connection, producerId={}",
+                            remoteAddress, topicName, producerId);
+                    ServerError error = getErrorCode(existingProducerFuture);
+                    producers.remove(producerId, existingProducerFuture);
+                    commandSender.sendErrorResponse(requestId, error,
+                            "Producer is already failed to register present on 
the connection");
+                } else {
+                    Producer producer = existingProducerFuture.getNow(null);
+                    log.info("[{}] [{}] Producer with the same id is already 
created:"
+                            + " producerId={}, producer={}", remoteAddress, 
topicName, producerId, producer);
+                    commandSender.sendProducerSuccessResponse(requestId, 
producer.getProducerName(),
+                            producer.getSchemaVersion());
                 }
+                return null;
             }
 
-            log.info("[{}][{}] Creating producer. producerId={}", 
remoteAddress, topicName, producerId);
+            log.info("[{}][{}] Creating producer. producerId={}, 
producerName={}, schema is {}", remoteAddress,
+                    topicName, producerId, producerName, schema == null ? 
"absent" : "present");
 
             service.getOrCreateTopic(topicName.toString()).thenCompose((Topic 
topic) -> {
                 // Before creating producer, check if backlog quota exceeded
@@ -3072,6 +3081,43 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         }
     }
 
+    CompletableFuture<Boolean> connectionCheckInProgress;
+
+    @Override
+    public CompletableFuture<Boolean> checkConnectionLiveness() {
+        if (connectionLivenessCheckTimeoutMillis > 0) {
+            return 
NettyFutureUtil.toCompletableFuture(ctx.executor().submit(() -> {
+                if (connectionCheckInProgress != null) {
+                    return connectionCheckInProgress;
+                } else {
+                    final CompletableFuture<Boolean> 
finalConnectionCheckInProgress = new CompletableFuture<>();
+                    connectionCheckInProgress = finalConnectionCheckInProgress;
+                    ctx.executor().schedule(() -> {
+                        if (finalConnectionCheckInProgress == 
connectionCheckInProgress
+                                && !finalConnectionCheckInProgress.isDone()) {
+                            log.warn("[{}] Connection check timed out. Closing 
connection.", remoteAddress);
+                            ctx.close();
+                        }
+                    }, connectionLivenessCheckTimeoutMillis, 
TimeUnit.MILLISECONDS);
+                    sendPing();
+                    return finalConnectionCheckInProgress;
+                }
+            })).thenCompose(java.util.function.Function.identity());
+        } else {
+            // check is disabled
+            return CompletableFuture.completedFuture((Boolean) null);
+        }
+    }
+
+    @Override
+    protected void messageReceived() {
+        super.messageReceived();
+        if (connectionCheckInProgress != null && 
!connectionCheckInProgress.isDone()) {
+            connectionCheckInProgress.complete(true);
+            connectionCheckInProgress = null;
+        }
+    }
+
     private static void logAuthException(SocketAddress remoteAddress, String 
operation,
                                          String principal, Optional<TopicName> 
topic, Throwable ex) {
         String topicString = topic.map(t -> ", topic=" + 
t.toString()).orElse("");
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
index cc0c559eaf2..f28016c2de9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.util.concurrent.Promise;
 import java.net.SocketAddress;
+import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 
 public interface TransportCnx {
@@ -78,4 +79,12 @@ public interface TransportCnx {
 
     String clientSourceAddress();
 
+    /***
+     * Check if the connection is still alive
+     * by actively sending a Ping message to the client.
+     *
+     * @return a completable future where the result is true if the connection 
is alive, false otherwise. The result
+     * is null if the connection liveness check is disabled.
+     */
+    CompletableFuture<Boolean> checkConnectionLiveness();
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c1f75ba141d..d88a411be34 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.channel.EventLoopGroup;
@@ -49,17 +50,21 @@ import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.BookKeeperClientFactory;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -71,6 +76,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.DataProvider;
@@ -563,5 +569,23 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
                 });
     }
 
+    protected ServiceProducer getServiceProducer(ProducerImpl clientProducer, 
String topicName) {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        org.apache.pulsar.broker.service.Producer serviceProducer =
+                
persistentTopic.getProducers().get(clientProducer.getProducerName());
+        long clientProducerId = WhiteboxImpl.getInternalState(clientProducer, 
"producerId");
+        assertEquals(serviceProducer.getProducerId(), clientProducerId);
+        assertEquals(serviceProducer.getEpoch(), 
clientProducer.getConnectionHandler().getEpoch());
+        return new ServiceProducer(serviceProducer, persistentTopic);
+    }
+
+    @Data
+    @AllArgsConstructor
+    public static class ServiceProducer {
+        private org.apache.pulsar.broker.service.Producer serviceProducer;
+        private PersistentTopic persistentTopic;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index e2af5944ef0..5457a6088f2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -31,7 +31,6 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.matches;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -44,10 +43,13 @@ import com.google.common.collect.Maps;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
+import io.netty.channel.DefaultChannelId;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.vertx.core.impl.ConcurrentHashSet;
+import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
@@ -63,6 +65,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -74,6 +77,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
@@ -96,6 +100,7 @@ import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.AuthMethod;
@@ -109,6 +114,7 @@ import 
org.apache.pulsar.common.api.proto.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.CommandConnected;
 import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
+import org.apache.pulsar.common.api.proto.CommandPing;
 import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
 import org.apache.pulsar.common.api.proto.CommandSendError;
 import org.apache.pulsar.common.api.proto.CommandSendReceipt;
@@ -127,6 +133,7 @@ import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Commands.ChecksumType;
 import org.apache.pulsar.common.protocol.PulsarHandler;
+import org.apache.pulsar.common.protocol.schema.EmptyVersion;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -142,6 +149,7 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+@Slf4j
 @SuppressWarnings("unchecked")
 @Test(groups = "broker")
 public class ServerCnxTest {
@@ -171,25 +179,24 @@ public class ServerCnxTest {
     private ManagedCursor cursorMock;
     private OrderedExecutor executor;
     private EventLoopGroup eventLoopGroup;
+    private ConcurrentHashSet<EmbeddedChannel> 
channelsStoppedAnswerHealthCheck = new ConcurrentHashSet<>();
 
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
+        channelsStoppedAnswerHealthCheck.clear();
         eventLoopGroup = new NioEventLoopGroup();
         executor = OrderedExecutor.newBuilder().numThreads(1).build();
-        svcConfig = spy(ServiceConfiguration.class);
+        svcConfig = new ServiceConfiguration();
         svcConfig.setBrokerShutdownTimeoutMs(0L);
         svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
-        svcConfig.setClusterName("pulsar-cluster");
-        pulsar = spyWithClassAndConstructorArgs(PulsarService.class, 
svcConfig);
-        doReturn(new 
DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
-
         svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS));
         svcConfig.setBacklogQuotaCheckEnabled(false);
+        svcConfig.setClusterName("use");
+        pulsar = spyWithClassAndConstructorArgs(PulsarService.class, 
svcConfig);
+        doReturn(new 
DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
         doReturn(svcConfig).when(pulsar).getConfiguration();
         
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
 
-        doReturn("use").when(svcConfig).getClusterName();
-
         mlFactoryMock = mock(ManagedLedgerFactory.class);
         doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
 
@@ -876,6 +883,136 @@ public class ServerCnxTest {
                 }));
     }
 
+    @Test
+    public void testHandleProducerAfterClientChannelInactive() throws 
Exception {
+        final String tName = successTopicName;
+        final long producerId = 1;
+        final MutableInt requestId = new MutableInt(1);
+        final MutableInt epoch = new MutableInt(1);
+        final Map<String, String> metadata = Collections.emptyMap();
+        final String pName = "p1";
+        resetChannel();
+        setChannelConnected();
+
+        // The producer register using the first connection.
+        ByteBuf cmdProducer1 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
+                pName, false, metadata, null, epoch.incrementAndGet(), false,
+                ProducerAccessMode.Shared, Optional.empty(), false);
+        channel.writeInbound(cmdProducer1);
+        assertTrue(getResponse() instanceof CommandProducerSuccess);
+        PersistentTopic topicRef = (PersistentTopic) 
brokerService.getTopicReference(tName).get();
+        assertNotNull(topicRef);
+        assertEquals(topicRef.getProducers().size(), 1);
+
+        // Verify the second producer using a new connection will override the 
producer who using a stopped channel.
+        channelsStoppedAnswerHealthCheck.add(channel);
+        ClientChannel channel2 = new ClientChannel();
+        setChannelConnected(channel2.serverCnx);
+        Awaitility.await().untilAsserted(() -> {
+            channel.runPendingTasks();
+            ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
+                    pName, false, metadata, null, epoch.incrementAndGet(), 
false,
+                    ProducerAccessMode.Shared, Optional.empty(), false);
+            channel2.channel.writeInbound(cmdProducer2);
+            assertTrue(getResponse(channel2.channel, 
channel2.clientChannelHelper) instanceof CommandProducerSuccess);
+            assertEquals(topicRef.getProducers().size(), 1);
+        });
+
+        // cleanup.
+        channel.finish();
+        channel2.close();
+    }
+
+    private class ClientChannel implements Closeable {
+        private ClientChannelHelper clientChannelHelper = new 
ClientChannelHelper();
+        private ServerCnx serverCnx = new ServerCnx(pulsar);
+        private EmbeddedChannel channel = new 
EmbeddedChannel(DefaultChannelId.newInstance(),
+                new LengthFieldBasedFrameDecoder(
+                        5 * 1024 * 1024,
+                        0,
+                        4,
+                        0,
+                        4),
+                serverCnx);
+        public ClientChannel() {
+            serverCnx.setAuthRole("");
+        }
+        public void close(){
+            if (channel != null && channel.isActive()) {
+                serverCnx.close();
+                channel.close();
+            }
+        }
+    }
+
+    @Test
+    public void testHandleProducer() throws Exception {
+        final String tName = "persistent://public/default/test-topic";
+        final long producerId = 1;
+        final MutableInt requestId = new MutableInt(1);
+        final MutableInt epoch = new MutableInt(1);
+        final Map<String, String> metadata = Collections.emptyMap();
+        final String pName = "p1";
+        resetChannel();
+        assertTrue(channel.isActive());
+        assertEquals(serverCnx.getState(), State.Start);
+
+        // connect.
+        ByteBuf cConnect = Commands.newConnect("none", "", null);
+        channel.writeInbound(cConnect);
+        assertEquals(serverCnx.getState(), State.Connected);
+        assertTrue(getResponse() instanceof CommandConnected);
+
+        // There is an in-progress producer registration.
+        ByteBuf cProducer1 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
+                pName, false, metadata, null, epoch.incrementAndGet(), false,
+                ProducerAccessMode.Shared, Optional.empty(), false);
+        CompletableFuture existingFuture1 = new CompletableFuture();
+        serverCnx.getProducers().put(producerId, existingFuture1);
+        channel.writeInbound(cProducer1);
+        Object response1 = getResponse();
+        assertTrue(response1 instanceof CommandError);
+        CommandError error1 = (CommandError) response1;
+        assertEquals(error1.getError().toString(), 
ServerError.ServiceNotReady.toString());
+        assertTrue(error1.getMessage().contains("already present on the 
connection"));
+
+        // There is a failed registration.
+        ByteBuf cProducer2 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
+                pName, false, metadata, null, epoch.incrementAndGet(), false,
+                ProducerAccessMode.Shared, Optional.empty(), false);
+        CompletableFuture existingFuture2 = new CompletableFuture();
+        existingFuture2.completeExceptionally(new 
BrokerServiceException.ProducerBusyException("123"));
+        serverCnx.getProducers().put(producerId, existingFuture2);
+
+        channel.writeInbound(cProducer2);
+        Object response2 = getResponse();
+        assertTrue(response2 instanceof CommandError);
+        CommandError error2 = (CommandError) response2;
+        assertEquals(error2.getError().toString(), 
ServerError.ProducerBusy.toString());
+        assertTrue(error2.getMessage().contains("already failed to register 
present on the connection"));
+
+        // There is an successful registration.
+        ByteBuf cProducer3 = Commands.newProducer(tName, producerId, 
requestId.incrementAndGet(),
+                pName, false, metadata, null, epoch.incrementAndGet(), false,
+                ProducerAccessMode.Shared, Optional.empty(), false);
+        CompletableFuture existingFuture3 = new CompletableFuture();
+        org.apache.pulsar.broker.service.Producer serviceProducer =
+                mock(org.apache.pulsar.broker.service.Producer.class);
+        when(serviceProducer.getProducerName()).thenReturn(pName);
+        when(serviceProducer.getSchemaVersion()).thenReturn(new 
EmptyVersion());
+        existingFuture3.complete(serviceProducer);
+        serverCnx.getProducers().put(producerId, existingFuture3);
+
+        channel.writeInbound(cProducer3);
+        Object response3 = getResponse();
+        assertTrue(response3 instanceof CommandProducerSuccess);
+        CommandProducerSuccess cProducerSuccess = (CommandProducerSuccess) 
response3;
+        assertEquals(cProducerSuccess.getProducerName(), pName);
+
+        // cleanup.
+        channel.finish();
+    }
+
     // This test used to be in the ServerCnxAuthorizationTest class, but it 
was migrated here because the mocking
     // in that class was too extensive. There is some overlap with this test 
and other tests in this class. The primary
     // role of this test is verifying that the correct role and 
AuthenticationDataSource are passed to the
@@ -2341,6 +2478,10 @@ public class ServerCnxTest {
     }
 
     protected void setChannelConnected() throws Exception {
+        setChannelConnected(serverCnx);
+    }
+
+    protected void setChannelConnected(ServerCnx serverCnx) throws Exception {
         Field channelState = ServerCnx.class.getDeclaredField("state");
         channelState.setAccessible(true);
         channelState.set(serverCnx, State.Connected);
@@ -2354,13 +2495,31 @@ public class ServerCnxTest {
     }
 
     protected Object getResponse() throws Exception {
+        return getResponse(channel, clientChannelHelper);
+    }
+
+    protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper 
clientChannelHelper) throws Exception {
         // Wait at most for 10s to get a response
         final long sleepTimeMs = 10;
         final long iterations = TimeUnit.SECONDS.toMillis(10) / sleepTimeMs;
         for (int i = 0; i < iterations; i++) {
             if (!channel.outboundMessages().isEmpty()) {
                 Object outObject = channel.outboundMessages().remove();
-                return clientChannelHelper.getCommand(outObject);
+                Object cmd = clientChannelHelper.getCommand(outObject);
+                if (cmd instanceof CommandPing) {
+                    if (channelsStoppedAnswerHealthCheck.contains(channel)) {
+                        continue;
+                    }
+                    
channel.writeAndFlush(Commands.newPong()).addListener(future -> {
+                        if (!future.isSuccess()) {
+                            log.warn("[{}] Forcing connection to close since 
cannot send a pong message.",
+                                    channel, future.cause());
+                            channel.close();
+                        }
+                    });
+                    continue;
+                }
+                return cmd;
             } else {
                 Thread.sleep(sleepTimeMs);
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
new file mode 100644
index 00000000000..56b4609d66a
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import java.util.concurrent.CountDownLatch;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Unlike {@link 
org.apache.pulsar.client.api.SimpleProducerConsumerTest}, this class can access 
the variables
+ * of {@link ConsumerImpl} that are declared `protected`.
+ */
+@Test(groups = "broker-api")
+public class ProducerConsumerInternalTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSameProducerRegisterTwice() throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        // Create producer using default producerName.
+        ProducerImpl producer = (ProducerImpl) 
pulsarClient.newProducer().topic(topicName).create();
+        ServiceProducer serviceProducer = getServiceProducer(producer, 
topicName);
+
+        // Remove the producer maintained by the server cnx, so that it can register 
a second time.
+        removeServiceProducerMaintainedByServerCnx(serviceProducer);
+
+        // Trigger the client producer reconnect.
+        CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
+        commandCloseProducer.setProducerId(producer.producerId);
+        producer.getClientCnx().handleCloseProducer(commandCloseProducer);
+
+        // Verify the reconnection will be success.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(producer.getState().toString(), "Ready");
+        });
+    }
+
+    @Test
+    public void testSameProducerRegisterTwiceWithSpecifiedProducerName() 
throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        final String pName = "p1";
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        // Create producer using a specified producerName.
+        ProducerImpl producer = (ProducerImpl) 
pulsarClient.newProducer().producerName(pName).topic(topicName).create();
+        ServiceProducer serviceProducer = getServiceProducer(producer, 
topicName);
+
+        // Remove the producer maintained by the server cnx, so that it can register 
a second time.
+        removeServiceProducerMaintainedByServerCnx(serviceProducer);
+
+        // Trigger the client producer reconnect.
+        CommandCloseProducer commandCloseProducer = new CommandCloseProducer();
+        commandCloseProducer.setProducerId(producer.producerId);
+        producer.getClientCnx().handleCloseProducer(commandCloseProducer);
+
+        // Verify the reconnection will be success.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(producer.getState().toString(), "Ready", "The 
producer registration failed");
+        });
+    }
+
+    private void removeServiceProducerMaintainedByServerCnx(ServiceProducer 
serviceProducer) {
+        ServerCnx serverCnx = (ServerCnx) 
serviceProducer.getServiceProducer().getCnx();
+        serverCnx.removedProducer(serviceProducer.getServiceProducer());
+        Awaitility.await().untilAsserted(() -> {
+            
assertFalse(serverCnx.getProducers().containsKey(serviceProducer.getServiceProducer().getProducerId()));
+        });
+    }
+
+    @Test
+    public void 
testExclusiveConsumerWillAlwaysRetryEvenIfReceivedConsumerBusyError() throws 
Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        final String subscriptionName = "subscription1";
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        final ConsumerImpl consumer = (ConsumerImpl) 
pulsarClient.newConsumer().topic(topicName.toString())
+                
.subscriptionType(SubscriptionType.Exclusive).subscriptionName(subscriptionName).subscribe();
+
+        ClientCnx clientCnx = consumer.getClientCnx();
+        ServerCnx serverCnx = (ServerCnx) pulsar.getBrokerService()
+                
.getTopic(topicName,false).join().get().getSubscription(subscriptionName)
+                .getDispatcher().getConsumers().get(0).cnx();
+
+        // Make a disconnect to trigger broker remove the consumer which 
related this connection.
+        // Make the second subscribe runs after the broker removing the old 
consumer, then it will receive
+        // an error: "Exclusive consumer is already connected"
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        serverCnx.execute(() -> {
+            try {
+                countDownLatch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        clientCnx.close();
+        Thread.sleep(1000);
+        countDownLatch.countDown();
+
+        // Verify the consumer will always retry subscribing even if it received a 
ConsumerBusy error.
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(consumer.getState(), HandlerState.State.Ready);
+        });
+
+        // cleanup.
+        consumer.close();
+        admin.topics().delete(topicName, false);
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index 3cd91809f52..8cab3742bac 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.common.protocol;
 
 import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.concurrent.ScheduledFuture;
 import java.net.SocketAddress;
@@ -53,7 +54,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
     }
 
     @Override
-    protected final void messageReceived() {
+    protected void messageReceived() {
         waitingForPingResponse = false;
     }
 
@@ -117,14 +118,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
                 log.debug("[{}] Sending ping message", ctx.channel());
             }
             waitingForPingResponse = true;
-            ctx.writeAndFlush(Commands.newPing())
-                    .addListener(future -> {
-                        if (!future.isSuccess()) {
-                            log.warn("[{}] Forcing connection to close since 
cannot send a ping message.",
-                                    ctx.channel(), future.cause());
-                            ctx.close();
-                        }
-                    });
+            sendPing();
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Peer doesn't support keep-alive", 
ctx.channel());
@@ -132,6 +126,17 @@ public abstract class PulsarHandler extends PulsarDecoder {
         }
     }
 
+    protected ChannelFuture sendPing() {
+        return ctx.writeAndFlush(Commands.newPing())
+                .addListener(future -> {
+                    if (!future.isSuccess()) {
+                        log.warn("[{}] Forcing connection to close since 
cannot send a ping message.",
+                                ctx.channel(), future.cause());
+                        ctx.close();
+                    }
+                });
+    }
+
     public void cancelKeepAliveTask() {
         if (keepAliveTask != null) {
             keepAliveTask.cancel(false);


Reply via email to