mattisonchao edited a comment on pull request #11058:
URL: https://github.com/apache/pulsar/pull/11058#issuecomment-868175558


   @lhotari
   ### Background
   
   This test verifies that when a consumer sends subscription requests with the 
same consumer ID concurrently, the broker answers the later request with a 
ServerError. The relevant broker code is shown below:
   ```java
   // ServerCnx, lines 949-980
   
   CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
   CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
           consumerFuture);
   if (existingConsumerFuture != null) {
       if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
           // ... Omit some code
           commandSender.sendSuccessResponse(requestId);
           return null;
       } else {
           // There was an early request to create a consumer with same consumerId.
           // This can happen when client timeout is lower than the broker timeouts.
           // We need to wait until the previous consumer creation request either
           // completes or fails.
           // ... Omit some code
           if (!existingConsumerFuture.isDone()) {
               error = ServerError.ServiceNotReady;
           } else {
               error = getErrorCode(existingConsumerFuture);
               consumers.remove(consumerId, existingConsumerFuture);
           }
           commandSender.sendErrorResponse(requestId, error,
                   "Consumer is already present on the connection");
           return null;
       }
   }
   ```
   ### Cause
   
   The first subscription request has been sent, but the second subscription 
request is delayed (for example, by network latency). In that window the first 
request can run to completion and complete its consumer future:
   ```java
   // ServerCnx, lines 1020-1041
   
   service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
           .thenCompose(optTopic -> {
               // .... Omit some code
           })
           .thenAccept(consumer -> {
               if (consumerFuture.complete(consumer)) { // the consumer future completes here
                   log.info("[{}] Created subscription on topic {} / {}",
                           remoteAddress, topicName, subscriptionName);
                   commandSender.sendSuccessResponse(requestId);
               } else {
                   // The consumer future was completed before by a close command
                   try {
                       consumer.close();
                       log.info("[{}] Cleared consumer created after timeout on client side {}",
                               remoteAddress, consumer);
                   } catch (BrokerServiceException e) {
                       log.warn(
                               "[{}] Error closing consumer created"
                                       + " after timeout on client side {}: {}",
                               remoteAddress, consumer, e.getMessage());
                   }
                   consumers.remove(consumerId, consumerFuture);
               }
           })
   ```
   Therefore, by the time the second request arrives, the existing consumer 
future is already completed successfully, so the request takes the other 
branch:
   
   ```java
   // ServerCnx, lines 953-961
   
   if (existingConsumerFuture != null) {
       if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
           Consumer consumer = existingConsumerFuture.getNow(null);
           log.info("[{}] Consumer with the same id is already created:"
                   + " consumerId={}, consumer={}",
                   remoteAddress, consumerId, consumer);
           commandSender.sendSuccessResponse(requestId);
           return null;
       } else {
           //  ...Omit some code
       }
   }
   ```
   The broker then sends a Success response instead of the expected 
ServerError, and the test fails.
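
To make the ordering dependence concrete, here is a minimal, self-contained sketch of the same check. It is not the broker code: the class `SubscribeRaceSketch`, the method `handleSubscribe`, the `String` stand-in for `Consumer`, and the `"Pending"` / `"Error"` labels are all hypothetical names chosen for illustration. Only the `putIfAbsent` / `isDone` / `isCompletedExceptionally` logic mirrors the excerpts above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SubscribeRaceSketch {
    // Hypothetical stand-in for the per-connection consumer registry.
    static final ConcurrentMap<Long, CompletableFuture<String>> consumers =
            new ConcurrentHashMap<>();

    // Simplified duplicate-subscribe check; returns the name of the response
    // that would be sent for this request.
    static String handleSubscribe(long consumerId, CompletableFuture<String> consumerFuture) {
        CompletableFuture<String> existing = consumers.putIfAbsent(consumerId, consumerFuture);
        if (existing == null) {
            // First request for this id: creation continues asynchronously.
            return "Pending";
        }
        if (existing.isDone() && !existing.isCompletedExceptionally()) {
            // The earlier request already finished successfully.
            return "Success";
        }
        if (!existing.isDone()) {
            // The earlier request is still in flight.
            return "ServiceNotReady";
        }
        // The earlier request failed: clear it so a retry can succeed.
        consumers.remove(consumerId, existing);
        return "Error";
    }

    public static void main(String[] args) {
        // Case 1: the second request arrives while the first is still in flight.
        CompletableFuture<String> first = new CompletableFuture<>();
        handleSubscribe(1L, first);
        System.out.println(handleSubscribe(1L, new CompletableFuture<>()));

        // Case 2: the second request is delayed until the first has completed.
        CompletableFuture<String> delayedCase = new CompletableFuture<>();
        handleSubscribe(2L, delayedCase);
        delayedCase.complete("consumer-2");
        System.out.println(handleSubscribe(2L, new CompletableFuture<>()));
    }
}
```

In this sketch the first case yields `ServiceNotReady` and the second yields `Success`. The only difference between them is whether the first future completed before the second request was handled, which is the timing the test depends on.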
   
   ### Solution
   
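   Reducing the time gap between sending the two requests makes it less likely 
that the first request completes before the second one arrives, which should 
make the test more stable.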
   
   
   
   



