shibd commented on code in PR #976:
URL: https://github.com/apache/pulsar-client-go/pull/976#discussion_r1125680331


##########
pulsar/consumer_partition.go:
##########
@@ -1187,35 +1220,48 @@ func (pc *partitionConsumer) dispatcher() {
        defer func() {
                pc.log.Debug("exiting dispatch loop")
        }()
-       var messages []*message
-       for {
-               var queueCh chan []*message
-               var messageCh chan ConsumerMessage
-               var nextMessage ConsumerMessage
 
-               // are there more messages to send?
-               if len(messages) > 0 {
-                       nextMessage = ConsumerMessage{
-                               Consumer: pc.parentConsumer,
-                               Message:  messages[0],
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       go func() {
+               var messageCh chan ConsumerMessage
+               for {
+                       if len(pc.queueCh) == 0 {

Review Comment:
   Why do we need a new goroutine to handle this logic? Could we decide whether
we need to scale directly after pushing `nextMessage` to `messageCh`?
   
   
https://github.com/apache/pulsar-client-go/blob/42ded0d59c46fd3fdaad45f045f7e8bf091131a5/pulsar/consumer_partition.go#L1249-L1255
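
A rough sketch of what I have in mind, not a drop-in patch. It keeps the single select loop from the old dispatcher and runs the scaling check right after a message is pushed. The `dispatcher` type, its fields, `maybeGrow`, and the doubling rule are all hypothetical placeholders; the real condition would be whatever this PR already uses.

```go
package main

import "fmt"

// dispatcher is a hypothetical, simplified stand-in for partitionConsumer.
type dispatcher struct {
	queueCh   chan []string // batches arriving from the connection
	messageCh chan string   // messages handed to the application
	queueSize int           // current receiver queue size
	maxSize   int           // upper bound for queueSize
}

// maybeGrow is a placeholder scaling rule (an assumption, not the PR's logic):
// double the queue size, capped at maxSize, when nothing is left to deliver.
func (d *dispatcher) maybeGrow(pending int) {
	if pending == 0 && len(d.queueCh) == 0 && d.queueSize < d.maxSize {
		d.queueSize *= 2
		if d.queueSize > d.maxSize {
			d.queueSize = d.maxSize
		}
	}
}

// run is the single dispatch loop: no helper goroutine is started.
func (d *dispatcher) run() {
	defer close(d.messageCh)
	var messages []string
	for {
		// A nil channel never becomes ready in a select, so only one
		// of the two cases below is enabled on each iteration.
		var queueCh chan []string
		var messageCh chan string
		var next string

		if len(messages) > 0 {
			next = messages[0]
			messageCh = d.messageCh
		} else {
			queueCh = d.queueCh
		}

		select {
		case batch, ok := <-queueCh:
			if !ok {
				return // input closed and nothing pending
			}
			messages = batch
		case messageCh <- next:
			messages = messages[1:]
			// Scaling decision made inline, right after the push.
			d.maybeGrow(len(messages))
		}
	}
}

func main() {
	d := &dispatcher{
		queueCh:   make(chan []string, 4),
		messageCh: make(chan string),
		queueSize: 1,
		maxSize:   8,
	}
	d.queueCh <- []string{"a", "b"}
	d.queueCh <- []string{"c"}
	close(d.queueCh)

	go d.run()
	for m := range d.messageCh {
		fmt.Println(m)
	}
	fmt.Println("queue size:", d.queueSize)
}
```

Since the check runs in the same goroutine that owns `messages`, the sketch needs no extra `context` or cancellation plumbing.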



##########
pulsar/consumer_partition.go:
##########
@@ -1226,34 +1272,16 @@ func (pc *partitionConsumer) dispatcher() {
                        }
                        pc.log.Debug("dispatcher received connection event")
 
-                       messages = nil
-
                        // reset available permits
                        pc.availablePermits.reset()
-                       initialPermits := uint32(pc.queueSize)
+                       initialPermits := uint32(pc.maxQueueSize)

Review Comment:
   Do we need to set `initialReceiverQueueSize` here?



##########
pulsar/consumer_partition.go:
##########
@@ -1045,8 +1072,14 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
                messages = append(messages, msg)
        }
 
+       if len(pc.queueCh) == 0 && len(pc.messageCh) == 0 {

Review Comment:
   Why do we check the lengths of `queueCh` and `messageCh` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
