codelipenghui commented on code in PR #22283:
URL: https://github.com/apache/pulsar/pull/22283#discussion_r1534887661


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -1224,12 +1224,22 @@ protected void handleSubscribe(final CommandSubscribe 
subscribe) {
                         commandSender.sendErrorResponse(requestId, 
ServerError.ServiceNotReady,
                                 "Consumer is already present on the 
connection");
                     } else if 
(existingConsumerFuture.isCompletedExceptionally()){
+                        log.warn("[{}][{}][{}] A failed consumer with id is 
already present on the connection,"
+                                + " consumerId={}", remoteAddress, topicName, 
subscriptionName, consumerId);
                         ServerError error = 
getErrorCodeWithErrorLog(existingConsumerFuture, true,
-                                String.format("Consumer subscribe failure. 
remoteAddress: %s, subscription: %s",
+                                String.format("A failed consumer with id is 
already present on the connection."
+                                                + " remoteAddress: %s, 
subscription: %s",
                                         remoteAddress, subscriptionName));
-                        consumers.remove(consumerId, existingConsumerFuture);
+                        /**
+                         * This feature may was failed due to the client 
closed a in-progress subscribing.
+                         * See {@link 
#handleCloseConsumer(CommandCloseConsumer)}
+                         * Do not remove the failed feature at current line, 
it will be removed after the progress of

Review Comment:
   ```suggestion
                            * Do not remove the future feature at current line, 
it will be removed after the progress of
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -1224,12 +1224,22 @@ protected void handleSubscribe(final CommandSubscribe 
subscribe) {
                         commandSender.sendErrorResponse(requestId, 
ServerError.ServiceNotReady,
                                 "Consumer is already present on the 
connection");
                     } else if 
(existingConsumerFuture.isCompletedExceptionally()){
+                        log.warn("[{}][{}][{}] A failed consumer with id is 
already present on the connection,"
+                                + " consumerId={}", remoteAddress, topicName, 
subscriptionName, consumerId);
                         ServerError error = 
getErrorCodeWithErrorLog(existingConsumerFuture, true,
-                                String.format("Consumer subscribe failure. 
remoteAddress: %s, subscription: %s",
+                                String.format("A failed consumer with id is 
already present on the connection."
+                                                + " remoteAddress: %s, 
subscription: %s",
                                         remoteAddress, subscriptionName));
-                        consumers.remove(consumerId, existingConsumerFuture);
+                        /**
+                         * This feature may was failed due to the client 
closed a in-progress subscribing.
+                         * See {@link 
#handleCloseConsumer(CommandCloseConsumer)}
+                         * Do not remove the failed feature at current line, 
it will be removed after the progress of
+                         * the previous subscribing is done.
+                         * Before the previous subscribing is done, the new 
subscribe request will always fail.
+                         * This mechanism is in order to prevent more complex 
logic to handle the race conditions.
+                         */
                         commandSender.sendErrorResponse(requestId, error,
-                                "Consumer that failed is already present on 
the connection");
+                                "A failed consumer is already present on the 
connection");

Review Comment:
   Hmm, It looks like we don't need to change it.
   
   A failed consumer can be understand as a different consumer
   But if say `Consumer that ...`, it usually means the same consumer



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -1224,12 +1224,22 @@ protected void handleSubscribe(final CommandSubscribe 
subscribe) {
                         commandSender.sendErrorResponse(requestId, 
ServerError.ServiceNotReady,
                                 "Consumer is already present on the 
connection");
                     } else if 
(existingConsumerFuture.isCompletedExceptionally()){
+                        log.warn("[{}][{}][{}] A failed consumer with id is 
already present on the connection,"
+                                + " consumerId={}", remoteAddress, topicName, 
subscriptionName, consumerId);
                         ServerError error = 
getErrorCodeWithErrorLog(existingConsumerFuture, true,
-                                String.format("Consumer subscribe failure. 
remoteAddress: %s, subscription: %s",
+                                String.format("A failed consumer with id is 
already present on the connection."
+                                                + " remoteAddress: %s, 
subscription: %s",
                                         remoteAddress, subscriptionName));
-                        consumers.remove(consumerId, existingConsumerFuture);
+                        /**
+                         * This feature may was failed due to the client 
closed a in-progress subscribing.

Review Comment:
   ```suggestion
                            * This future may was failed due to the client 
closed a in-progress subscribing.
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -190,9 +190,15 @@ public synchronized CompletableFuture<Void> 
addConsumer(Consumer consumer) {
         }
 
         if (isConsumersExceededOnSubscription()) {
-            log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit", name);
+            log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit {}",
+                    name, consumer);
             return FutureUtil.failedFuture(new 
ConsumerBusyException("Subscription reached max consumers limit"));
         }
+        if (consumerSet.contains(consumer)) {

Review Comment:
   @poorbarcode Pease help write some comments about this change to say this is 
not expected. And I think in this case, close the dispatcher or topic might a 
better solution, just to let the consumers to reconnect again, because we don't 
think what exactly happened at that time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to