poorbarcode opened a new pull request, #21144:
URL: https://github.com/apache/pulsar/pull/21144

   ### Motivation
   
   ```
   2023-09-07T09:02:53.011220026Z stdout F 2023-09-07T09:02:53,011+0000 [pulsar-io-6-1] WARN  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:48061] Failed to add producer to topic persistent://public/default/tp1: producerId=167, Producer with name 'producer-b1fc76' is already connected to topic
   ```
   
   Pulsar has two mechanisms to guarantee that the result is still correct when
   a producer connects to the broker multiple times:
   
   - Within a connection, the second request to create the producer waits for
     the first one to complete<sup>[1]</sup>
   - Within a topic, the second producer overrides the previous
     one<sup>[2]</sup>
   
   However, if a producer uses different connections to connect to the broker,
   neither of these two mechanisms works.
   
   When the config `connectionsPerBroker` of `PulsarClient` is larger than `1`, 
a producer could use more than one connection<sup>[3]</sup>, leading to the 
error above.
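
The interplay can be illustrated with a toy model. This is a minimal sketch, not broker code: `DuplicateProducerModel`, its method, and the returned strings are hypothetical, and it assumes that the topic-level overwrite from [2] is not applied when the duplicate request arrives on a different connection, which is what the log above suggests.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model only; class, method, and result names are hypothetical.
final class DuplicateProducerModel {
    // Topic-level registry: producer name -> id of the connection that owns it.
    private final Map<String, Integer> topicProducers = new ConcurrentHashMap<>();
    // Connection-level registries: connection id -> (producerId -> producer name).
    private final Map<Integer, Map<Long, String>> perConnection = new ConcurrentHashMap<>();

    String register(int connectionId, long producerId, String producerName) {
        Map<Long, String> onConnection =
                perConnection.computeIfAbsent(connectionId, id -> new ConcurrentHashMap<>());
        // Mechanism [1]: a repeated request on the same connection is caught here.
        if (onConnection.putIfAbsent(producerId, producerName) != null) {
            return "ALREADY_ON_CONNECTION";
        }
        // Mechanism [2], simplified. Assumption: a request coming from a different
        // connection is rejected instead of overwriting the old producer.
        Integer owner = topicProducers.putIfAbsent(producerName, connectionId);
        if (owner != null) {
            onConnection.remove(producerId);
            return "NAME_ALREADY_CONNECTED";
        }
        return "CREATED";
    }

    public static void main(String[] args) {
        DuplicateProducerModel broker = new DuplicateProducerModel();
        System.out.println(broker.register(1, 167L, "producer-b1fc76"));
        System.out.println(broker.register(1, 167L, "producer-b1fc76"));
        System.out.println(broker.register(2, 167L, "producer-b1fc76"));
    }
}
```

In this model the repeated request on connection 1 is absorbed by the per-connection map, while the same producer arriving on connection 2 bypasses that map and fails the topic-level name check.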
   
   **[1]**
   
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1398-L1423
   
   ```java
   CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
   CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId, producerFuture);
   
   if (existingProducerFuture != null) {
       if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
           Producer producer = existingProducerFuture.getNow(null);
           log.info("[{}] Producer with the same id is already created:"
                   + " producerId={}, producer={}", remoteAddress, producerId, producer);
           commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(),
                   producer.getSchemaVersion());
           return null;
       } else {
           // There was an early request to create a producer with same producerId.
           // This can happen when client timeout is lower than the broker timeouts.
           // We need to wait until the previous producer creation request
           // either complete or fails.
           ServerError error = null;
           if (!existingProducerFuture.isDone()) {
               error = ServerError.ServiceNotReady;
           } else {
               error = getErrorCode(existingProducerFuture);
               // remove producer with producerId as it's already completed with exception
               producers.remove(producerId, existingProducerFuture);
           }
           log.warn("[{}][{}] Producer with id is already present on the connection, producerId={}",
                   remoteAddress, topicName, producerId);
           commandSender.sendErrorResponse(requestId, error, "Producer is already present on the connection");
           return null;
       }
   }
   ```
   
   **[2]**
   
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java#L983-L999
   
   ```java
   private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
           throws BrokerServiceException {
       if (newProducer.isSuccessorTo(oldProducer) && !isUserProvidedProducerName(oldProducer)
               && !isUserProvidedProducerName(newProducer)) {
           oldProducer.close(false);
           if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
               // Met concurrent update, throw exception here so that client can try reconnect later.
               throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
                       + "' replace concurrency error");
           } else {
               handleProducerRemoved(oldProducer);
           }
       } else {
           throw new BrokerServiceException.NamingException(
                   "Producer with name '" + newProducer.getProducerName() + "' is already connected to topic");
       }
   }
   ```
   
   **[3]**
   
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java#L213-L218
   
   ```java
   final int randomKey = signSafeMod(random.nextInt(), maxConnectionsPerHosts);
   
   final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
           pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>());
   CompletableFuture<ClientCnx> completableFuture = innerPool
           .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
   ```
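
To see why a random key per lookup spreads one producer over several connections, the selection can be replayed in isolation. This is a hedged sketch: `RandomKeyDemo` and `distinctKeys` are hypothetical names, and `signSafeMod` is re-implemented here under the assumption that it returns a non-negative remainder.

```java
import java.util.Random;

// Stand-alone model of the key selection; not the actual ConnectionPool class.
final class RandomKeyDemo {
    // Assumed contract of the helper in the quoted snippet: non-negative remainder.
    static int signSafeMod(long dividend, int divisor) {
        int mod = (int) (dividend % divisor);
        if (mod < 0) {
            mod += divisor;
        }
        return mod;
    }

    // Counts how many distinct pool keys are drawn over a number of lookups.
    static int distinctKeys(Random random, int maxConnectionsPerHost, int lookups) {
        boolean[] seen = new boolean[maxConnectionsPerHost];
        int distinct = 0;
        for (int i = 0; i < lookups; i++) {
            int key = signSafeMod(random.nextInt(), maxConnectionsPerHost);
            if (!seen[key]) {
                seen[key] = true;
                distinct++;
            }
        }
        return distinct;
    }

    public static void main(String[] args) {
        Random random = new Random();
        System.out.println("1 connection per broker:  " + distinctKeys(random, 1, 1000));
        System.out.println("3 connections per broker: " + distinctKeys(random, 3, 1000));
    }
}
```

With a single connection per broker every lookup maps to key `0`, so a reconnect always lands on the same connection. With more than one, successive lookups for the same producer can resolve to different keys.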
   
   
   ### Modifications
   
   Make the same producer/consumer always use the same connection.
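
One way to picture this change is a key that is drawn once per producer or consumer and reused for every connection lookup. The sketch below only illustrates that idea under stated assumptions: `StickyConnectionKey` is a hypothetical helper, not the code in this PR and not part of the Pulsar client API.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, shown for illustration only.
final class StickyConnectionKey {
    private final int key;

    StickyConnectionKey(int maxConnectionsPerHost) {
        if (maxConnectionsPerHost < 1) {
            throw new IllegalArgumentException("maxConnectionsPerHost must be >= 1");
        }
        // Drawn once, when the producer or consumer is created.
        this.key = ThreadLocalRandom.current().nextInt(maxConnectionsPerHost);
    }

    // Every connect and reconnect asks the pool for the same slot.
    int key() {
        return key;
    }

    public static void main(String[] args) {
        StickyConnectionKey producerKey = new StickyConnectionKey(3);
        System.out.println("first connect uses slot " + producerKey.key());
        System.out.println("reconnect uses slot     " + producerKey.key());
    }
}
```

Because the key no longer changes between lookups, a reconnecting producer reaches the broker over the connection that already knows its `producerId`, so the per-connection check in [1] applies again.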
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: x
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.