bluecucumber1989-commits opened a new issue, #1461:
URL: https://github.com/apache/pulsar-client-go/issues/1461
#### Expected behavior
No messages are lost when the consumer reconnects.
#### Actual behavior
If two consecutive `reconnectToBroker` calls occur while the consumer is reconnecting, messages are lost. Specifically, messages received from the broker during the first reconnection (e.g., M1, M2) are cleared during the second reconnection, so the user only ever sees the messages that follow (e.g., M3, M4).
#### Steps to reproduce
Prerequisites:
1. The consumer uses a Durable subscription (the default mode)
2. The messages have not been acked yet
3. The topic's owner broker changes
Trigger conditions:
1. The broker sends `CommandCloseConsumer` to close the consumer
2. Before the first reconnection completes, the TCP connection breaks, triggering a second reconnection
_**Possible race condition causing a duplicate reconnection**_:
`handleCloseConsumer` calls `ConnectionClosed` before deleting the handler. If the TCP connection breaks before the deletion, the handler is still in the snapshot taken by `Close`, so `Close` calls `ConnectionClosed` on it a second time.
```go
// First call: the broker actively closes the consumer
func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) {
	consumerID := closeConsumer.GetConsumerId()
	c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
	if consumer, ok := c.consumerHandler(consumerID); ok {
		consumer.ConnectionClosed(closeConsumer) // First call
		c.DeleteConsumeHandler(consumerID)       // Handler is deleted afterwards
	}
}

// Second call: the TCP connection breaks
func (c *connection) Close() {
	c.closeOnce.Do(func() {
		listeners, consumerHandlers, cnx := c.closeAndEmptyObservers()
		// ... other cleanup using listeners and cnx elided
		// If DeleteConsumeHandler hasn't run yet, the snapshot still contains the consumer
		for _, handler := range consumerHandlers {
			handler.ConnectionClosed(nil) // Second call
		}
	})
}
```
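The interleaving described above can be replayed in a small, self-contained toy model. Everything in it is hypothetical: `toyConn`, `naiveConsumer`, `guardedConsumer`, and `simulate` are illustration names, not pulsar-client-go types, and the `CompareAndSwap` guard is only one possible way to make the close notification idempotent, not the library's fix. The `onGap` callback stands in for the TCP connection breaking between `ConnectionClosed` and the handler deletion. It needs Go 1.19+ for `atomic.Int32` and `atomic.Bool`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// closeListener is a hypothetical stand-in for a consumer handler that
// reacts to a lost connection by scheduling a reconnect.
type closeListener interface {
	ConnectionClosed()
}

// naiveConsumer schedules a reconnect on every notification it receives.
type naiveConsumer struct {
	reconnects atomic.Int32
}

func (c *naiveConsumer) ConnectionClosed() { c.reconnects.Add(1) }

// guardedConsumer acts only on the first notification: CompareAndSwap
// lets exactly one caller flip the flag from false to true.
type guardedConsumer struct {
	closed     atomic.Bool
	reconnects atomic.Int32
}

func (c *guardedConsumer) ConnectionClosed() {
	if c.closed.CompareAndSwap(false, true) {
		c.reconnects.Add(1)
	}
}

// toyConn models the connection's consumer handler map.
type toyConn struct {
	mu       sync.Mutex
	handlers map[uint64]closeListener
}

// handleCloseConsumer notifies first and deletes afterwards, the ordering
// quoted in the issue; onGap runs in the window between the two steps.
func (c *toyConn) handleCloseConsumer(id uint64, onGap func()) {
	c.mu.Lock()
	h, ok := c.handlers[id]
	c.mu.Unlock()
	if !ok {
		return
	}
	h.ConnectionClosed() // first notification
	onGap()
	c.mu.Lock()
	delete(c.handlers, id)
	c.mu.Unlock()
}

// Close snapshots and empties the handler map, then notifies every
// handler in the snapshot.
func (c *toyConn) Close() {
	c.mu.Lock()
	snapshot := make([]closeListener, 0, len(c.handlers))
	for _, h := range c.handlers {
		snapshot = append(snapshot, h)
	}
	c.handlers = map[uint64]closeListener{}
	c.mu.Unlock()
	for _, h := range snapshot {
		h.ConnectionClosed() // second notification if the handler was still registered
	}
}

// simulate replays the interleaving: the broker closes consumer 1 and the
// TCP connection breaks before its handler is deleted.
func simulate(h closeListener) {
	c := &toyConn{handlers: map[uint64]closeListener{1: h}}
	c.handleCloseConsumer(1, c.Close)
}

func main() {
	naive := &naiveConsumer{}
	simulate(naive)
	guarded := &guardedConsumer{}
	simulate(guarded)
	fmt.Println("naive reconnects:", naive.reconnects.Load())
	fmt.Println("guarded reconnects:", guarded.reconnects.Load())
}
```

Running it prints `naive reconnects: 2` and `guarded reconnects: 1`. In a real client such a flag would have to be scoped to a specific connection (reset once a new connection is established), otherwise later disconnects would be ignored; the toy omits that detail.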
_**Suspected key problem:**_
The second reconnection clears the local receiver queue: `pc.clearReceiverQueue` empties `dispatcher.messages`, which may be what discards M1 and M2 during the second reconnection.
```go
// pulsar/consumer_partition.go
func (pc *partitionConsumer) grabConn() error {
	// ...
	if seekMsgID := pc.seekMessageID.get(); seekMsgID != nil {
		pc.startMessageID.set(seekMsgID)
		pc.seekMessageID.set(nil)
	} else {
		pc.startMessageID.set(pc.clearReceiverQueue())
	}
	// In Durable mode, the StartMessageId is not sent to the broker
	if pc.options.subscriptionMode != Durable {
		cmdSubscribe.StartMessageId = convertToMessageIDData(pc.startMessageID.get())
	}
	// ...
}
```
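To make the suspected loss path concrete, here is a toy model of the client-side buffer. `toyPartitionConsumer`, `deliver`, `reconnect`, and `receiveAll` are hypothetical names; `reconnect` only mimics the queue-clearing effect attributed to `clearReceiverQueue` above. The model also assumes, as the issue reports, that M1 and M2 never reach the user again after the second resubscribe; it does not explain why the broker would not redeliver them in Durable mode.

```go
package main

import "fmt"

// toyPartitionConsumer is a hypothetical model of the client-side receive
// buffer; it is not the pulsar-client-go partitionConsumer.
type toyPartitionConsumer struct {
	queue []string // received from the broker but not yet read by the user
}

// deliver appends messages pushed by the broker.
func (pc *toyPartitionConsumer) deliver(msgs ...string) {
	pc.queue = append(pc.queue, msgs...)
}

// reconnect mimics the queue-clearing step of grabConn: anything still
// buffered is dropped and returned so the caller can inspect it.
func (pc *toyPartitionConsumer) reconnect() []string {
	dropped := pc.queue
	pc.queue = nil
	return dropped
}

// receiveAll drains the buffer the way repeated Receive calls would.
func (pc *toyPartitionConsumer) receiveAll() []string {
	out := pc.queue
	pc.queue = nil
	return out
}

func main() {
	pc := &toyPartitionConsumer{}
	pc.reconnect()            // first reconnection (CommandCloseConsumer)
	pc.deliver("M1", "M2")    // broker pushes M1, M2 after the first resubscribe
	dropped := pc.reconnect() // second reconnection (TCP break) clears them
	pc.deliver("M3", "M4")    // assumption: M1, M2 are not delivered again
	fmt.Println("dropped:", dropped)
	fmt.Println("user sees:", pc.receiveAll())
}
```

Running it prints `dropped: [M1 M2]` followed by `user sees: [M3 M4]`, matching the behavior reported above.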
#### System configuration
Pulsar version: 4.1
pulsar-client-go: 0.18.0