geniusjoe commented on code in PR #1443:
URL: https://github.com/apache/pulsar-client-go/pull/1443#discussion_r2576865723


##########
pulsar/consumer_zero_queue_test.go:
##########
@@ -243,6 +243,103 @@ func TestReconnectConsumer(t *testing.T) {
        defer c.Terminate(ctx)
 }
 
+func TestReconnectedBrokerSendPermits(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       topic := newTopicName()
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                   topic,
+               SubscriptionName:        "my-sub",
+               EnableZeroQueueConsumer: true,
+               Type:                    Shared, // using Shared subscription 
type to support unack subscription stats
+       })
+       ctx := context.Background()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               })
+               assert.Nil(t, err)
+               log.Printf("send message: %s", msg.String())
+       }
+
+       admin, err := pulsaradmin.NewClient(&config.Config{})
+       assert.Nil(t, err)
+       log.Println("unloading topic")
+       topicName, err := utils.GetTopicName(topic)
+       assert.Nil(t, err)
+       err = admin.Topics().Unload(*topicName)
+       assert.Nil(t, err)
+       log.Println("unloaded topic")
+       time.Sleep(5 * time.Second) // wait for topic unload finish
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       assert.Nil(t, err)
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               expectProperties := map[string]string{
+                       "key-1": "pulsar-1",
+               }
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+               assert.Equal(t, "pulsar", msg.Key())
+               assert.Equal(t, expectProperties, msg.Properties())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
+               log.Printf("receive message: %s", msg.ID().String())
+       }
+       //      send one more message and we do not manually receive it
+       _, err = producer.Send(ctx, &ProducerMessage{
+               Payload: []byte(fmt.Sprintf("hello-%d", 10)),
+               Key:     "pulsar",
+               Properties: map[string]string{
+                       "key-1": "pulsar-1",
+               },
+       })
+       //      wait for broker send messages to consumer and topic stats 
update finish
+       time.Sleep(5 * time.Second)

Review Comment:
   Maybe use assert.Eventually() for speeding up test case?



##########
pulsar/consumer_zero_queue_test.go:
##########
@@ -243,6 +243,136 @@ func TestReconnectConsumer(t *testing.T) {
        defer c.Terminate(ctx)
 }
 
+func TestReconnectedBrokerSendPermits(t *testing.T) {
+       req := testcontainers.ContainerRequest{
+               Name:         "pulsar-test",
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               HostConfigModifier: func(config *container.HostConfig) {
+                       config.PortBindings = map[nat.Port][]nat.PortBinding{
+                               "6650/tcp": {{HostIP: "0.0.0.0", HostPort: 
"6659"}},
+                               "8080/tcp": {{HostIP: "0.0.0.0", HostPort: 
"8089"}},
+                       }
+               },
+               Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), 
testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+               Reuse:            true,
+       })
+       require.NoError(t, err, "Failed to start the pulsar container")
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+       client, err := NewClient(ClientOptions{
+               URL: endpoint,
+       })
+       assert.Nil(t, err)
+       adminEndpoint, err := c.PortEndpoint(context.Background(), "8080", 
"http")
+       assert.Nil(t, err)
+       admin, err := pulsaradmin.NewClient(&config.Config{
+               WebServiceURL: adminEndpoint,
+       })
+       assert.Nil(t, err)
+
+       topic := newTopicName()
+       var consumer Consumer
+       require.Eventually(t, func() bool {
+               consumer, err = client.Subscribe(ConsumerOptions{
+                       Topic:                   topic,
+                       SubscriptionName:        "my-sub",
+                       EnableZeroQueueConsumer: true,
+                       Type:                    Shared, // using Shared 
subscription type to support unack subscription stats
+               })
+               return err == nil
+       }, 30*time.Second, 1*time.Second)
+       ctx := context.Background()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               })
+               assert.Nil(t, err)
+               log.Printf("send message: %s", msg.String())
+       }
+
+       log.Println("unloading topic")
+       topicName, err := utils.GetTopicName(topic)
+       assert.Nil(t, err)
+       err = admin.Topics().Unload(*topicName)
+       assert.Nil(t, err)
+       log.Println("unloaded topic")
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       assert.Nil(t, err)
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               expectProperties := map[string]string{
+                       "key-1": "pulsar-1",
+               }
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+               assert.Equal(t, "pulsar", msg.Key())
+               assert.Equal(t, expectProperties, msg.Properties())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
+               log.Printf("receive message: %s", msg.ID().String())
+       }
+       //      send one more message and we do not manually receive it
+       _, err = producer.Send(ctx, &ProducerMessage{
+               Payload: []byte(fmt.Sprintf("hello-%d", 10)),
+               Key:     "pulsar",
+               Properties: map[string]string{
+                       "key-1": "pulsar-1",
+               },
+       })
+       assert.Nil(t, err)
+       //      wait for broker send messages to consumer and topic stats 
update finish
+       require.EventuallyWithT(t, func(c *assert.CollectT) {
+               topicStats, err := admin.Topics().GetStats(*topicName)
+               require.Nil(c, err)
+               for _, subscriptionStats := range topicStats.Subscriptions {
+                       require.Equal(c, subscriptionStats.MsgBacklog, int64(1))
+                       require.Equal(c, 
subscriptionStats.Consumers[0].UnAckedMessages, 0)
+               }
+       }, 30*time.Second, 1*time.Second)

