codelipenghui commented on code in PR #21220:
URL: https://github.com/apache/pulsar/pull/21220#discussion_r1378596083


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java:
##########
@@ -974,31 +975,40 @@ protected void internalAddProducer(Producer producer) throws BrokerServiceExcept
 
         Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
         if (existProducer != null) {
-            tryOverwriteOldProducer(existProducer, producer);
+            return tryOverwriteOldProducer(existProducer, producer);
         } else if (!producer.isRemote()) {
             USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this);
         }
+        return CompletableFuture.completedFuture(null);
     }
 
-    private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
-            throws BrokerServiceException {
+    private CompletableFuture<Void> tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) {
         if (newProducer.isSuccessorTo(oldProducer)) {
             oldProducer.close(false);
             if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
                 // Met concurrent update, throw exception here so that client can try reconnect later.
-                throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
-                        + "' replace concurrency error");
+                return CompletableFuture.failedFuture(new BrokerServiceException.NamingException("Producer with name '"
+                        + newProducer.getProducerName() + "' replace concurrency error"));
             } else {
                 handleProducerRemoved(oldProducer);
+                return CompletableFuture.completedFuture(null);
             }
         } else {
             // If a producer with the same name tries to use a new connection, async check the old connection is
             // available. The producers related the connection that not available are automatically cleaned up.
             if (!Objects.equals(oldProducer.getCnx(), newProducer.getCnx())) {
-                oldProducer.getCnx().checkConnectionLiveness();
+                return oldProducer.getCnx().checkConnectionLiveness().thenCompose(previousIsActive -> {
+                    if (previousIsActive) {
+                        return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
+                                "Producer with name '" + newProducer.getProducerName()
+                                        + "' is already connected to topic"));
+                    } else {
+                        return internalAddProducer(newProducer);

Review Comment:
   It would be better to add a comment here explaining why the recursive call 
is needed.
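
   For reference, a minimal standalone sketch of the retry pattern in question. It is not Pulsar code: `ProducerRegistrySketch`, `FakeProducer`, `checkLiveness`, and `lookup` are hypothetical names, and it assumes (as the existing code comment suggests) that the liveness check cleans up the producer of a dead connection. Under that assumption, the recursive call simply retries registration so that `putIfAbsent` can succeed against the now-empty slot.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ProducerRegistrySketch {
    /** Hypothetical stand-in for a producer: a name plus whether its connection is alive. */
    public static final class FakeProducer {
        final String name;
        final boolean connectionAlive;

        public FakeProducer(String name, boolean connectionAlive) {
            this.name = name;
            this.connectionAlive = connectionAlive;
        }
    }

    private final ConcurrentHashMap<String, FakeProducer> producers = new ConcurrentHashMap<>();

    /** Assumption: checking a dead connection removes its producer as a side effect. */
    CompletableFuture<Boolean> checkLiveness(FakeProducer p) {
        if (!p.connectionAlive) {
            producers.remove(p.name, p);
        }
        return CompletableFuture.completedFuture(p.connectionAlive);
    }

    public CompletableFuture<Void> add(FakeProducer p) {
        FakeProducer existing = producers.putIfAbsent(p.name, p);
        if (existing == null) {
            return CompletableFuture.completedFuture(null);
        }
        return checkLiveness(existing).thenCompose(alive -> {
            if (alive) {
                return CompletableFuture.<Void>failedFuture(
                        new IllegalStateException("Producer '" + p.name + "' is already connected"));
            }
            // The stale entry was cleaned up by the liveness check, so retry from the
            // top: putIfAbsent can now succeed, or a concurrent winner is re-checked.
            return add(p);
        });
    }

    public FakeProducer lookup(String name) {
        return producers.get(name);
    }

    public static void main(String[] args) {
        ProducerRegistrySketch registry = new ProducerRegistrySketch();
        registry.add(new FakeProducer("p1", false)).join();   // old producer, dead connection
        FakeProducer fresh = new FakeProducer("p1", true);
        registry.add(fresh).join();                            // replaces the stale entry via the retry
        System.out.println("fresh producer registered: " + (registry.lookup("p1") == fresh));
    }
}
```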



Review Comment:
   And this was also a bug before this change, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
