BewareMyPower opened a new pull request, #1310:
URL: https://github.com/apache/pulsar-client-go/pull/1310

   ### Motivation
   
   When `AckWithResponse` is enabled, a deadlock can easily occur if the `queueCh` and `messagesCh` channels of `partitionConsumer` are both full. In this case, when the consumer receives new messages from the broker, `MessageReceived` blocks at https://github.com/apache/pulsar-client-go/blob/9366a0eaff772b7097ab1bb2147e287460dc61ba/pulsar/consumer_partition.go#L1376
   
   The stacks could be:
   
   ```
   3  0x0000000100ea83c0 in github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).MessageReceived
      at ./consumer_partition.go:1376
   4  0x0000000100b1c73c in github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleMessage
      at ./internal/connection.go:751
   5  0x0000000100b1ab7c in github.com/apache/pulsar-client-go/pulsar/internal.(*connection).internalReceivedCommand
      at ./internal/connection.go:590
   ```
   
   As the stacks above show, the `connection.run()` goroutine is blocked, so it cannot handle any new commands, including ACK responses. ACK-related methods then fail with "request timed out". The deadlock cannot be resolved until the application consumes messages again so that `messagesCh` is no longer full; only then can messages move from `queueCh` to `messagesCh`, which frees space in `queueCh`.
   
   The root cause is that `queueCh` is a buffered channel with a fixed capacity of `ReceiverQueueSize`. However, the broker can dispatch more messages than `ReceiverQueueSize`, because the permits in Flow requests limit only the number of entries to read, not the number of messages, and a single batched entry can contain many messages. Hence this issue can be easily reproduced by reducing `ReceiverQueueSize` and sending many messages with a large batch size. See how `TestAckResponseNotBlocked` reproduces this issue.
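   The gap between entries and messages can be illustrated with a minimal sketch. It assumes a hypothetical receiver queue size of 3 and entries that each carry a batch of 5 messages; the `entry` type and `deliver` helper are illustrative only and are not part of the client. A non-blocking send (`select` with `default`) marks the point where a plain `queueCh <- msg` would block.

   ```go
   package main

   import "fmt"

   // entry is a hypothetical stand-in for a broker entry: one entry may
   // carry a batch of several messages.
   type entry struct {
   	messages []string
   }

   // deliver pushes every message of every entry into queueCh and reports how
   // many were accepted before the channel filled up. The select/default keeps
   // the demo from blocking; a plain `queueCh <- m` would block at that point.
   func deliver(queueCh chan string, entries []entry) (accepted int, blocked bool) {
   	for _, e := range entries {
   		for _, m := range e.messages {
   			select {
   			case queueCh <- m:
   				accepted++
   			default:
   				return accepted, true
   			}
   		}
   	}
   	return accepted, false
   }

   func main() {
   	const receiverQueueSize = 3 // hypothetical value
   	const batchSize = 5         // hypothetical number of messages per entry

   	// Permits bound entries, not messages: the broker may send
   	// receiverQueueSize entries of batchSize messages each.
   	entries := make([]entry, receiverQueueSize)
   	for i := range entries {
   		for j := 0; j < batchSize; j++ {
   			entries[i].messages = append(entries[i].messages, fmt.Sprintf("msg-%d-%d", i, j))
   		}
   	}

   	queueCh := make(chan string, receiverQueueSize)
   	accepted, blocked := deliver(queueCh, entries)
   	fmt.Printf("dispatched=%d accepted=%d blocked=%v\n",
   		receiverQueueSize*batchSize, accepted, blocked)
   	// prints: dispatched=15 accepted=3 blocked=true
   }
   ```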
   
   ### Modifications
   
   Implement `unboundedChannel` with two channels, `in` and `out`. The caller sends values to `in` and receives values from `out`. In the background, a goroutine moves values from `in` into an internal value list and sends the first value of the list to `out`. Then change the type of `queueCh` to `unboundedChannel` so that `queueCh.in <- msg` never blocks the current goroutine.
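   The `in`/`out` design described above can be sketched as follows. This is a minimal, hypothetical illustration (it needs Go 1.18+ for generics and uses `container/list` as the internal value list); it is not the code from this PR. The background goroutine is always ready to receive from `in`, so a send to `in` never waits for a reader of `out`.

   ```go
   package main

   import (
   	"container/list"
   	"fmt"
   )

   // unboundedChannel: callers send to in and receive from out; values in
   // between are kept in an unbounded list by a background goroutine.
   type unboundedChannel[T any] struct {
   	in  chan<- T
   	out <-chan T
   }

   func newUnboundedChannel[T any]() *unboundedChannel[T] {
   	in := make(chan T)
   	out := make(chan T)
   	go func() {
   		defer close(out)
   		values := list.New()
   		for {
   			if values.Len() == 0 {
   				// Nothing buffered: wait only for new input.
   				v, ok := <-in
   				if !ok {
   					return
   				}
   				values.PushBack(v)
   				continue
   			}
   			front := values.Front()
   			select {
   			case v, ok := <-in:
   				if !ok {
   					// in was closed: flush the buffered values in order.
   					for e := values.Front(); e != nil; e = e.Next() {
   						out <- e.Value.(T)
   					}
   					return
   				}
   				values.PushBack(v)
   			case out <- front.Value.(T):
   				// The oldest value was delivered; drop it from the list.
   				values.Remove(front)
   			}
   		}
   	}()
   	return &unboundedChannel[T]{in: in, out: out}
   }

   func main() {
   	ch := newUnboundedChannel[int]()
   	// No reader yet, but none of these sends block for good.
   	for i := 0; i < 1000; i++ {
   		ch.in <- i
   	}
   	close(ch.in)
   	sum := 0
   	for v := range ch.out {
   		sum += v
   	}
   	fmt.Println(sum) // prints 499500
   }
   ```

   The trade-off in this sketch is that memory use is no longer capped by a channel capacity: the list grows with however many values are sent to `in` before they are read from `out`.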
   