BewareMyPower opened a new pull request, #1310: URL: https://github.com/apache/pulsar-client-go/pull/1310
### Motivation

When `AckWithResponse` is enabled, a deadlock can easily happen if both `queueCh` and `messagesCh` of `partitionConsumer` are full. In this case, if the consumer receives new messages from the broker, `MessageReceived` blocks at https://github.com/apache/pulsar-client-go/blob/9366a0eaff772b7097ab1bb2147e287460dc61ba/pulsar/consumer_partition.go#L1376

The stacks could be:

```
3  0x0000000100ea83c0 in github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).MessageReceived
   at ./consumer_partition.go:1376
4  0x0000000100b1c73c in github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleMessage
   at ./internal/connection.go:751
5  0x0000000100b1ab7c in github.com/apache/pulsar-client-go/pulsar/internal.(*connection).internalReceivedCommand
   at ./internal/connection.go:590
```

As the stacks above show, the `connection.run()` goroutine is blocked, so it cannot handle any new command, including the ACK response. The ACK-related method then fails with "request timed out". The deadlock is only resolved when the application receives messages again: that makes room in `messagesCh`, so `queueCh` can move messages into it and is no longer full.

The root cause is that `queueCh` is a buffered channel with a fixed capacity of `ReceiverQueueSize`. However, the broker can dispatch more messages than `ReceiverQueueSize`, because the permits in Flow requests only limit the number of entries to read, not the number of messages, and a batched entry can contain many messages. Hence this issue is easy to reproduce by reducing `ReceiverQueueSize` and sending many messages with a large batch size. See how `TestAckResponseNotBlocked` reproduces this issue.

### Modifications

Implement `unboundedChannel` with two channels, `in` and `out`. The caller sends values to `in` and receives values from `out`. In the background, a goroutine moves values from `in` into an internal value list and sends the first value of the list to `out`.
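A minimal sketch of this `in`/`out` design, assuming Go 1.18+ generics. It is an illustration, not the code from this PR: the constructor name `newUnboundedChannel` and the close-to-drain behaviour are hypothetical choices made for the example.

```go
package main

import "fmt"

// unboundedChannel sketches the described design: callers send to in and
// receive from out, while a background goroutine buffers pending values in a
// slice. Everything except the in/out fields is a hypothetical choice.
type unboundedChannel[T any] struct {
	in  chan<- T
	out <-chan T
}

// newUnboundedChannel is a hypothetical constructor that starts the
// background goroutine. Closing in flushes the buffer to out, then closes out.
func newUnboundedChannel[T any]() *unboundedChannel[T] {
	in := make(chan T)
	out := make(chan T)
	go func() {
		defer close(out)
		var values []T // internal value list
		for {
			if len(values) == 0 {
				// Nothing buffered yet: just wait for the next input.
				v, ok := <-in
				if !ok {
					return
				}
				values = append(values, v)
				continue
			}
			select {
			case v, ok := <-in:
				if !ok {
					// Input closed: deliver the remaining values, then stop.
					for _, rest := range values {
						out <- rest
					}
					return
				}
				values = append(values, v)
			case out <- values[0]:
				// The first value was taken: release it and shift the list.
				var zero T
				values[0] = zero
				values = values[1:]
			}
		}
	}()
	return &unboundedChannel[T]{in: in, out: out}
}

func main() {
	ch := newUnboundedChannel[string]()
	// Nobody reads from ch.out yet, but none of these sends waits for a
	// receiver on out: the background goroutine accepts and buffers them.
	for _, msg := range []string{"msg-1", "msg-2", "msg-3"} {
		ch.in <- msg
	}
	close(ch.in)
	for msg := range ch.out {
		fmt.Println(msg)
	}
}
```

Because `in` is unbuffered here, a send still waits for the background goroutine to pick the value up, but never for whoever reads `out`. The trade-off is that memory is no longer capped by a channel capacity; in the consumer, the Flow permits still bound how many entries the broker dispatches. In this sketch, abandoning `out` without draining it after closing `in` would leak the goroutine.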
After that, change `queueCh`'s type to `unboundedChannel` so `queueCh.in <- msg` will never block the current goroutine.
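To illustrate the root cause from the Motivation section, here is a toy model (not the client's real code) of a fixed-size `queueCh`. It assumes the broker honours a Flow permit count equal to `ReceiverQueueSize` in entries, and that each entry is a batch of a hypothetical 5 messages. A non-blocking send counts how many messages a plain `queueCh <- msg` would have blocked on.

```go
package main

import "fmt"

// deliver models the broker dispatching `entries` entries, each expanding
// into messagesPerEntry messages, into a channel of capacity
// receiverQueueSize that no application goroutine is draining. It reports
// how many messages fit and how many sends would have blocked.
func deliver(receiverQueueSize, entries, messagesPerEntry int) (queued, blocked int) {
	queueCh := make(chan int, receiverQueueSize)
	for e := 0; e < entries; e++ {
		for m := 0; m < messagesPerEntry; m++ {
			select {
			case queueCh <- m:
				queued++
			default:
				// A plain `queueCh <- m` would block here; in the client,
				// that blocked goroutine is the connection goroutine.
				blocked++
			}
		}
	}
	return queued, blocked
}

func main() {
	const receiverQueueSize = 10
	// Permits cap the number of entries, not messages (assumed: permits = 10).
	queued, blocked := deliver(receiverQueueSize, receiverQueueSize, 5)
	fmt.Printf("queued=%d blocked=%d\n", queued, blocked)
}
```

With non-batched messages (`messagesPerEntry` = 1) everything fits; with batches of 5, four out of every five messages would have blocked the sender, which is why the issue is easy to reproduce with a small `ReceiverQueueSize` and a large batch size.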
