geniusjoe opened a new issue, #1236:
URL: https://github.com/apache/pulsar-client-go/issues/1236
#### Expected behavior
The `failTimeoutMessages()` timer in `pulsar/producer_partition.go` should
periodically remove messages from `p.pendingQueue` that are older than
`p.options.SendTimeout`.
#### Actual behavior
Messages in `p.pendingQueue` that are older than `p.options.SendTimeout`
are never removed. If a message can never be sent successfully, it stays in
the `pendingQueue` forever and the producer reconnects in an endless loop.
#### Helpful information
I think this bug may be related to bugfix #551.
Bugfix #551 aimed to solve a race condition between the `grabCnx()` and
`failTimeoutMessages()` functions. The producer can hit this race when
`failTimeoutMessages()` first deletes outdated messages and `grabCnx()` then
reconnects successfully and resends those just-deleted pending messages.
The #551 solution is to refresh the `sentAt` field of every message in
`p.pendingQueue` when `grabCnx()` reconnects successfully, so that
`failTimeoutMessages()` does not act on these messages. As a result, every
successful reconnect resets the timeout of the messages it resends. [Code
reference](https://github.com/apache/pulsar-client-go/blob/a029f2d7e392fa37511ac44c3276832705ded08b/pulsar/producer_partition.go#L353)
below:
```
for i := 0; i < viewSize; i++ {
	item := p.pendingQueue.Poll()
	if item == nil {
		continue
	}
	pi := item.(*pendingItem)
	// when resending pending batches, we update the sendAt timestamp and put to the back of queue
	// to avoid pending item been removed by failTimeoutMessages and cause race condition
	pi.Lock()
	pi.sentAt = time.Now()
	pi.Unlock()
	p.pendingQueue.Put(pi)
	p._getConn().WriteData(pi.buffer)
	if pi == lastViewItem {
		break
	}
}
```
By comparison, the Java client may not hit this race condition. When failing
pending messages, the Java code first checks whether the current channel
connection is closed; if the connection is still open, it schedules
`failPendingMessages()` on `cnx.ctx().channel().eventLoop()` to avoid the
race condition. [Code
reference](https://github.com/apache/pulsar/blob/f70e52a700d3348dbdb2615495a2beb16c790f23/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2169)
below:
```
// If we have a connection, we schedule the callback and recycle on the event loop thread to avoid any
// race condition since we also write the message on the socket from this thread
cnx.ctx().channel().eventLoop().execute(() -> {
    synchronized (ProducerImpl.this) {
        failPendingMessages(null, ex);
    }
});
```
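For readers more familiar with Go, the same idea can be sketched with a single
goroutine that runs submitted tasks one at a time. This is only an
illustration of the serialization technique, not code from either client:
`eventLoop`, `execute`, `stop`, and `runOnce` are hypothetical names. Because
both the "fail pending" task and the "resend" task run on the loop goroutine,
the resend task sees either the full queue or an empty one, never a
half-cleared one.

```go
package main

import (
	"fmt"
	"sync"
)

// eventLoop runs submitted tasks sequentially on one goroutine.
type eventLoop struct {
	tasks chan func()
	done  chan struct{}
}

func newEventLoop() *eventLoop {
	l := &eventLoop{tasks: make(chan func()), done: make(chan struct{})}
	go func() {
		defer close(l.done)
		for task := range l.tasks {
			task()
		}
	}()
	return l
}

// execute hands a task to the loop goroutine.
func (l *eventLoop) execute(task func()) { l.tasks <- task }

// stop closes the task channel and waits for the loop goroutine to exit.
func (l *eventLoop) stop() {
	close(l.tasks)
	<-l.done
}

// runOnce submits a "fail all pending" task and a "resend pending" task from
// two goroutines and returns how many messages each task handled.
func runOnce() (failed, resent int) {
	loop := newEventLoop()
	pending := []string{"msg-1", "msg-2"} // only touched on the loop goroutine

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		loop.execute(func() {
			failed = len(pending)
			pending = nil
		})
	}()
	go func() {
		defer wg.Done()
		loop.execute(func() {
			resent = len(pending)
		})
	}()
	wg.Wait()
	loop.stop()
	return failed, resent
}

func main() {
	failed, resent := runOnce()
	// resent is 2 if the resend task ran first, 0 if it ran second.
	fmt.Println("failed:", failed, "resent:", resent)
}
```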
I think we can add `Lock()` and `Unlock()` as member methods of
`p.pendingQueue` and remove the internal locking from its implementation
methods such as `Put()` and `Poll()`. We should treat "iterate over every
member and delete some of them" or "iterate over every member and change some
member values" as one atomic operation, and call `p.pendingQueue.Lock()` and
`p.pendingQueue.Unlock()` around every action we take on `p.pendingQueue`.
#### Steps to reproduce
1. Update the broker's `handleSend()` function so that it always returns a
transient error when it encounters a specific message.
```
@Override
protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
    ...
    this.ctx().channel().eventLoop().execute(() -> {
        this.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId,
                ServerError.UnknownError, "UnknownError"));
        this.completedSendOperation(producer.isNonPersistentTopic(), 0);
    });
}
```
2. Write a demo that sends this specific message. The demo will reconnect
infinitely.
```
package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Connect to the broker; replace the placeholders with real values.
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:            "<url>:6650",
		Authentication: pulsar.NewAuthenticationToken("<token>"),
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "<tenant>/<namespace>/<topic>",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Send the message that the patched broker always rejects.
	ctx := context.Background()
	msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
		Payload: []byte("specificMessage"),
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Println("Published message: ", msgId)
}
```
The log looks something like the following. I use `UnknownError` as the
transient error:
```
INFO[0000] Reconnected producer to broker cnx="<local_addr>:37696 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
WARN[0000] Received send error from server: [UnknownError] : [UnknownError] local_addr="<local_addr>:37696" remote_addr="pulsar://<pulsar_addr>:10003"
WARN[0000] Connection was closed cnx="<local_addr>:37696 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
WARN[0000] Connection was closed cnx="<local_addr>:37696 -> <pulsar_addr>:10003" producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] runEventsLoop will reconnect in producer producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] Reconnecting to broker assignedBrokerURL= delayReconnectTime=115.401948ms producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] runEventsLoop will reconnect in producer producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Reconnecting to broker assignedBrokerURL= delayReconnectTime=103.170835ms producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Connecting to broker remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] TCP connection established local_addr="<local_addr>:37708" remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] Connection is ready local_addr="<local_addr>:37708" remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] Connected producer cnx="<local_addr>:37708 -> <pulsar_addr>:10003" epoch=3 producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] Reconnected producer to broker cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] Connected producer cnx="<local_addr>:37708 -> <pulsar_addr>:10003" epoch=3 producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Resending 1 pending batches producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Reconnected producer to broker cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
WARN[0000] Received send error from server: [UnknownError] : [UnknownError] local_addr="<local_addr>:37708" remote_addr="pulsar://<pulsar_addr>:10003"
WARN[0000] Connection was closed cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
WARN[0000] Connection was closed cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] runEventsLoop will reconnect in producer producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] Reconnecting to broker assignedBrokerURL= delayReconnectTime=112.652459ms producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] runEventsLoop will reconnect in producer producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Reconnecting to broker assignedBrokerURL= delayReconnectTime=104.869345ms producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Connecting to broker remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] TCP connection established local_addr="<local_addr>:37712" remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] Connection is ready local_addr="<local_addr>:37712" remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] Connected producer cnx="<local_addr>:37712 -> <pulsar_addr>:10003" epoch=4 producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Resending 1 pending batches producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Reconnected producer to broker cnx="<local_addr>:37712 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
WARN[0000] Received send error from server: [UnknownError] : [UnknownError] local_addr="<local_addr>:37712" remote_addr="pulsar://<pulsar_addr>:10003"
...
```
#### System configuration
**Client version**: 0.12.1
**Broker version**: 2.9