GitHub user Radiancebobo created a discussion: Persistent "Producer/Consumer with id is already present on the connection" errors after broker restart

## Environment
- **Pulsar Version**: 3.0.5
- **Client**: Go client v0.16.0
- **Scenario**: Partial broker restart

## Problem Description

After restarting some brokers in our Pulsar cluster, we continuously encounter 
the following errors:

```
Consumer with id is already present on the connection
Producer with id is already present on the connection
```

These errors persist and prevent clients from creating new producers/consumers, 
effectively blocking all operations until the client connection is manually 
restarted.

## Questions

### 1. Is this a known issue?
- Has this issue been reported and tracked before?
- If yes, in which version was it fixed?
- Are there any related PRs or issues I should reference?

### 2. Can producerFuture/consumerFuture remain incomplete indefinitely?

Looking at the code in `ServerCnx.java`, I found that:

**For Producer:**
```java
if (!existingProducerFuture.isDone()) {
    // There was an early request to create a producer with same producerId.
    // This can happen when client timeout is lower than the broker timeouts.
    // We need to wait until the previous producer creation request
    // either complete or fails.
    log.warn("[{}][{}] Producer with id is already present on the connection, producerId={}",
            remoteAddress, topicName, producerId);
    commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
            "Producer is already present on the connection");
}
```

**For Consumer:**
```java
if (!existingConsumerFuture.isDone()){
    // There was an early request to create a consumer with same consumerId. This can happen
    // when client timeout is lower the broker timeouts. We need to wait until the previous
    // consumer creation request either complete or fails.
    log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
            + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
    commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
            "Consumer is already present on the connection");
}
```

**My concern**: Could `producerFuture` or `consumerFuture` remain incomplete indefinitely due to:
- Network issues during broker restart
- Broker crash during topic loading
- Timeout in topic metadata operations
- Topic loading getting stuck

If these futures never complete, clients would be permanently stuck receiving 
these errors.
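To make the concern concrete, here is a stripped-down model of the per-connection bookkeeping. It assumes each id maps to one `CompletableFuture` and that a duplicate request is rejected while that future is pending, as in the excerpts above. It is not Pulsar code: `StuckFutureDemo`, `handleCreate`, `completeCreation`, and the returned strings are hypothetical. It shows that if nothing ever completes the first future, every retry with the same id is rejected.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical, simplified model of one connection's producer bookkeeping; not Pulsar code. */
public class StuckFutureDemo {
    // One pending-or-finished creation future per producer id on this connection.
    private final ConcurrentMap<Long, CompletableFuture<String>> producers = new ConcurrentHashMap<>();

    /** Mirrors the decision taken when a create request arrives for an id. */
    public String handleCreate(long id) {
        CompletableFuture<String> fresh = new CompletableFuture<>();
        CompletableFuture<String> existing = producers.putIfAbsent(id, fresh);
        if (existing == null) {
            // First request for this id: creation is now in progress.
            return "CREATING";
        }
        if (!existing.isDone()) {
            // Same branch as the ServerCnx excerpt: an earlier request is still pending.
            return "ServiceNotReady: Producer is already present on the connection";
        }
        return existing.isCompletedExceptionally() ? "PREVIOUS_FAILED" : "ALREADY_CREATED";
    }

    /** Stands in for topic loading finishing; in the stuck scenario this never happens. */
    public void completeCreation(long id) {
        CompletableFuture<String> future = producers.get(id);
        if (future != null) {
            future.complete("producer-" + id);
        }
    }

    public static void main(String[] args) {
        StuckFutureDemo cnx = new StuckFutureDemo();
        System.out.println(cnx.handleCreate(1L)); // CREATING
        // While the first future stays incomplete, every retry takes the rejection branch.
        for (int retry = 0; retry < 3; retry++) {
            System.out.println(cnx.handleCreate(1L));
        }
        // Only once something completes the future does the id leave that state.
        cnx.completeCreation(1L);
        System.out.println(cnx.handleCreate(1L)); // ALREADY_CREATED
    }
}
```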

## Proposed Solution

To prevent futures from remaining incomplete indefinitely, I propose adding a 
**timeout mechanism** similar to what many async operations have. Here's my 
proposed implementation:

### 1. Add timeout constants
```java
private static final long PRODUCER_CREATION_TIMEOUT_SECONDS = 60;
private static final long CONSUMER_CREATION_TIMEOUT_SECONDS = 60;
```

### 2. Create timeout scheduling methods

**For Producer:**
```java
private ScheduledFuture<?> scheduleProducerCreationTimeout(long producerId,
                                                           CompletableFuture<Producer> producerFuture,
                                                           long requestId,
                                                           TopicName topicName) {
    return ctx.executor().schedule(() -> {
        if (producerFuture.isDone()) {
            // Creation finished (successfully or not) before the deadline.
            return;
        }
        TimeoutException timeoutException = new TimeoutException(
                String.format("Timed out creating producer after %d seconds: topic=%s, producerId=%d",
                        PRODUCER_CREATION_TIMEOUT_SECONDS, topicName, producerId));
        log.warn("[{}][{}] Producer creation timed out, closing connection: producerId={}",
                remoteAddress, topicName, producerId);
        // Reply only if this task, rather than a concurrent completion, failed the future.
        if (producerFuture.completeExceptionally(timeoutException)) {
            commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                    timeoutException.getMessage());
        }
        // Free the producerId slot (only if it still maps to this future), then drop the connection.
        producers.remove(producerId, producerFuture);
        close();
    }, PRODUCER_CREATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
```

**For Consumer:**
```java
private ScheduledFuture<?> scheduleConsumerCreationTimeout(long consumerId,
                                                           CompletableFuture<Consumer> consumerFuture,
                                                           long requestId,
                                                           TopicName topicName,
                                                           String subscriptionName) {
    return ctx.executor().schedule(() -> {
        if (consumerFuture.isDone()) {
            // Creation finished (successfully or not) before the deadline.
            return;
        }
        TimeoutException timeoutException = new TimeoutException(
                String.format("Timed out creating consumer after %d seconds: "
                                + "topic=%s, subscription=%s, consumerId=%d",
                        CONSUMER_CREATION_TIMEOUT_SECONDS, topicName, subscriptionName, consumerId));
        log.warn("[{}][{}][{}] Consumer creation timed out, closing connection: consumerId={}",
                remoteAddress, topicName, subscriptionName, consumerId);
        // Reply only if this task, rather than a concurrent completion, failed the future.
        if (consumerFuture.completeExceptionally(timeoutException)) {
            commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                    timeoutException.getMessage());
        }
        // Free the consumerId slot (only if it still maps to this future), then drop the connection.
        consumers.remove(consumerId, consumerFuture);
        close();
    }, CONSUMER_CREATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
```
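Outside the broker, the same idea can be exercised with a plain `ScheduledExecutorService` standing in for the connection's event loop. This is a standalone sketch, not the `ServerCnx` change itself: `CreationTimeoutDemo`, `startCreation`, and the string-valued futures are hypothetical, and closing the connection is not modeled. It checks the property the proposal relies on: once the timeout fires, the id slot is freed and a retry with the same id is accepted instead of hitting the "already present" branch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical standalone model of the proposed creation timeout; not Pulsar code. */
public class CreationTimeoutDemo {
    private final ConcurrentMap<Long, CompletableFuture<String>> producers = new ConcurrentHashMap<>();
    // Stands in for the connection's Netty event loop (ctx.executor() in the proposal).
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    /** Registers a pending creation, or returns null if one is already pending for this id. */
    public CompletableFuture<String> startCreation(long id, long timeoutMillis) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (producers.putIfAbsent(id, future) != null) {
            return null; // the "already present on the connection" case
        }
        executor.schedule(() -> {
            if (future.isDone()) {
                return; // creation finished before the deadline
            }
            // Free the id slot first (two-arg remove: only if it still maps to this future),
            // so a caller reacting to the failure can retry immediately.
            producers.remove(id, future);
            future.completeExceptionally(new TimeoutException("Timed out creating producer, id=" + id));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return future;
    }

    public void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        CreationTimeoutDemo cnx = new CreationTimeoutDemo();
        try {
            CompletableFuture<String> first = cnx.startCreation(7L, 200);
            System.out.println(cnx.startCreation(7L, 200) == null); // rejected while pending
            try {
                first.join(); // blocks until the timeout task fails the future
            } catch (CompletionException e) {
                System.out.println("first attempt failed: " + e.getCause());
            }
            System.out.println(cnx.startCreation(7L, 200) != null); // retry is accepted
        } finally {
            cnx.shutdown();
        }
    }
}
```

The sketch removes the map entry before failing the future, the reverse of the proposal's order, so the retry in `main` (running on a different thread) cannot observe the stale entry. In `ServerCnx`, a retry on the same connection would presumably be handled on the same event loop as the timeout task, so the proposal's order should not expose that gap.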

### 3. Schedule timeout when creating producer/consumer

**In handleProducer():**
```java
ScheduledFuture<?> producerCreationTimeout = scheduleProducerCreationTimeout(
        producerId, producerFuture, requestId, topicName);
producerFuture.whenComplete((__, ___) -> producerCreationTimeout.cancel(false));
```

**In handleSubscribe():**
```java
ScheduledFuture<?> consumerCreationTimeout = scheduleConsumerCreationTimeout(
        consumerId, consumerFuture, requestId, topicName, subscriptionName);
consumerFuture.whenComplete((__, ___) -> consumerCreationTimeout.cancel(false));
```
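The `whenComplete(... cancel(false))` wiring can be checked in isolation. This sketch assumes a JDK `ScheduledThreadPoolExecutor` in place of Netty's executor, and `TimeoutCancellationDemo.armTimeout` is a hypothetical helper. With `setRemoveOnCancelPolicy(true)`, the cancelled 60-second task is removed from the executor's queue right away instead of lingering until its deadline.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical demo of the schedule-then-cancel wiring; not Pulsar code. */
public class TimeoutCancellationDemo {
    /** Arms a timeout for a creation future and cancels it once the future completes. */
    public static ScheduledFuture<?> armTimeout(ScheduledThreadPoolExecutor executor,
                                                CompletableFuture<String> creation,
                                                long timeoutSeconds) {
        ScheduledFuture<?> timeout = executor.schedule(
                () -> creation.completeExceptionally(new TimeoutException("creation timed out")),
                timeoutSeconds, TimeUnit.SECONDS);
        // Any completion, normal or exceptional, cancels the pending timer.
        creation.whenComplete((result, error) -> timeout.cancel(false));
        return timeout;
    }

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        // Drop cancelled tasks from the queue immediately rather than at their deadline.
        executor.setRemoveOnCancelPolicy(true);
        try {
            CompletableFuture<String> creation = new CompletableFuture<>();
            ScheduledFuture<?> timeout = armTimeout(executor, creation, 60);
            creation.complete("producer-1"); // creation succeeds well before the deadline
            System.out.println(timeout.isCancelled());      // true
            System.out.println(executor.getQueue().size()); // 0
        } finally {
            executor.shutdownNow();
        }
    }
}
```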

### Benefits of this approach:
1. **Prevents resource leaks**: Futures that never complete will be cleaned up 
after timeout
2. **Client-friendly**: Clients receive clear timeout error messages instead of 
being stuck
3. **Enables recovery**: After timeout and connection close, clients can 
reconnect and retry
4. **Consistent behavior**: Both producer and consumer creation have symmetric 
timeout protection
5. **Minimal overhead**: Timeout task is cancelled immediately when future 
completes normally
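For comparison, Java 9+ ships a built-in form of the same schedule-then-cancel pattern: `CompletableFuture.orTimeout` completes the future with a `TimeoutException` if nothing else completes it first (in OpenJDK, its internal timer is cancelled when the future completes earlier). This is a swapped-in JDK facility, not part of the proposal; `OrTimeoutDemo` and its `outcome` helper are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical demo of CompletableFuture.orTimeout (Java 9+); not Pulsar code. */
public class OrTimeoutDemo {
    /** Describes how a creation future ended when bounded by orTimeout. */
    public static String outcome(CompletableFuture<String> creation, long timeoutMillis) {
        try {
            // orTimeout returns this same future, now bounded by the given timeout.
            return "created " + creation.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).join();
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException ? "timed out" : "failed: " + e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(CompletableFuture.completedFuture("producer-1"), 100)); // created producer-1
        System.out.println(outcome(new CompletableFuture<>(), 100)); // timed out
    }
}
```

One trade-off for `ServerCnx`: `orTimeout` fires on a JDK-internal timer thread, so any cleanup attached to the future (removing it from `producers`/`consumers`, sending the error response, closing the connection) would run off the connection's event loop unless it is explicitly handed back to `ctx.executor()`. Scheduling on `ctx.executor()` directly, as proposed, avoids that hop.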

## Questions 

1. **Is this issue already addressed in a newer version?** If so, which version 
should we upgrade to?

2. **Is my analysis correct** that producer/consumer futures could potentially 
remain incomplete indefinitely?

3. **Would this timeout approach be acceptable** for inclusion in the codebase? 
Are there any concerns or alternative approaches you'd recommend?

4. **What should the timeout value be?** I proposed 60 seconds, but this could 
be configurable. What would be a reasonable default?
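If the value were made configurable, one possible shape is sketched below, assuming it is read from a `Properties`-style configuration: fall back to the proposed 60 seconds when unset and reject non-positive values. The key `producerConsumerCreationTimeoutSeconds` and the `CreationTimeoutConfig` class are hypothetical, chosen only for illustration.

```java
import java.util.Properties;

/** Hypothetical resolution of a configurable creation timeout; not an existing Pulsar setting. */
public class CreationTimeoutConfig {
    // Hypothetical configuration key, for illustration only.
    public static final String KEY = "producerConsumerCreationTimeoutSeconds";
    public static final long DEFAULT_SECONDS = 60;

    /** Returns the configured timeout in seconds, or the default when the key is absent. */
    public static long resolve(Properties props) {
        String raw = props.getProperty(KEY);
        if (raw == null || raw.isBlank()) {
            return DEFAULT_SECONDS;
        }
        long value = Long.parseLong(raw.trim()); // NumberFormatException on malformed input
        if (value <= 0) {
            throw new IllegalArgumentException(KEY + " must be positive, got " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolve(props)); // 60
        props.setProperty(KEY, "30");
        System.out.println(resolve(props)); // 30
    }
}
```

Whatever default is chosen should comfortably exceed the time a healthy topic load can take, since the proposal closes the connection when the timeout fires and a too-short value would turn slow-but-successful creations into disconnects.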


Thank you for your time and consideration!


GitHub link: https://github.com/apache/pulsar/discussions/25025
