fracasula commented on code in PR #1310:
URL: https://github.com/apache/pulsar-client-go/pull/1310#discussion_r1851963713
##########
pulsar/consumer_partition.go:
##########
@@ -949,6 +949,11 @@ func (pc *partitionConsumer) Close() {
// wait for request to finish
<-req.doneCh
+
+ // It will close `queueCh.in`. If `MessageReceived` was called after that, it will panic because new messages
+ // will be sent to a closed channel. However, generally it's impossible because the broker will not be able to
+ // dispatch messages to this consumer after receiving the close request.
+ pc.queueCh.stop()
Review Comment:
I see that `stop` doesn't close `closeCh`; it only closes `inCh` and `outCh`.
Beware: `closeCh` is an unbuffered channel, and its listener returns after the
first call to `stop`. Any subsequent `c.closeCh <- struct{}{}` will then block
forever, which is hard to detect.
Wouldn't it be better to just close `closeCh`? It has the same effect on the
listener, but if you `stop` twice you'll get a panic instead of silently
being stuck. Given that you're not planning to call `stop` multiple times
anyway, closing the channel seems like the safer option. Wdyt?
##########
pulsar/consumer_partition.go:
##########
@@ -1133,6 +1138,15 @@ func (pc *partitionConsumer) internalAckList(request *ackListRequest) {
}
func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
+ defer func() {
+ if r := recover(); r != nil {
+ if err, ok := r.(error); ok && err.Error() == "send on closed channel" {
+ pc.log.WithField("panic", r).Debug("panic recovered in MessageReceived")
Review Comment:
If this were to happen, it would be quite bad in my opinion. Do you really
want to log it at `DEBUG` only?
I would want to know if we're sending on a closed channel in production
code. I suggest `WARNING` here.
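
As a rough sketch of what I mean (the `safeSend` helper and the `warn` callback are hypothetical, not part of the client; re-panicking on anything unexpected is also my assumption, since the rest of the hunk isn't shown here):

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueClosed is a hypothetical sentinel error for this sketch.
var errQueueClosed = errors.New("queue closed")

// safeSend sends v on ch and converts a "send on closed channel" panic into
// an error, reporting it through warn so it is visible in production logs.
func safeSend(ch chan<- int, v int, warn func(msg string)) (err error) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		if e, ok := r.(error); ok && e.Error() == "send on closed channel" {
			warn("panic recovered while sending: " + e.Error())
			err = errQueueClosed
			return
		}
		panic(r) // assumption: anything else is unexpected and should not be hidden
	}()
	ch <- v
	return nil
}

func main() {
	warn := func(msg string) { fmt.Println("WARN:", msg) }

	open := make(chan int, 1)
	fmt.Println("open channel:", safeSend(open, 1, warn))

	closed := make(chan int)
	close(closed)
	fmt.Println("closed channel:", safeSend(closed, 2, warn))
}
```

The point is only the log level: the recovered panic is surfaced as a warning rather than hidden at debug level.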
##########
pulsar/consumer_partition.go:
##########
@@ -1564,7 +1578,7 @@ func (pc *partitionConsumer) dispatcher() {
pc.log.Debug("skip dispatching messages when seeking")
}
} else {
- queueCh = pc.queueCh
+ queueCh = pc.queueCh.outCh
Review Comment:
I find this code a bit convoluted. Can you help me understand why you assign
the channel to `queueCh` and then later range over `pc.queueCh` directly when
you get a signal to clear the queue?
It's not very clear when the variable is needed and when it is not. Perhaps a
comment would help demystify this?
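
My guess is that the local variable is there for the nil-channel idiom, where a `select` case is disabled by leaving its channel variable `nil`. If so, a comment saying that would be enough. A minimal sketch of the idiom, with hypothetical names (`tryReceive`, `active`, `queue`) and no claim that this matches the dispatcher's actual logic:

```go
package main

import "fmt"

// tryReceive performs one non-blocking receive. A receive from a nil channel
// is never ready, so passing nil effectively disables the receive case.
func tryReceive(ch <-chan int) (int, bool) {
	select {
	case v, ok := <-ch:
		return v, ok
	default:
		return 0, false
	}
}

func main() {
	queue := make(chan int, 1)
	queue <- 42

	// Case disabled: the local variable is nil, e.g. while seeking.
	var active <-chan int
	_, ok := tryReceive(active)
	fmt.Println("received while disabled:", ok)

	// Case enabled: point the local variable at the real channel.
	active = queue
	v, ok := tryReceive(active)
	fmt.Println("received while enabled:", v, ok)
}
```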
--
This is an automated message from the Apache Git Service.