mattisonchao edited a comment on pull request #11058: URL: https://github.com/apache/pulsar/pull/11058#issuecomment-868175558
@lhotari ### Background This test ensures that consumers who send subscription requests concurrently will return ServerError. Please see below for details: ```java ServerCnx 949:980 CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>(); CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId, consumerFuture); if (existingConsumerFuture != null) { if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) { //... Omit some code commandSender.sendSuccessResponse(requestId); return null; } else { // There was an early request to create a consumer with same consumerId. This can happen // when // client timeout is lower the broker timeouts. We need to wait until the previous // consumer // creation request either complete or fails. // ... Omit some code if (!existingConsumerFuture.isDone()) { error = ServerError.ServiceNotReady; } else { error = getErrorCode(existingConsumerFuture); consumers.remove(consumerId, existingConsumerFuture); } commandSender.sendErrorResponse(requestId, error, "Consumer is already present on the connection"); return null; } ``` ### Cause The first subscription request has been sent, but the next subscription request is delayed (may be due to network delays or other reasons.). So the first request may continue to run until it completes. ```java ServerCnx 1020:1041 service.getTopic(topicName.toString(), createTopicIfDoesNotExist) .thenCompose(optTopic -> { // .... Omit some code }) .thenAccept(consumer -> { if (consumerFuture.complete(consumer)) { // the consumer complete log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName, subscriptionName); commandSender.sendSuccessResponse(requestId); } else { // The consumer future was completed before by a close command try { consumer.close(); log.info("[{}] Cleared consumer created after timeout on client side {}", remoteAddress, consumer); } catch (BrokerServiceException e) { log.warn( "[{}] Error closing consumer created" + " after timeout on client side {}: {}", remoteAddress, consumer, e.getMessage()); } consumers.remove(consumerId, consumerFuture); } }) ``` Therefore, when the second request is received, it will be judged as another situation. ```java ServerCnx 953:961 if (existingConsumerFuture != null) { if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) { Consumer consumer = existingConsumerFuture.getNow(null); log.info("[{}] Consumer with the same id is already created:" + " consumerId={}, consumer={}", remoteAddress, consumerId, consumer); commandSender.sendSuccessResponse(requestId); return null; } else { // ...Omit some code } } ``` And then, the test will fail. due to Success response. ### Solution Reducing the time gap between sending requests can optimize the test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org