GitHub user Radiancebobo edited a discussion: Persistent "Producer/Consumer with id is already present on the connection" errors after broker restart
## Environment

- **Pulsar Version**: 3.0.5
- **Client**: Go client v0.16.0
- **Scenario**: Partial broker restart

## Problem Description

After restarting some of the brokers in our Pulsar cluster, we continuously encounter the following errors:

```
Consumer with id is already present on the connection
Producer with id is already present on the connection
```

<img width="1028" height="487" alt="image" src="https://github.com/user-attachments/assets/ac97e171-ab6c-42b6-9fc2-79857ddb35f0" />

<img width="753" height="409" alt="image" src="https://github.com/user-attachments/assets/45be909e-be7f-4980-8df8-b346517a086a" />

These errors persist and prevent clients from creating new producers/consumers. This effectively blocks all operations until the client connection is manually restarted.

## Questions

### 1. Is this a known issue?

- Has this issue been reported and tracked before?
- If yes, in which version was it fixed?
- Are there any related PRs or issues I should reference?

### 2. Can producerFuture/consumerFuture remain incomplete indefinitely?

Looking at the code in `ServerCnx.java`, I found the following checks.

**For Producer**:

```java
if (!existingProducerFuture.isDone()) {
    // There was an early request to create a producer with same producerId.
    // This can happen when client timeout is lower than the broker timeouts.
    // We need to wait until the previous producer creation request
    // either completes or fails.
    log.warn("[{}][{}] Producer with id is already present on the connection, producerId={}",
            remoteAddress, topicName, producerId);
    commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
            "Producer is already present on the connection");
}
```

**For Consumer**:

```java
if (!existingConsumerFuture.isDone()) {
    // There was an early request to create a consumer with same consumerId. This can happen
    // when client timeout is lower than the broker timeouts. We need to wait until the previous
    // consumer creation request either completes or fails.
    log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
            + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
    commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
            "Consumer is already present on the connection");
}
```

**My concern**: Is it possible that `producerFuture` or `consumerFuture` could remain incomplete indefinitely due to:

- Network issues during the broker restart
- A broker crash during topic loading
- A timeout in topic metadata operations
- Topic loading getting stuck

If these futures never complete, every retry with the same id hits the branches above, so clients would be permanently stuck receiving these errors.

## Proposed Solution

To prevent futures from remaining incomplete indefinitely, I propose adding a **timeout mechanism**, similar to what many other async operations already have. Here's my proposed implementation:

### 1. Add timeout constants

```java
private static final long PRODUCER_CREATION_TIMEOUT_SECONDS = 60;
private static final long CONSUMER_CREATION_TIMEOUT_SECONDS = 60;
```

### 2. Create timeout scheduling methods

**For Producer:**

```java
// Uses java.util.concurrent.ScheduledFuture, TimeUnit and TimeoutException.
private ScheduledFuture<?> scheduleProducerCreationTimeout(long producerId,
        CompletableFuture<Producer> producerFuture, long requestId, TopicName topicName) {
    return ctx.executor().schedule(() -> {
        if (producerFuture.isDone()) {
            return;
        }
        TimeoutException timeoutException = new TimeoutException(
                String.format("Timed out creating producer after %d seconds: topic=%s, producerId=%d",
                        PRODUCER_CREATION_TIMEOUT_SECONDS, topicName, producerId));
        log.warn("[{}][{}] Producer creation timed out, closing connection: producerId={}",
                remoteAddress, topicName, producerId);
        // Only reply if the timeout, not a late success, completed the future.
        if (producerFuture.completeExceptionally(timeoutException)) {
            commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                    timeoutException.getMessage());
        }
        producers.remove(producerId, producerFuture);
        close();
    }, PRODUCER_CREATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
```

**For Consumer:**

```java
private ScheduledFuture<?> scheduleConsumerCreationTimeout(long consumerId,
        CompletableFuture<Consumer> consumerFuture, long requestId, TopicName topicName,
        String subscriptionName) {
    return ctx.executor().schedule(() -> {
        if (consumerFuture.isDone()) {
            return;
        }
        TimeoutException timeoutException = new TimeoutException(String.format(
                "Timed out creating consumer after %d seconds: topic=%s, subscription=%s, consumerId=%d",
                CONSUMER_CREATION_TIMEOUT_SECONDS, topicName, subscriptionName, consumerId));
        log.warn("[{}][{}][{}] Consumer creation timed out, closing connection: consumerId={}",
                remoteAddress, topicName, subscriptionName, consumerId);
        if (consumerFuture.completeExceptionally(timeoutException)) {
            commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                    timeoutException.getMessage());
        }
        consumers.remove(consumerId, consumerFuture);
        close();
    }, CONSUMER_CREATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
```

### 3. Schedule timeout when creating producer/consumer

**In `handleProducer()`:**

```java
ScheduledFuture<?> producerCreationTimeout = scheduleProducerCreationTimeout(
        producerId, producerFuture, requestId, topicName);
// Cancel the timeout as soon as creation completes, successfully or not.
producerFuture.whenComplete((__, ___) -> producerCreationTimeout.cancel(false));
```

**In `handleSubscribe()`:**

```java
ScheduledFuture<?> consumerCreationTimeout = scheduleConsumerCreationTimeout(
        consumerId, consumerFuture, requestId, topicName, subscriptionName);
consumerFuture.whenComplete((__, ___) -> consumerCreationTimeout.cancel(false));
```

### Benefits of this approach:

1. **Prevents resource leaks**: Futures that never complete are cleaned up after the timeout.
2. **Client-friendly**: Clients receive a clear timeout error message instead of being stuck.
3. **Enables recovery**: After the timeout closes the connection, clients can reconnect and retry.
4. **Consistent behavior**: Producer and consumer creation get symmetric timeout protection.
5. **Minimal overhead**: The timeout task is cancelled as soon as the future completes normally.

## Questions

1. **Is this issue already addressed in a newer version?** If so, which version should we upgrade to?
2. **Is my analysis correct** that producer/consumer futures could remain incomplete indefinitely?
3. **Would this timeout approach be acceptable** for inclusion in the codebase? Are there any concerns or alternative approaches you'd recommend?
4. **What should the timeout value be?** I proposed 60 seconds, but this could be made configurable. What would be a reasonable default?

Thank you for your time and consideration!

GitHub link: https://github.com/apache/pulsar/discussions/25025
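To make the concern in question 2 and the proposed timeout concrete, here is a minimal, self-contained sketch. It is a hypothetical model, not Pulsar's `ServerCnx`. The names `CreationTimeoutSketch`, `handleProducer(long, CompletableFuture<String>)` and the `topicLoad` future (standing in for asynchronous topic loading) are illustrative only. The sketch also simplifies the proposal by removing the stale entry instead of closing the connection. A topic load that never completes keeps the map entry pending, so a retry with the same producer id hits the "already present" branch until the scheduled timeout fails the future.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified model of one connection's producer registry.
// NOT Pulsar's ServerCnx: it only mimics the "already present" guard
// and the proposed creation timeout.
public class CreationTimeoutSketch {
    private final Map<Long, CompletableFuture<String>> producers = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final long timeoutMillis;

    public CreationTimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // topicLoad is a hypothetical stand-in for the broker's asynchronous topic loading.
    public CompletableFuture<String> handleProducer(long producerId, CompletableFuture<String> topicLoad) {
        CompletableFuture<String> producerFuture = new CompletableFuture<>();
        CompletableFuture<String> existing = producers.putIfAbsent(producerId, producerFuture);
        if (existing != null) {
            if (!existing.isDone()) {
                // Same situation as the quoted broker branch: reject while the earlier request is pending.
                return CompletableFuture.completedFuture(
                        "ServiceNotReady: Producer is already present on the connection");
            }
            // The earlier request already finished: report its outcome.
            return existing.handle((p, ex) -> ex == null ? "Success: " + p : "Error: " + ex);
        }

        // Proposed fix: fail the future if it is still incomplete after the timeout.
        ScheduledFuture<?> timeout = executor.schedule(() -> {
            producerFuture.completeExceptionally(
                    new TimeoutException("producer creation timed out, id=" + producerId));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // Any completion (success or failure) cancels the pending timeout task.
        producerFuture.whenComplete((p, ex) -> timeout.cancel(false));

        topicLoad.whenComplete((topic, ex) -> {
            if (ex != null) {
                producerFuture.completeExceptionally(ex);
            } else {
                producerFuture.complete("producer on " + topic);
            }
        });

        return producerFuture.handle((p, ex) -> {
            if (ex != null) {
                // Drop the failed entry so a retry with the same id is not rejected forever.
                producers.remove(producerId, producerFuture);
                return "Error: " + ex.getClass().getSimpleName();
            }
            return "Success: " + p;
        });
    }

    public void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        CreationTimeoutSketch cnx = new CreationTimeoutSketch(200);
        CompletableFuture<String> stuckLoad = new CompletableFuture<>(); // never completes

        CompletableFuture<String> first = cnx.handleProducer(1L, stuckLoad);
        System.out.println(cnx.handleProducer(1L, stuckLoad).join()); // retry while still pending
        System.out.println(first.join()); // waits until the timeout fires
        System.out.println(cnx.handleProducer(1L, CompletableFuture.completedFuture("topic-a")).join());
        cnx.shutdown();
    }
}
```

Running `main` prints `ServiceNotReady: Producer is already present on the connection`, then `Error: TimeoutException`, then `Success: producer on topic-a`. Without the scheduled timeout, the first future would never complete, and every later request for id `1` would keep getting the first message. On Java 9 and later, `CompletableFuture.orTimeout(timeout, unit)` could replace the explicit `schedule`/`cancel` pair. However, it completes the future from a JDK-internal scheduler thread, so non-async callbacks would run on that thread. Scheduling on `ctx.executor()`, as in the proposal, keeps them on the connection's Netty event loop.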
