GitHub user Radiancebobo created a discussion: Persistent "Producer/Consumer
with id is already present on the connection" errors after broker restart
## Environment
- **Pulsar Version**: 3.0.5
- **Client**: Go client v0.16.0
- **Scenario**: Partial broker restart
## Problem Description
After restarting some brokers in our Pulsar cluster, we continuously encounter
the following errors:
```
Consumer with id is already present on the connection
Producer with id is already present on the connection
```
These errors persist and prevent clients from creating new producers/consumers,
effectively blocking all operations until the client connection is manually
restarted.
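For context, this is the operation that stays blocked. Our report is against the Go client v0.16.0, but the equivalent Java client call goes through the same broker-side `handleProducer` path (service URL and topic below are placeholders):
```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class BlockedProducerCreation {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://broker:6650")   // placeholder service URL
                .build();

        // After the partial broker restart, this creation step keeps failing with
        // "Producer with id is already present on the connection" and never
        // succeeds until the client connection is torn down and re-established.
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://public/default/my-topic")  // placeholder topic
                .create();

        producer.send("hello".getBytes());
        producer.close();
        client.close();
    }
}
```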
## Questions
### 1. Is this a known issue?
- Has this issue been reported and tracked before?
- If yes, in which version was it fixed?
- Are there any related PRs or issues I should reference?
### 2. Can producerFuture/consumerFuture remain incomplete indefinitely?
Looking at the code in `ServerCnx.java`, I found that:
**For Producer**:
```java
if (!existingProducerFuture.isDone()) {
    // There was an early request to create a producer with same producerId.
    // This can happen when client timeout is lower than the broker timeouts.
    // We need to wait until the previous producer creation request
    // either complete or fails.
    log.warn("[{}][{}] Producer with id is already present on the connection, producerId={}",
            remoteAddress, topicName, producerId);
    commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
            "Producer is already present on the connection");
}
```
**For Consumer**:
```java
if (!existingConsumerFuture.isDone()) {
    // There was an early request to create a consumer with same consumerId. This can happen
    // when client timeout is lower than the broker timeouts. We need to wait until the previous
    // consumer creation request either complete or fails.
    log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
            + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
    commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
            "Consumer is already present on the connection");
}
```
**My concern**: Is it possible that `producerFuture` or `consumerFuture` could
remain in an incomplete state indefinitely due to:
- Network issues during broker restart
- Broker crash during topic loading
- Timeout in topic metadata operations
- Topic loading getting stuck
If these futures never complete, clients would be permanently stuck receiving
these errors.
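To make the concern concrete, here is a minimal, self-contained sketch (not the actual `ServerCnx` code; class and field names are simplified) of why a single never-completed future keeps rejecting every retry for the same producer id:
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class StuckFutureSketch {
    // Simplified stand-in for the per-connection producers map in ServerCnx.
    private final ConcurrentHashMap<Long, CompletableFuture<String>> producers = new ConcurrentHashMap<>();

    String handleProducer(long producerId) {
        CompletableFuture<String> producerFuture = new CompletableFuture<>();
        CompletableFuture<String> existing = producers.putIfAbsent(producerId, producerFuture);
        if (existing != null && !existing.isDone()) {
            // Mirrors the branch quoted above: as long as the first future never
            // completes, every retry for this producerId is rejected here.
            return "ServiceNotReady: Producer with id is already present on the connection";
        }
        // In the real broker, topic loading would eventually complete producerFuture.
        // If that step hangs (broker restart, stuck topic load), the future stays
        // pending and nothing ever removes it from the map.
        return "creation pending";
    }

    public static void main(String[] args) {
        StuckFutureSketch cnx = new StuckFutureSketch();
        System.out.println(cnx.handleProducer(1L)); // "creation pending" - the future is never completed
        System.out.println(cnx.handleProducer(1L)); // every retry: "ServiceNotReady: ..."
        System.out.println(cnx.handleProducer(1L)); // still rejected; no timeout ever fires
    }
}
```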
## Proposed Solution
To prevent futures from remaining incomplete indefinitely, I propose adding a
**timeout mechanism** similar to what many async operations have. Here's my
proposed implementation:
### 1. Add timeout constants
```java
private static final long PRODUCER_CREATION_TIMEOUT_SECONDS = 60;
private static final long CONSUMER_CREATION_TIMEOUT_SECONDS = 60;
```
### 2. Create timeout scheduling methods
**For Producer:**
```java
private ScheduledFuture<?> scheduleProducerCreationTimeout(long producerId,
                                                           CompletableFuture<Producer> producerFuture,
                                                           long requestId,
                                                           TopicName topicName) {
    return ctx.executor().schedule(() -> {
        if (producerFuture.isDone()) {
            return;
        }
        TimeoutException timeoutException = new TimeoutException(
                String.format("Timed out creating producer after %d seconds: topic=%s, producerId=%d",
                        PRODUCER_CREATION_TIMEOUT_SECONDS, topicName, producerId));
        log.warn("[{}][{}] Producer creation timed out, closing connection: producerId={}",
                remoteAddress, topicName, producerId);
        if (producerFuture.completeExceptionally(timeoutException)) {
            commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                    timeoutException.getMessage());
        }
        producers.remove(producerId, producerFuture);
        close();
    }, PRODUCER_CREATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
```
**For Consumer:**
```java
private ScheduledFuture<?> scheduleConsumerCreationTimeout(long consumerId,
                                                           CompletableFuture<Consumer> consumerFuture,
                                                           long requestId,
                                                           TopicName topicName,
                                                           String subscriptionName) {
    return ctx.executor().schedule(() -> {
        if (consumerFuture.isDone()) {
            return;
        }
        TimeoutException timeoutException = new TimeoutException(
                String.format("Timed out creating consumer after %d seconds: topic=%s, subscription=%s, consumerId=%d",
                        CONSUMER_CREATION_TIMEOUT_SECONDS, topicName, subscriptionName, consumerId));
        log.warn("[{}][{}][{}] Consumer creation timed out, closing connection: consumerId={}",
                remoteAddress, topicName, subscriptionName, consumerId);
        if (consumerFuture.completeExceptionally(timeoutException)) {
            commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                    timeoutException.getMessage());
        }
        consumers.remove(consumerId, consumerFuture);
        close();
    }, CONSUMER_CREATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
```
### 3. Schedule timeout when creating producer/consumer
**In handleProducer():**
```java
ScheduledFuture<?> producerCreationTimeout = scheduleProducerCreationTimeout(
producerId, producerFuture, requestId, topicName);
producerFuture.whenComplete((__, ___) -> producerCreationTimeout.cancel(false));
```
**In handleSubscribe():**
```java
ScheduledFuture<?> consumerCreationTimeout = scheduleConsumerCreationTimeout(
consumerId, consumerFuture, requestId, topicName, subscriptionName);
consumerFuture.whenComplete((__, ___) -> consumerCreationTimeout.cancel(false));
```
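To illustrate the intended behavior of this wiring, here is a small, self-contained sketch (a plain `ScheduledExecutorService` stands in for `ctx.executor()`, and the broker-side cleanup is reduced to a print): the timeout task is cancelled when the future completes in time, and fails the future with a `TimeoutException` when it does not.
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CreationTimeoutSketch {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        // Case 1: the creation future completes in time, so the timeout task is
        // cancelled and never does any work (benefit 5 below: minimal overhead).
        CompletableFuture<String> completed = new CompletableFuture<>();
        ScheduledFuture<?> completedTimeout = executor.schedule(
                () -> completed.completeExceptionally(new TimeoutException("creation timed out")),
                2, TimeUnit.SECONDS);
        completed.whenComplete((result, error) -> completedTimeout.cancel(false));
        completed.complete("producer created");
        System.out.println("case 1: " + completed.get());

        // Case 2: the creation future is never completed, so the scheduled task
        // fails it with a TimeoutException; in the proposal this is also where the
        // error response, map cleanup, and close() would happen.
        CompletableFuture<String> stuck = new CompletableFuture<>();
        ScheduledFuture<?> stuckTimeout = executor.schedule(
                () -> stuck.completeExceptionally(new TimeoutException("creation timed out")),
                2, TimeUnit.SECONDS);
        stuck.whenComplete((result, error) -> stuckTimeout.cancel(false));
        try {
            stuck.get();
        } catch (Exception e) {
            System.out.println("case 2: " + e.getCause());
        }

        executor.shutdown();
    }
}
```
Even if the timeout task races with a normal completion, `completeExceptionally` returns false on an already-completed future, so the guarded `sendErrorResponse` in the proposal only runs when the timeout actually won.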
### Benefits of this approach:
1. **Prevents resource leaks**: Futures that never complete will be cleaned up
after timeout
2. **Client-friendly**: Clients receive clear timeout error messages instead of
being stuck
3. **Enables recovery**: After timeout and connection close, clients can
reconnect and retry
4. **Consistent behavior**: Both producer and consumer creation have symmetric
timeout protection
5. **Minimal overhead**: Timeout task is cancelled immediately when future
completes normally
## Summary of Questions
1. **Is this issue already addressed in a newer version?** If so, which version
should we upgrade to?
2. **Is my analysis correct** that producer/consumer futures could potentially
remain incomplete indefinitely?
3. **Would this timeout approach be acceptable** for inclusion in the codebase?
Are there any concerns or alternative approaches you'd recommend?
4. **What should the timeout value be?** I proposed 60 seconds, but this could be made configurable (see the sketch below). What would be a reasonable default?
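If it does end up configurable, here is a toy sketch of the idea (the property name is made up; in Pulsar the value would more naturally live in `ServiceConfiguration`/`broker.conf`):
```java
import java.util.concurrent.TimeUnit;

public class CreationTimeoutConfigSketch {
    // Made-up system property, purely for illustration of "configurable with a default".
    private static final String TIMEOUT_PROPERTY = "creationTimeoutSeconds";

    static long creationTimeoutSeconds() {
        // Fall back to the proposed 60-second default when nothing is configured.
        return Long.getLong(TIMEOUT_PROPERTY, 60L);
    }

    public static void main(String[] args) {
        long seconds = creationTimeoutSeconds();
        System.out.println("producer/consumer creation timeout: " + seconds + "s ("
                + TimeUnit.SECONDS.toMillis(seconds) + " ms)");
    }
}
```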
Thank you for your time and consideration!
GitHub link: https://github.com/apache/pulsar/discussions/25025