crossoverJie commented on code in PR #1443:
URL: https://github.com/apache/pulsar-client-go/pull/1443#discussion_r2587995905


##########
pulsar/consumer_zero_queue_test.go:
##########
@@ -243,6 +243,136 @@ func TestReconnectConsumer(t *testing.T) {
        defer c.Terminate(ctx)
 }
 
+func TestReconnectedBrokerSendPermits(t *testing.T) {
+       req := testcontainers.ContainerRequest{
+               Name:         "pulsar-test",
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               HostConfigModifier: func(config *container.HostConfig) {
+                       config.PortBindings = map[nat.Port][]nat.PortBinding{
+                               "6650/tcp": {{HostIP: "0.0.0.0", HostPort: 
"6659"}},
+                               "8080/tcp": {{HostIP: "0.0.0.0", HostPort: 
"8089"}},
+                       }
+               },
+               Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), 
testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+               Reuse:            true,
+       })
+       require.NoError(t, err, "Failed to start the pulsar container")
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+       client, err := NewClient(ClientOptions{
+               URL: endpoint,
+       })
+       assert.Nil(t, err)
+       adminEndpoint, err := c.PortEndpoint(context.Background(), "8080", 
"http")
+       assert.Nil(t, err)
+       admin, err := pulsaradmin.NewClient(&config.Config{
+               WebServiceURL: adminEndpoint,
+       })
+       assert.Nil(t, err)
+
+       topic := newTopicName()
+       var consumer Consumer
+       require.Eventually(t, func() bool {
+               consumer, err = client.Subscribe(ConsumerOptions{
+                       Topic:                   topic,
+                       SubscriptionName:        "my-sub",
+                       EnableZeroQueueConsumer: true,
+                       Type:                    Shared, // using Shared 
subscription type to support unack subscription stats
+               })
+               return err == nil
+       }, 30*time.Second, 1*time.Second)
+       ctx := context.Background()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               })
+               assert.Nil(t, err)
+               log.Printf("send message: %s", msg.String())
+       }
+
+       log.Println("unloading topic")
+       topicName, err := utils.GetTopicName(topic)
+       assert.Nil(t, err)
+       err = admin.Topics().Unload(*topicName)
+       assert.Nil(t, err)
+       log.Println("unloaded topic")
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       assert.Nil(t, err)
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               expectProperties := map[string]string{
+                       "key-1": "pulsar-1",
+               }
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+               assert.Equal(t, "pulsar", msg.Key())
+               assert.Equal(t, expectProperties, msg.Properties())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
+               log.Printf("receive message: %s", msg.ID().String())
+       }
+       //      send one more message and we do not manually receive it
+       _, err = producer.Send(ctx, &ProducerMessage{
+               Payload: []byte(fmt.Sprintf("hello-%d", 10)),
+               Key:     "pulsar",
+               Properties: map[string]string{
+                       "key-1": "pulsar-1",
+               },
+       })
+       assert.Nil(t, err)
+       //      wait for broker send messages to consumer and topic stats 
update finish

Review Comment:
   This is because the `sleep` after `unload` was previously commented out, 
so the consumer never triggered the reconnection mechanism.
   
   I've added a `reconnectCount` field that records the number of reconnection 
attempts, so the test can confirm that the reconnection mechanism was actually triggered.
   
   > This field can also be used as a monitoring metric for the consumer in the 
future.
   
   
   You can now reproduce this issue on the master branch by running the latest 
version of `TestReconnectedBrokerSendPermits` there (with the `reconnectCount` check replaced by a `sleep`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to