wuYin commented on a change in pull request #359:
URL: https://github.com/apache/pulsar-client-go/pull/359#discussion_r485617154
##########
File path: pulsar/internal/connection.go
##########
@@ -356,24 +357,35 @@ func (c *connection) run() {
defer func() {
// all the accesses to the pendingReqs should be happened in this run loop thread,
// including the final cleanup, to avoid the issue https://github.com/apache/pulsar-client-go/issues/239
+ c.pendingLock.Lock()
for id, req := range c.pendingReqs {
req.callback(nil, errors.New("connection closed"))
delete(c.pendingReqs, id)
}
+ c.pendingLock.Unlock()
c.Close()
}()
+ go func() {
+ for {
+ select {
+ case <-c.closeCh:
+ return
+
+ case req := <-c.incomingRequestsCh:
+ if req == nil {
+ return // TODO: this never gonna be happen
+ }
+ c.internalSendRequest(req)
+ }
+ }
+ }()
+
Review comment:
Background:
- The retry-topic producer and the user-topic consumer share the same connection instance to interact with the broker.
- The connection listens on both the request channel and the response channel in a [single `select{}` loop](https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L367).
When a message needs to be redelivered:
1. The retry-topic producer calls `SendAsync()`, and the user-topic consumer's AckID logic is wrapped as a **callback** in the message handler.
2. The connection sends the `SEND` command to the broker.
3. The broker sends the `SEND_RECEIPT` command to the connection.
4. The connection receives it and invokes the **callback**.
5. The **callback** calls `AckID()`, so the connection now wants to send an `ACK` command to the broker.
In the meantime, the `select{}` loop is still stuck in step 3, executing the `SEND_RECEIPT` handler, so `incomingRequestsCh` is never selected. Once that channel fills up with 10 Ack requests, the next send blocks and the connection deadlocks.
Handling the request and response channels asynchronously in the connection resolves this block.
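The scenario above can be reproduced in isolation with a rough sketch like the one below. It is not the client's actual code: `request`, `simulate`, and the `stop` channel are hypothetical names made up for this illustration, and the only detail taken from the discussion is the 10-slot request buffer. A receipt handler that runs inside the `select{}` loop enqueues more requests than the buffer holds; with a single loop nothing can drain the buffer, while a separate goroutine keeps draining it.

```go
package main

import (
	"fmt"
	"time"
)

// request is a hypothetical stand-in for a command queued on the connection.
type request struct{ id int }

// simulate delivers one receipt whose callback enqueues `acks` requests.
// It reports whether every request reached the "broker" before a timeout.
func simulate(acks int, separateDrain bool) bool {
	requests := make(chan request, 10) // 10-slot buffer, as in the comment above
	receipts := make(chan int)
	sent := make(chan request, acks) // what the "broker" received
	stop := make(chan struct{})      // demo-only: lets blocked goroutines exit
	defer close(stop)

	var loopRequests <-chan request = requests
	if separateDrain {
		loopRequests = nil // a nil channel is never selected
		// Requests are drained by their own goroutine instead of the run loop.
		go func() {
			for {
				select {
				case <-stop:
					return
				case req := <-requests:
					sent <- req
				}
			}
		}()
	}

	// The run loop: handles requests (unless drained elsewhere) and receipts.
	go func() {
		for {
			select {
			case <-stop:
				return
			case req := <-loopRequests:
				sent <- req
			case n := <-receipts:
				// Receipt handler: the callback enqueues acks from inside
				// the loop, so the loop cannot drain `requests` meanwhile.
				for i := 0; i < n; i++ {
					select {
					case requests <- request{id: i}:
					case <-stop:
						return
					}
				}
			}
		}
	}()

	receipts <- acks
	deadline := time.After(200 * time.Millisecond)
	for i := 0; i < acks; i++ {
		select {
		case <-sent:
		case <-deadline:
			return false // the loop is stuck on a full request buffer
		}
	}
	return true
}

func main() {
	fmt.Println("single loop, 11 acks:", simulate(11, false))
	fmt.Println("separate goroutine, 11 acks:", simulate(11, true))
}
```

With 11 acks the single-loop variant is expected to time out, because the eleventh send blocks while the loop is still inside the receipt handler. The variant with a dedicated draining goroutine should complete. With 10 or fewer acks both variants finish, which matches the observation that the hang only appears once the buffer is full.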
