crossoverJie commented on code in PR #1443:
URL: https://github.com/apache/pulsar-client-go/pull/1443#discussion_r2583789759
##########
pulsar/consumer_zero_queue_test.go:
##########
@@ -243,6 +243,136 @@ func TestReconnectConsumer(t *testing.T) {
defer c.Terminate(ctx)
}
+func TestReconnectedBrokerSendPermits(t *testing.T) {
+ req := testcontainers.ContainerRequest{
+ Name: "pulsar-test",
+ Image: getPulsarTestImage(),
+ ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+ WaitingFor: wait.ForExposedPort(),
+ HostConfigModifier: func(config *container.HostConfig) {
+ config.PortBindings = map[nat.Port][]nat.PortBinding{
+ "6650/tcp": {{HostIP: "0.0.0.0", HostPort:
"6659"}},
+ "8080/tcp": {{HostIP: "0.0.0.0", HostPort:
"8089"}},
+ }
+ },
+ Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
+ }
+ c, err := testcontainers.GenericContainer(context.Background(),
testcontainers.GenericContainerRequest{
+ ContainerRequest: req,
+ Started: true,
+ Reuse: true,
+ })
+ require.NoError(t, err, "Failed to start the pulsar container")
+ endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+ require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+ client, err := NewClient(ClientOptions{
+ URL: endpoint,
+ })
+ assert.Nil(t, err)
+ adminEndpoint, err := c.PortEndpoint(context.Background(), "8080",
"http")
+ assert.Nil(t, err)
+ admin, err := pulsaradmin.NewClient(&config.Config{
+ WebServiceURL: adminEndpoint,
+ })
+ assert.Nil(t, err)
+
+ topic := newTopicName()
+ var consumer Consumer
+ require.Eventually(t, func() bool {
+ consumer, err = client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ EnableZeroQueueConsumer: true,
+ Type: Shared, // using Shared
subscription type to support unack subscription stats
+ })
+ return err == nil
+ }, 30*time.Second, 1*time.Second)
+ ctx := context.Background()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ msg, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ Key: "pulsar",
+ Properties: map[string]string{
+ "key-1": "pulsar-1",
+ },
+ })
+ assert.Nil(t, err)
+ log.Printf("send message: %s", msg.String())
+ }
+
+ log.Println("unloading topic")
+ topicName, err := utils.GetTopicName(topic)
+ assert.Nil(t, err)
+ err = admin.Topics().Unload(*topicName)
+ assert.Nil(t, err)
+ log.Println("unloaded topic")
+
+ // receive 10 messages
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(context.Background())
+ if err != nil {
+ assert.Nil(t, err)
+ }
+
+ expectMsg := fmt.Sprintf("hello-%d", i)
+ expectProperties := map[string]string{
+ "key-1": "pulsar-1",
+ }
+ assert.Equal(t, []byte(expectMsg), msg.Payload())
+ assert.Equal(t, "pulsar", msg.Key())
+ assert.Equal(t, expectProperties, msg.Properties())
+ // ack message
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
+ log.Printf("receive message: %s", msg.ID().String())
+ }
+ // send one more message and we do not manually receive it
+ _, err = producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", 10)),
+ Key: "pulsar",
+ Properties: map[string]string{
+ "key-1": "pulsar-1",
+ },
+ })
+ assert.Nil(t, err)
+ // wait for broker send messages to consumer and topic stats
update finish
Review Comment:
```go
func TestUnloadTopicBeforeConsume2(t *testing.T) {
sLogger := slog.New(slog.NewTextHandler(os.Stdout,
&slog.HandlerOptions{Level: slog.LevelDebug}))
client, err := NewClient(ClientOptions{
URL: lookupURL,
Logger: plog.NewLoggerWithSlog(sLogger),
})
assert.Nil(t, err)
admin, err := pulsaradmin.NewClient(&config.Config{})
assert.Nil(t, err)
defer client.Close()
topic := newTopicName()
ctx := context.Background()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
EnableZeroQueueConsumer: true,
Type: Shared,
})
assert.Nil(t, err)
_, ok := consumer.(*zeroQueueConsumer)
assert.True(t, ok)
defer consumer.Close()
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
// send 10 messages
for i := 0; i < 10; i++ {
msg, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
})
assert.Nil(t, err)
log.Printf("send message: %s", msg.String())
}
log.Println("unloading topic")
topicName, err := utils.GetTopicName(topic)
assert.Nil(t, err)
err = admin.Topics().Unload(*topicName)
assert.Nil(t, err)
log.Println("unloaded topic")
time.Sleep(1 * time.Minute) // wait for topic unload finish
// receive 10 messages
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
expectMsg := fmt.Sprintf("hello-%d", i)
expectProperties := map[string]string{
"key-1": "pulsar-1",
}
assert.Equal(t, []byte(expectMsg), msg.Payload())
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
// ack message
err = consumer.Ack(msg)
assert.Nil(t, err)
log.Printf("receive message: %s", msg.ID().String())
}
// send one more message and we do not manually receive it
_, err = producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", 11)),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
})
// wait for broker send messages to consumer and topic stats
update finish
time.Sleep(1 * time.Minute)
topicStats, err := admin.Topics().GetStats(*topicName)
for _, subscriptionStats := range topicStats.Subscriptions {
assert.Equal(t, subscriptionStats.Consumers[0].UnAckedMessages,
0)
}
}
```
You can switch to the master branch's code and then run this unit test to
reproduce the issue.
<img width="985" height="272" alt="image"
src="https://github.com/user-attachments/assets/b7cd38fa-ea0b-41ed-8d8c-d2952593fe1b"
/>
On the master branch the consumer sends 11 permits (one more than needed),
which causes `UnackedMessages` to increase.
----
<img width="924" height="314" alt="image"
src="https://github.com/user-attachments/assets/aec6d597-2e7b-43fa-9c37-20c24545b8f2"
/>
After using the fixed code, only 10 permits are sent, and the unit test also
passes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]