poorbarcode opened a new pull request, #21144: URL: https://github.com/apache/pulsar/pull/21144
### Motivation

```
2023-09-07T09:02:53.011220026Z stdout F 2023-09-07T09:02:53,011+0000 [pulsar-io-6-1] WARN org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:48061] Failed to add producer to topic persistent://public/default/tp1: producerId=167, Producer with name 'producer-b1fc76' is already connected to topic
```

Pulsar has two mechanisms that keep the result correct when a producer connects to the broker multiple times:

- Within a single connection, a second request to create the same producer (same `producerId`) has to wait until the first request completes<sup>[1]</sup>.
- Within a topic, a newer producer can override the previous one registered under the same name<sup>[2]</sup>.

However, neither mechanism works if a producer can reach the broker over different connections. When the `connectionsPerBroker` setting of `PulsarClient` is greater than `1`, the connection pool picks a random connection each time a connection is requested<sup>[3]</sup>, so a single producer may end up using more than one connection, which leads to the error above.

**[1]** https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1398-L1423

```java
CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId, producerFuture);

if (existingProducerFuture != null) {
    if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
        Producer producer = existingProducerFuture.getNow(null);
        log.info("[{}] Producer with the same id is already created:"
                + " producerId={}, producer={}", remoteAddress, producerId, producer);
        commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(),
                producer.getSchemaVersion());
        return null;
    } else {
        // There was an early request to create a producer with same producerId.
        // This can happen when client timeout is lower than the broker timeouts.
        // We need to wait until the previous producer creation request
        // either complete or fails.
        ServerError error = null;
        if (!existingProducerFuture.isDone()) {
            error = ServerError.ServiceNotReady;
        } else {
            error = getErrorCode(existingProducerFuture);
            // remove producer with producerId as it's already completed with exception
            producers.remove(producerId, existingProducerFuture);
        }
        log.warn("[{}][{}] Producer with id is already present on the connection, producerId={}",
                remoteAddress, topicName, producerId);
        commandSender.sendErrorResponse(requestId, error, "Producer is already present on the connection");
        return null;
    }
}
```

**[2]** https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java#L983-L999

```java
private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
        throws BrokerServiceException {
    if (newProducer.isSuccessorTo(oldProducer) && !isUserProvidedProducerName(oldProducer)
            && !isUserProvidedProducerName(newProducer)) {
        oldProducer.close(false);
        if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
            // Met concurrent update, throw exception here so that client can try reconnect later.
            throw new BrokerServiceException.NamingException("Producer with name '"
                    + newProducer.getProducerName() + "' replace concurrency error");
        } else {
            handleProducerRemoved(oldProducer);
        }
    } else {
        throw new BrokerServiceException.NamingException(
                "Producer with name '" + newProducer.getProducerName() + "' is already connected to topic");
    }
}
```

**[3]** https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java#L213-L218

```java
final int randomKey = signSafeMod(random.nextInt(), maxConnectionsPerHosts);

final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
        pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>());
CompletableFuture<ClientCnx> completableFuture = innerPool
        .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
```

### Modifications

Make a given producer or consumer always use the same connection to a broker, instead of picking a random one from the pool each time it connects.

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository: x
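To make the race from the Motivation section and the idea under Modifications concrete, here is a hypothetical, self-contained Java model. It is not Pulsar code: each pooled connection is represented by a plain `producerId`-keyed map standing in for the per-connection registry that `ServerCnx` guards with `putIfAbsent` in [1], `signSafeMod` is re-implemented locally, and the names `ConnectionAffinityDemo` and `acceptedAsNew` are made up for illustration. It counts how many retries of one producer-creation request slip past that per-connection guard when a slot is chosen randomly per request (as in [3]) versus when the producer draws its slot once and reuses it (an assumed shape of the fix).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical model only; not taken from the Pulsar code base.
final class ConnectionAffinityDemo {

    // Local stand-in for a sign-safe modulo: maps any value into [0, divisor).
    static int signSafeMod(long dividend, int divisor) {
        int mod = (int) (dividend % divisor);
        return mod < 0 ? mod + divisor : mod;
    }

    // One registry per pooled connection: producerId -> marker for a pending creation.
    static List<Map<Long, String>> newPool(int connectionsPerBroker) {
        List<Map<Long, String>> pool = new ArrayList<>();
        for (int i = 0; i < connectionsPerBroker; i++) {
            pool.add(new HashMap<>());
        }
        return pool;
    }

    // Sends the same create-producer request `attempts` times (e.g. retries after
    // client timeouts) and counts how many were NOT caught by the per-connection
    // putIfAbsent guard, i.e. how many would go on to the topic-level name check.
    static int acceptedAsNew(boolean pinKey, long seed, int connectionsPerBroker, int attempts) {
        Random random = new Random(seed);
        List<Map<Long, String>> pool = newPool(connectionsPerBroker);
        long producerId = 167L;
        // Assumed fix: the producer draws its pool slot once and keeps it.
        int pinnedKey = signSafeMod(random.nextInt(), connectionsPerBroker);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // Without pinning, a fresh random slot is chosen per request, as in [3].
            int key = pinKey ? pinnedKey : signSafeMod(random.nextInt(), connectionsPerBroker);
            if (pool.get(key).putIfAbsent(producerId, "pending") == null) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        int connectionsPerBroker = 4;
        int attempts = 20;
        System.out.println("random slot per request:  "
                + acceptedAsNew(false, 7L, connectionsPerBroker, attempts) + " accepted as new");
        System.out.println("slot pinned per producer: "
                + acceptedAsNew(true, 7L, connectionsPerBroker, attempts) + " accepted as new");
    }
}
```

With a pinned slot, only the first of the 20 requests is registered and every retry is deduplicated on the same connection. With a random slot, one request gets through per distinct connection it happens to hit (at most 4 here), and in the real broker each of those extra registrations then reaches the topic-level check in [2], which is where the `already connected to topic` error comes from. This models the idea only; how the actual patch stores or chooses the key is not claimed here.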
