Copilot commented on code in PR #1443:
URL: https://github.com/apache/pulsar-client-go/pull/1443#discussion_r2576503491


##########
pulsar/consumer_zero_queue.go:
##########
@@ -128,7 +138,13 @@ func (z *zeroQueueConsumer) Receive(ctx context.Context) 
(Message, error) {
                        if !ok {
                                return nil, newError(ConsumerClosed, "consumer 
closed")
                        }
-                       return cm.Message, nil
+                       message, ok := cm.Message.(*message)
+                       if ok && message.getConn().ID() == z.pc._getConn().ID() 
{
+                               z.waitingOnReceive.Store(false)
+                               return cm.Message, nil
+                       } else {
+                               z.log.WithField("messageID", 
cm.Message.ID()).Warn("message from old connection discarded after 
reconnection")
+                       }

Review Comment:
   Potential nil pointer dereference: when the type assertion fails (`ok` is
   false), the `&&` short-circuits and execution falls through to the else
   block at line 146, where calling `cm.Message.ID()` is safe. However, when
   the type assertion succeeds but `message.getConn()` returns nil, calling
   `.ID()` on line 142 will panic. Consider adding a nil check for the
   connection before calling `.ID()`:
   
   ```go
   message, ok := cm.Message.(*message)
   if ok && message.getConn() != nil && message.getConn().ID() == 
z.pc._getConn().ID() {
        z.waitingOnReceive.Store(false)
        return cm.Message, nil
   } else {
        z.log.WithField("messageID", cm.Message.ID()).Warn("message from old 
connection discarded after reconnection")
   }
   ```



##########
pulsar/consumer_zero_queue_test.go:
##########
@@ -243,6 +243,103 @@ func TestReconnectConsumer(t *testing.T) {
        defer c.Terminate(ctx)
 }
 
+func TestReconnectedBrokerSendPermits(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       topic := newTopicName()
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                   topic,
+               SubscriptionName:        "my-sub",
+               EnableZeroQueueConsumer: true,
+               Type:                    Shared, // using Shared subscription 
type to support unack subscription stats
+       })
+       ctx := context.Background()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               })
+               assert.Nil(t, err)
+               log.Printf("send message: %s", msg.String())
+       }
+
+       admin, err := pulsaradmin.NewClient(&config.Config{})
+       assert.Nil(t, err)
+       log.Println("unloading topic")
+       topicName, err := utils.GetTopicName(topic)
+       assert.Nil(t, err)
+       err = admin.Topics().Unload(*topicName)
+       assert.Nil(t, err)
+       log.Println("unloaded topic")
+       time.Sleep(5 * time.Second) // wait for topic unload finish
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               expectProperties := map[string]string{
+                       "key-1": "pulsar-1",
+               }
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+               assert.Equal(t, "pulsar", msg.Key())
+               assert.Equal(t, expectProperties, msg.Properties())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
+               log.Printf("receive message: %s", msg.ID().String())
+       }
+       //      send one more message and we do not manually receive it
+       _, err = producer.Send(ctx, &ProducerMessage{
+               Payload: []byte(fmt.Sprintf("hello-%d", 10)),
+               Key:     "pulsar",
+               Properties: map[string]string{
+                       "key-1": "pulsar-1",
+               },
+       })

Review Comment:
   Missing error handling: the error returned by `producer.Send` is assigned
   to `err` but never checked. Add an assertion to ensure the message was
   sent successfully:
   
   ```go
   _, err = producer.Send(ctx, &ProducerMessage{
        Payload: []byte(fmt.Sprintf("hello-%d", 10)),
        Key:     "pulsar",
        Properties: map[string]string{
                "key-1": "pulsar-1",
        },
   })
   assert.Nil(t, err)
   ```
   ```suggestion
        })
        assert.Nil(t, err)
   ```



##########
pulsar/consumer_zero_queue_test.go:
##########
@@ -243,6 +243,103 @@ func TestReconnectConsumer(t *testing.T) {
        defer c.Terminate(ctx)
 }
 
+func TestReconnectedBrokerSendPermits(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       topic := newTopicName()
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                   topic,
+               SubscriptionName:        "my-sub",
+               EnableZeroQueueConsumer: true,
+               Type:                    Shared, // using Shared subscription 
type to support unack subscription stats
+       })
+       ctx := context.Background()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               })
+               assert.Nil(t, err)
+               log.Printf("send message: %s", msg.String())
+       }
+
+       admin, err := pulsaradmin.NewClient(&config.Config{})
+       assert.Nil(t, err)
+       log.Println("unloading topic")
+       topicName, err := utils.GetTopicName(topic)
+       assert.Nil(t, err)
+       err = admin.Topics().Unload(*topicName)
+       assert.Nil(t, err)
+       log.Println("unloaded topic")
+       time.Sleep(5 * time.Second) // wait for topic unload finish
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)

Review Comment:
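   Inconsistent error handling: the test uses `log.Fatal(err)` on line 294,
   which calls `os.Exit(1)` and terminates the whole test binary immediately,
   so deferred cleanup never runs and the failure is not reported through
   `t`. Consider using `assert.Nil(t, err)` or `require.NoError(t, err)`
   instead, for consistency with the rest of the test and to let the test
   framework handle failures properly. `require.NoError` is the safer of the
   two here, because `msg` is dereferenced right after this check.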
   ```suggestion
                        assert.Nil(t, err)
   ```



##########
pulsar/consumer_zero_queue.go:
##########
@@ -128,7 +138,13 @@ func (z *zeroQueueConsumer) Receive(ctx context.Context) 
(Message, error) {
                        if !ok {
                                return nil, newError(ConsumerClosed, "consumer 
closed")
                        }
-                       return cm.Message, nil
+                       message, ok := cm.Message.(*message)
+                       if ok && message.getConn().ID() == z.pc._getConn().ID() 
{
+                               z.waitingOnReceive.Store(false)
+                               return cm.Message, nil
+                       } else {
+                               z.log.WithField("messageID", 
cm.Message.ID()).Warn("message from old connection discarded after 
reconnection")
+                       }
                case <-ctx.Done():
                        return nil, ctx.Err()
                }

Review Comment:
   The `waitingOnReceive` flag is not reset when the context is canceled
   (lines 148-149) or when the consumer is closed (lines 135-136 and
   138-139). A stale flag could lead to incorrect permit management on
   subsequent reconnections. Consider resetting the flag on all exit paths:
   
   ```go
   case <-ctx.Done():
        z.waitingOnReceive.Store(false)
        return nil, ctx.Err()
   case <-z.closeCh:
        z.waitingOnReceive.Store(false)
        return nil, newError(ConsumerClosed, "consumer closed")
   ```



##########
pulsar/consumer_zero_queue_test.go:
##########
@@ -243,6 +243,103 @@ func TestReconnectConsumer(t *testing.T) {
        defer c.Terminate(ctx)
 }
 
+func TestReconnectedBrokerSendPermits(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       topic := newTopicName()
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                   topic,
+               SubscriptionName:        "my-sub",
+               EnableZeroQueueConsumer: true,
+               Type:                    Shared, // using Shared subscription 
type to support unack subscription stats
+       })
+       ctx := context.Background()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)

Review Comment:
   Missing resource cleanup: The test does not properly close the client, 
consumer, and producer. Add defer statements or explicit cleanup at the end of 
the test to prevent resource leaks:
   
   ```go
   defer client.Close()
   defer consumer.Close()
   defer producer.Close()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to