Review Comment:
   Maybe tick time be a 5 seconds, so that broker can have enough time to send 
messages to consumer



##########
pulsar/consumer_zero_queue_test.go:
##########
@@ -243,6 +243,136 @@ func TestReconnectConsumer(t *testing.T) {
        defer c.Terminate(ctx)
 }
 
+func TestReconnectedBrokerSendPermits(t *testing.T) {
+       req := testcontainers.ContainerRequest{
+               Name:         "pulsar-test",
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               HostConfigModifier: func(config *container.HostConfig) {
+                       config.PortBindings = map[nat.Port][]nat.PortBinding{
+                               "6650/tcp": {{HostIP: "0.0.0.0", HostPort: 
"6659"}},
+                               "8080/tcp": {{HostIP: "0.0.0.0", HostPort: 
"8089"}},
+                       }
+               },
+               Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), 
testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+               Reuse:            true,
+       })
+       require.NoError(t, err, "Failed to start the pulsar container")
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+       client, err := NewClient(ClientOptions{
+               URL: endpoint,
+       })
+       assert.Nil(t, err)
+       adminEndpoint, err := c.PortEndpoint(context.Background(), "8080", 
"http")
+       assert.Nil(t, err)
+       admin, err := pulsaradmin.NewClient(&config.Config{
+               WebServiceURL: adminEndpoint,
+       })
+       assert.Nil(t, err)
+
+       topic := newTopicName()
+       var consumer Consumer
+       require.Eventually(t, func() bool {
+               consumer, err = client.Subscribe(ConsumerOptions{
+                       Topic:                   topic,
+                       SubscriptionName:        "my-sub",
+                       EnableZeroQueueConsumer: true,
+                       Type:                    Shared, // using Shared 
subscription type to support unack subscription stats
+               })
+               return err == nil
+       }, 30*time.Second, 1*time.Second)
+       ctx := context.Background()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               })
+               assert.Nil(t, err)
+               log.Printf("send message: %s", msg.String())
+       }
+
+       log.Println("unloading topic")
+       topicName, err := utils.GetTopicName(topic)
+       assert.Nil(t, err)
+       err = admin.Topics().Unload(*topicName)
+       assert.Nil(t, err)
+       log.Println("unloaded topic")
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       assert.Nil(t, err)
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               expectProperties := map[string]string{
+                       "key-1": "pulsar-1",
+               }
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+               assert.Equal(t, "pulsar", msg.Key())
+               assert.Equal(t, expectProperties, msg.Properties())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
+               log.Printf("receive message: %s", msg.ID().String())
+       }
+       //      send one more message and we do not manually receive it
+       _, err = producer.Send(ctx, &ProducerMessage{
+               Payload: []byte(fmt.Sprintf("hello-%d", 10)),
+               Key:     "pulsar",
+               Properties: map[string]string{
+                       "key-1": "pulsar-1",
+               },
+       })
+       assert.Nil(t, err)
+       //      wait for broker send messages to consumer and topic stats 
update finish
+       require.EventuallyWithT(t, func(c *assert.CollectT) {
+               topicStats, err := admin.Topics().GetStats(*topicName)
+               require.Nil(c, err)
+               for _, subscriptionStats := range topicStats.Subscriptions {
+                       require.Equal(c, subscriptionStats.MsgBacklog, int64(1))
+                       require.Equal(c, 
subscriptionStats.Consumers[0].UnAckedMessages, 0)
+               }
+       }, 30*time.Second, 1*time.Second)
+
+       // ack
+       msg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+       err = consumer.Ack(msg)
+       assert.Nil(t, err)
+
+       // check topic stats
+       require.EventuallyWithT(t, func(c *assert.CollectT) {
+               topicStats, err := admin.Topics().GetStats(*topicName)
+               require.Nil(c, err)
+               for _, subscriptionStats := range topicStats.Subscriptions {
+                       require.Equal(c, subscriptionStats.MsgBacklog, int64(0))
+                       require.Equal(c, 
subscriptionStats.Consumers[0].UnAckedMessages, 0)
+               }
+       }, 30*time.Second, 1*time.Second)

