This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e4c3ec  PLSR-1456: Fix race condition on producer/consumer maps in 
ServerCnx (#9256)
1e4c3ec is described below

commit 1e4c3ec4f55ad0b0729f2849915f6b4d9e426bb1
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Jan 21 22:52:45 2021 +0100

    PLSR-1456: Fix race condition on producer/consumer maps in ServerCnx (#9256)
    
    ServerCnx had a callback that was called from Producer/Consumer which
    would remove the producer/consumer from its map using only the
    ID. However, it is possible that this callback runs when the
    producer/consumer had already been removed from the map and another
    producer/consumer added in its place.
    
    The solution is to use both the key and the value when removing from
    the map, so that an entry is only removed if it still maps to the
    expected producer/consumer future.
    
    The change also updates the log messages to include the producerId and
    consumerId in a consistent format, so that all log messages for an
    individual producerId/consumerId can be found more easily.
    
    A test has been changed because it depended on the broken
    behaviour. Previously, when the failed-topic producer could not be
    created, its cleanup removed the producer future belonging to the
    successful producer with the same ID. When the third producer then
    tried to connect, it was able to create the producer on the
    connection, but failed because a producer with that name already
    existed on the topic. The correct behaviour is for it to find the
    successful producer future for that ID and respond with success.
    
    Co-authored-by: Ivan Kelly <[email protected]>
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 124 +++++++++++++--------
 .../pulsar/broker/service/ServerCnxTest.java       |   6 +-
 2 files changed, 79 insertions(+), 51 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index aea1f6b..8a4d5a9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -901,8 +901,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         if (existingConsumerFuture != null) {
                             if (existingConsumerFuture.isDone() && 
!existingConsumerFuture.isCompletedExceptionally()) {
                                 Consumer consumer = 
existingConsumerFuture.getNow(null);
-                                log.info("[{}] Consumer with the same id {} is 
already created: {}", remoteAddress,
-                                        consumerId, consumer);
+                                log.info("[{}] Consumer with the same id is 
already created:"
+                                         + " consumerId={}, consumer={}",
+                                         remoteAddress, consumerId, consumer);
                                 commandSender.sendSuccessResponse(requestId);
                                 return null;
                             } else {
@@ -911,14 +912,14 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                 // client timeout is lower the broker 
timeouts. We need to wait until the previous
                                 // consumer
                                 // creation request either complete or fails.
-                                log.warn("[{}][{}][{}] Consumer with id {} is 
already present on the connection",
-                                        remoteAddress, topicName, 
subscriptionName, consumerId);
+                                log.warn("[{}][{}][{}] Consumer with id is 
already present on the connection,"
+                                         + " consumerId={}", remoteAddress, 
topicName, subscriptionName, consumerId);
                                 ServerError error = null;
                                 if (!existingConsumerFuture.isDone()) {
                                     error = ServerError.ServiceNotReady;
                                 } else {
                                     error = 
getErrorCode(existingConsumerFuture);
-                                    consumers.remove(consumerId);
+                                    consumers.remove(consumerId, 
existingConsumerFuture);
                                 }
                                 commandSender.sendErrorResponse(requestId, 
error,
                                         "Consumer is already present on the 
connection");
@@ -995,11 +996,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                                     
exception.getCause().getMessage());
                                         }
                                     } else if (exception.getCause() instanceof 
BrokerServiceException) {
-                                        log.warn("[{}][{}][{}] Failed to 
create consumer: {}", remoteAddress, topicName,
-                                                subscriptionName, 
exception.getCause().getMessage());
+                                        log.warn("[{}][{}][{}] Failed to 
create consumer: consumerId={}, {}",
+                                                 remoteAddress, topicName, 
subscriptionName,
+                                                 consumerId,  
exception.getCause().getMessage());
                                     } else {
-                                        log.warn("[{}][{}][{}] Failed to 
create consumer: {}", remoteAddress, topicName,
-                                                subscriptionName, 
exception.getCause().getMessage(), exception);
+                                        log.warn("[{}][{}][{}] Failed to 
create consumer: consumerId={}, {}",
+                                                 remoteAddress, topicName, 
subscriptionName,
+                                                 consumerId, 
exception.getCause().getMessage(), exception);
                                     }
 
                                     // If client timed out, the future would 
have been completed by subsequent close.
@@ -1095,10 +1098,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         if (existingProducerFuture != null) {
                             if (existingProducerFuture.isDone() && 
!existingProducerFuture.isCompletedExceptionally()) {
                                 Producer producer = 
existingProducerFuture.getNow(null);
-                                log.info("[{}] Producer with the same id {} is 
already created: {}", remoteAddress,
-                                        producerId, producer);
+                                log.info("[{}] Producer with the same id is 
already created:"
+                                         + " producerId={}, producer={}", 
remoteAddress, producerId, producer);
                                 
commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(),
                                         producer.getSchemaVersion());
+
                                 return null;
                             } else {
                                 // There was an early request to create a 
producer with
@@ -1114,12 +1118,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                 } else {
                                     error = 
getErrorCode(existingProducerFuture);
                                     // remove producer with producerId as it's 
already completed with exception
-                                    producers.remove(producerId);
+                                    producers.remove(producerId, 
existingProducerFuture);
                                 }
-                                log.warn("[{}][{}] Producer with id {} is 
already present on the connection",
-                                        remoteAddress, producerId, topicName);
+                                log.warn("[{}][{}] Producer with id is already 
present on the connection,"
+                                         + " producerId={}", remoteAddress, 
topicName, producerId);
                                 commandSender.sendErrorResponse(requestId, 
error,
-                                        "Producer is already present on the 
connection");
+                                                                "Producer is 
already present on the connection");
+
                                 return null;
                             }
                         }
@@ -1201,8 +1206,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
                                         producers.remove(producerId, 
producerFuture);
                                 }).exceptionally(ex -> {
-                                    log.warn("[{}] Failed to add producer {}: 
{}",
-                                            remoteAddress, producer, 
ex.getMessage());
+                                    log.error("[{}] Failed to add producer to 
topic {}: producerId={}, {}",
+                                              remoteAddress, topicName, 
producerId, ex.getMessage());
+
                                     producer.closeNow(true);
                                     if 
(producerFuture.completeExceptionally(ex)) {
                                         
commandSender.sendErrorResponse(requestId,
@@ -1231,7 +1237,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
                             if (!(cause instanceof 
ServiceUnitNotReadyException)) {
                                 // Do not print stack traces for expected 
exceptions
-                                log.error("[{}] Failed to create topic {}", 
remoteAddress, topicName, exception);
+                                log.error("[{}] Failed to create topic {}, 
producerId={}",
+                                          remoteAddress, topicName, 
producerId, exception);
                             }
 
                             // If client timed out, the future would have been 
completed
@@ -1474,7 +1481,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
         CompletableFuture<Producer> producerFuture = producers.get(producerId);
         if (producerFuture == null) {
-            log.warn("[{}] Producer {} was not registered on the connection", 
remoteAddress, producerId);
+            log.warn("[{}] Producer was not registered on the connection. 
producerId={}", remoteAddress, producerId);
             commandSender.sendErrorResponse(requestId, 
ServerError.UnknownError,
                     "Producer was not registered on the connection");
             return;
@@ -1484,12 +1491,14 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 .completeExceptionally(new IllegalStateException("Closed 
producer before creation was complete"))) {
             // We have received a request to close the producer before it was 
actually completed, we have marked the
             // producer future as failed and we can tell the client the close 
operation was successful.
-            log.info("[{}] Closed producer {} before its creation was 
completed", remoteAddress, producerId);
+            log.info("[{}] Closed producer before its creation was completed. 
producerId={}",
+                     remoteAddress, producerId);
             commandSender.sendSuccessResponse(requestId);
             producers.remove(producerId, producerFuture);
             return;
         } else if (producerFuture.isCompletedExceptionally()) {
-            log.info("[{}] Closed producer {} that already failed to be 
created", remoteAddress, producerId);
+            log.info("[{}] Closed producer that already failed to be created. 
producerId={}",
+                     remoteAddress, producerId);
             commandSender.sendSuccessResponse(requestId);
             producers.remove(producerId, producerFuture);
             return;
@@ -1497,11 +1506,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
         // Proceed with normal close, the producer
         Producer producer = producerFuture.getNow(null);
-        log.info("[{}][{}] Closing producer on cnx {}", producer.getTopic(), 
producer.getProducerName(), remoteAddress);
+        log.info("[{}][{}] Closing producer on cnx {}. producerId={}",
+                 producer.getTopic(), producer.getProducerName(), 
remoteAddress, producerId);
 
         producer.close(true).thenAccept(v -> {
-            log.info("[{}][{}] Closed producer on cnx {}", 
producer.getTopic(), producer.getProducerName(),
-                    remoteAddress);
+            log.info("[{}][{}] Closed producer on cnx {}. producerId={}",
+                     producer.getTopic(), producer.getProducerName(),
+                     remoteAddress, producerId);
             commandSender.sendSuccessResponse(requestId);
             producers.remove(producerId, producerFuture);
         });
@@ -1510,14 +1521,14 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     @Override
     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
         checkArgument(state == State.Connected);
-        log.info("[{}] Closing consumer: {}", remoteAddress, 
closeConsumer.getConsumerId());
+        log.info("[{}] Closing consumer: consumerId={}", remoteAddress, 
closeConsumer.getConsumerId());
 
         long requestId = closeConsumer.getRequestId();
         long consumerId = closeConsumer.getConsumerId();
 
         CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
         if (consumerFuture == null) {
-            log.warn("[{}] Consumer was not registered on the connection: {}", 
consumerId, remoteAddress);
+            log.warn("[{}] Consumer was not registered on the connection: 
consumerId={}", remoteAddress, consumerId);
             commandSender.sendErrorResponse(requestId, 
ServerError.MetadataError, "Consumer not found");
             return;
         }
@@ -1527,13 +1538,15 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             // We have received a request to close the consumer before it was 
actually completed, we have marked the
             // consumer future as failed and we can tell the client the close 
operation was successful. When the actual
             // create operation will complete, the new consumer will be 
discarded.
-            log.info("[{}] Closed consumer {} before its creation was 
completed", remoteAddress, consumerId);
+            log.info("[{}] Closed consumer before its creation was completed. 
consumerId={}",
+                     remoteAddress, consumerId);
             commandSender.sendSuccessResponse(requestId);
             return;
         }
 
         if (consumerFuture.isCompletedExceptionally()) {
-            log.info("[{}] Closed consumer {} that already failed to be 
created", remoteAddress, consumerId);
+            log.info("[{}] Closed consumer that already failed to be created. 
consumerId={}",
+                     remoteAddress, consumerId);
             commandSender.sendSuccessResponse(requestId);
             return;
         }
@@ -1544,7 +1557,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             consumer.close();
             consumers.remove(consumerId, consumerFuture);
             commandSender.sendSuccessResponse(requestId);
-            log.info("[{}] Closed consumer {}", remoteAddress, consumer);
+            log.info("[{}] Closed consumer, consumerId={}", remoteAddress, 
consumerId);
         } catch (BrokerServiceException e) {
             log.warn("[{]] Error closing consumer {} : {}", remoteAddress, 
consumer, e);
             commandSender.sendErrorResponse(requestId, 
BrokerServiceException.getClientErrorCode(e), e.getMessage());
@@ -1943,13 +1956,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     public void closeProducer(Producer producer) {
         // removes producer-connection from map and send close command to 
producer
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Removed producer: {}", remoteAddress, producer);
-        }
-        long producerId = producer.getProducerId();
-        producers.remove(producerId);
+        safelyRemoveProducer(producer);
         if (remoteEndpointProtocolVersion >= v5.getValue()) {
-            ctx.writeAndFlush(Commands.newCloseProducer(producerId, -1L));
+            
ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
         } else {
             close();
         }
@@ -1959,13 +1968,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     @Override
     public void closeConsumer(Consumer consumer) {
         // removes consumer-connection from map and send close command to 
consumer
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Removed consumer: {}", remoteAddress, consumer);
-        }
-        long consumerId = consumer.consumerId();
-        consumers.remove(consumerId);
+        safelyRemoveConsumer(consumer);
         if (remoteEndpointProtocolVersion >= v5.getValue()) {
-            ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1L));
+            ctx.writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(), 
-1L));
         } else {
             close();
         }
@@ -1986,19 +1991,42 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     @Override
     public void removedConsumer(Consumer consumer) {
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Removed consumer: {}", remoteAddress, consumer);
-        }
-
-        consumers.remove(consumer.consumerId());
+        safelyRemoveConsumer(consumer);
     }
 
     @Override
     public void removedProducer(Producer producer) {
+        safelyRemoveProducer(producer);
+    }
+
+    private void safelyRemoveProducer(Producer producer) {
+        long producerId = producer.getProducerId();
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Removed producer: {}", remoteAddress, producer);
+            log.debug("[{}] Removed producer: producerId={}, producer={}", 
remoteAddress, producerId, producer);
+        }
+        CompletableFuture<Producer> future = producers.get(producerId);
+        if (future != null) {
+            future.whenComplete((producer2, exception) -> {
+                    if (exception != null || producer2 == producer) {
+                        producers.remove(producerId, future);
+                    }
+                });
+        }
+    }
+
+    private void safelyRemoveConsumer(Consumer consumer) {
+        long consumerId = consumer.consumerId();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Removed consumer: consumerId={}, consumer={}", 
remoteAddress, consumerId, consumer);
+        }
+        CompletableFuture<Consumer> future = consumers.get(consumerId);
+        if (future != null) {
+            future.whenComplete((consumer2, exception) -> {
+                    if (exception != null || consumer2 == consumer) {
+                        consumers.remove(consumerId, future);
+                    }
+                });
         }
-        producers.remove(producer.getProducerId());
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 7ff0c14..4bdbacb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -954,10 +954,10 @@ public class ServerCnxTest {
                 producerName, Collections.emptyMap());
         channel.writeInbound(createProducer3);
 
-        // 3rd producer fails because 2nd is already connected
+        // 3rd producer succeeds because 2nd is already connected
         response = getResponse();
-        assertEquals(response.getClass(), CommandError.class);
-        assertEquals(((CommandError) response).getRequestId(), 4);
+        assertEquals(response.getClass(), CommandProducerSuccess.class);
+        assertEquals(((CommandProducerSuccess) response).getRequestId(), 4);
 
         Thread.sleep(500);
 

Reply via email to