Review Comment:
   Maybe adjust tick time as well



##########
pulsar/consumer_zero_queue_test.go:
##########
@@ -243,6 +243,103 @@ func TestReconnectConsumer(t *testing.T) {
        defer c.Terminate(ctx)
 }
 
+func TestReconnectedBrokerSendPermits(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       topic := newTopicName()
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                   topic,
+               SubscriptionName:        "my-sub",
+               EnableZeroQueueConsumer: true,
+               Type:                    Shared, // using Shared subscription 
type to support unack subscription stats
+       })
+       ctx := context.Background()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               })
+               assert.Nil(t, err)
+               log.Printf("send message: %s", msg.String())
+       }
+
+       admin, err := pulsaradmin.NewClient(&config.Config{})
+       assert.Nil(t, err)
+       log.Println("unloading topic")
+       topicName, err := utils.GetTopicName(topic)
+       assert.Nil(t, err)
+       err = admin.Topics().Unload(*topicName)
+       assert.Nil(t, err)
+       log.Println("unloaded topic")
+       time.Sleep(5 * time.Second) // wait for topic unload finish

Review Comment:
   Do we need to increase this sleep period? I'm not sure whether 5s is enough 
in CI environment.



##########
pulsar/consumer_zero_queue.go:
##########
@@ -71,11 +73,18 @@ func newZeroConsumer(client *client, options 
ConsumerOptions, topic string,
                return nil, err
        }
        opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, 
tn.Partition, zc.options)
-       conn, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, 
zc.dlq, zc.metrics)
+       opts.zeroQueueConsumer = zc
+       opts.zeroQueueReconnectedPolicy = func(pc *partitionConsumer, z 
*zeroQueueConsumer) {

Review Comment:
   +1



##########
pulsar/consumer_zero_queue_test.go:
##########
@@ -243,6 +243,103 @@ func TestReconnectConsumer(t *testing.T) {
        defer c.Terminate(ctx)
 }
 
+func TestReconnectedBrokerSendPermits(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       topic := newTopicName()
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                   topic,
+               SubscriptionName:        "my-sub",
+               EnableZeroQueueConsumer: true,
+               Type:                    Shared, // using Shared subscription 
type to support unack subscription stats
+       })
+       ctx := context.Background()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               })
+               assert.Nil(t, err)
+               log.Printf("send message: %s", msg.String())
+       }
+
+       admin, err := pulsaradmin.NewClient(&config.Config{})
+       assert.Nil(t, err)
+       log.Println("unloading topic")
+       topicName, err := utils.GetTopicName(topic)
+       assert.Nil(t, err)
+       err = admin.Topics().Unload(*topicName)
+       assert.Nil(t, err)
+       log.Println("unloaded topic")
+       time.Sleep(5 * time.Second) // wait for topic unload finish
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       assert.Nil(t, err)
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               expectProperties := map[string]string{
+                       "key-1": "pulsar-1",
+               }
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+               assert.Equal(t, "pulsar", msg.Key())
+               assert.Equal(t, expectProperties, msg.Properties())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
+               log.Printf("receive message: %s", msg.ID().String())
+       }
+       //      send one more message and we do not manually receive it
+       _, err = producer.Send(ctx, &ProducerMessage{
+               Payload: []byte(fmt.Sprintf("hello-%d", 10)),
+               Key:     "pulsar",
+               Properties: map[string]string{
+                       "key-1": "pulsar-1",
+               },
+       })
+       //      wait for broker send messages to consumer and topic stats 
update finish
+       time.Sleep(5 * time.Second)
+       topicStats, err := admin.Topics().GetStats(*topicName)
+       assert.Nil(t, err)
+       for _, subscriptionStats := range topicStats.Subscriptions {
+               assert.Equal(t, subscriptionStats.MsgBacklog, int64(1))
+               assert.Equal(t, subscriptionStats.Consumers[0].UnAckedMessages, 
0)
+       }
+
+       // ack
+       msg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+       err = consumer.Ack(msg)
+       assert.Nil(t, err)
+
+       // check topic stats
+       time.Sleep(5 * time.Second)

Review Comment:
   Consider using assert.eventually() as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to