This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.18.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit a97691d5f455e7bee810e4300d8a338a53928f1e
Author: crossoverJie <[email protected]>
AuthorDate: Fri Dec 5 15:05:19 2025 +0800

    fix: enhance zero queue consumer reconnection handling and message permit management (#1443)
    
    Co-authored-by: Copilot <[email protected]>
    Co-authored-by: Zixuan Liu <[email protected]>
    (cherry picked from commit 41bc6f4d416d69cfd25ea90a8efa9c56d03cbde3)
---
 pulsar/consumer_partition.go       |  18 +++--
 pulsar/consumer_partition_test.go  |   3 +
 pulsar/consumer_zero_queue.go      |  24 ++++++-
 pulsar/consumer_zero_queue_test.go | 142 +++++++++++++++++++++++++++++++++++++
 pulsar/impl_message.go             |   7 ++
 5 files changed, 184 insertions(+), 10 deletions(-)
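
In short, the patch replaces the unconditional permit reset on reconnect with a caller-supplied policy: the zero-queue consumer re-issues a permit only while a Receive() call is actually blocked waiting. A minimal, self-contained Go sketch of that hook pattern follows; the structs are simplified stand-ins for illustration, not the library's real types.

    // Sketch of the zeroQueueReconnectedPolicy hook; simplified
    // stand-ins, not the library's actual types.
    package main

    import (
        "fmt"
        "sync/atomic"
    )

    type permits struct{ n atomic.Int32 }

    func (p *permits) inc() { p.n.Add(1) }

    type partitionConsumer struct {
        availablePermits           *permits
        zeroQueueReconnectedPolicy func(*partitionConsumer)
    }

    // reconnected mirrors the nil-checked callback in reconnectToBroker.
    func (pc *partitionConsumer) reconnected() {
        if pc.zeroQueueReconnectedPolicy != nil {
            pc.zeroQueueReconnectedPolicy(pc)
        }
    }

    func main() {
        var waitingOnReceive atomic.Bool // mirrors zeroQueueConsumer.waitingOnReceive
        pc := &partitionConsumer{availablePermits: &permits{}}

        // Re-issue a permit only when a Receive() call is blocked waiting;
        // otherwise the broker could push a message nobody is ready to take.
        pc.zeroQueueReconnectedPolicy = func(pc *partitionConsumer) {
            if waitingOnReceive.Load() {
                pc.availablePermits.inc()
            }
        }

        pc.reconnected() // no waiter: permit count stays 0
        waitingOnReceive.Store(true)
        pc.reconnected() // waiter present: one permit restored
        fmt.Println("permits:", pc.availablePermits.n.Load()) // prints: permits: 1
    }

The indirection keeps partitionConsumer free of zero-queue specifics; only consumer_zero_queue.go decides when restoring a permit is safe.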

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index bfd44b18..ab35b7a1 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -124,10 +124,11 @@ type partitionConsumerOpts struct {
        expireTimeOfIncompleteChunk time.Duration
        autoAckIncompleteChunk      bool
        // in failover mode, this callback will be called when consumer change
-       consumerEventListener   ConsumerEventListener
-       enableBatchIndexAck     bool
-       ackGroupingOptions      *AckGroupingOptions
-       enableZeroQueueConsumer bool
+       consumerEventListener      ConsumerEventListener
+       enableBatchIndexAck        bool
+       ackGroupingOptions         *AckGroupingOptions
+       enableZeroQueueConsumer    bool
+       zeroQueueReconnectedPolicy func(*partitionConsumer)
 }
 
 type ConsumerEventListener interface {
@@ -170,6 +171,7 @@ type partitionConsumer struct {
        currentQueueSize       uAtomic.Int32
        scaleReceiverQueueHint uAtomic.Bool
        incomingMessages       uAtomic.Int32
+       reconnectCount         uAtomic.Int32
 
        eventsCh        chan interface{}
        connectedCh     chan struct{}
@@ -1393,6 +1395,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
                                orderingKey:         string(smm.OrderingKey),
                                index:               messageIndex,
                                brokerPublishTime:   brokerPublishTime,
+                               conn:                pc._getConn(),
                        }
                } else {
                        msg = &message{
@@ -1413,6 +1416,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
                                orderingKey:         string(msgMeta.GetOrderingKey()),
                                index:               messageIndex,
                                brokerPublishTime:   brokerPublishTime,
+                               conn:                pc._getConn(),
                        }
                }
 
@@ -1541,6 +1545,7 @@ func createEncryptionContext(msgMeta *pb.MessageMetadata) *EncryptionContext {
func (pc *partitionConsumer) ConnectionClosed(closeConsumer *pb.CommandCloseConsumer) {
        // Trigger reconnection in the consumer goroutine
        pc.log.Debug("connection closed and send to connectClosedCh")
+       pc.reconnectCount.Inc()
        var assignedBrokerURL string
        if closeConsumer != nil {
                assignedBrokerURL = pc.client.selectServiceURL(
@@ -1925,9 +1930,8 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
                        // Successfully reconnected
                        pc.log.Info("Reconnected consumer to broker")
                        bo.Reset()
-                       if pc.options.enableZeroQueueConsumer {
-                               pc.log.Info("zeroQueueConsumer reconnect, reset availablePermits")
-                               pc.availablePermits.inc()
+                       if pc.options.enableZeroQueueConsumer && pc.options.zeroQueueReconnectedPolicy != nil {
+                               pc.options.zeroQueueReconnectedPolicy(pc)
                        }
                        return struct{}{}, nil
                }
diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go
index 4594c148..31877693 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -37,6 +37,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
                metrics:              newTestMetrics(),
                decryptor:            crypto.NewNoopDecryptor(),
        }
+       pc._setConn(dummyConnection{})
        pc.availablePermits = &availablePermits{pc: &pc}
        pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
                func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
@@ -76,6 +77,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
                metrics:              newTestMetrics(),
                decryptor:            crypto.NewNoopDecryptor(),
        }
+       pc._setConn(dummyConnection{})
        pc.availablePermits = &availablePermits{pc: &pc}
        pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
                func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
@@ -112,6 +114,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
                metrics:              newTestMetrics(),
                decryptor:            crypto.NewNoopDecryptor(),
        }
+       pc._setConn(dummyConnection{})
        pc.availablePermits = &availablePermits{pc: &pc}
        pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
                func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
diff --git a/pulsar/consumer_zero_queue.go b/pulsar/consumer_zero_queue.go
index 5b85df8e..97016c38 100644
--- a/pulsar/consumer_zero_queue.go
+++ b/pulsar/consumer_zero_queue.go
@@ -23,6 +23,8 @@ import (
        "sync"
        "time"
 
+       uAtomic "go.uber.org/atomic"
+
        "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/apache/pulsar-client-go/pulsar/log"
        "github.com/pkg/errors"
@@ -36,6 +38,7 @@ type zeroQueueConsumer struct {
        pc                        *partitionConsumer
        consumerName              string
        disableForceTopicCreation bool
+       waitingOnReceive          uAtomic.Bool
 
        messageCh chan ConsumerMessage
 
@@ -71,11 +74,17 @@ func newZeroConsumer(client *client, options ConsumerOptions, topic string,
                return nil, err
        }
        opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, tn.Partition, zc.options)
-       conn, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
+       opts.zeroQueueReconnectedPolicy = func(pc *partitionConsumer) {
+               if zc.waitingOnReceive.Load() {
+                       pc.log.Info("zeroQueueConsumer reconnect, reset availablePermits")
+                       pc.availablePermits.inc()
+               }
+       }
+       pc, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
        if err != nil {
                return nil, err
        }
-       zc.pc = conn
+       zc.pc = pc
 
        return zc, nil
 }
@@ -119,17 +128,26 @@ func (z *zeroQueueConsumer) Receive(ctx context.Context) (Message, error) {
        }
        z.Lock()
        defer z.Unlock()
+       z.waitingOnReceive.Store(true)
        z.pc.availablePermits.inc()
        for {
                select {
                case <-z.closeCh:
+                       z.waitingOnReceive.Store(false)
                        return nil, newError(ConsumerClosed, "consumer closed")
                case cm, ok := <-z.messageCh:
                        if !ok {
                                return nil, newError(ConsumerClosed, "consumer closed")
                        }
-                       return cm.Message, nil
+                       message, ok := cm.Message.(*message)
+                       if ok && message.getConn().ID() == z.pc._getConn().ID() {
+                               z.waitingOnReceive.Store(false)
+                               return cm.Message, nil
+                       } else {
+                               z.log.WithField("messageID", cm.Message.ID()).Warn("message from old connection discarded after reconnection")
+                       }
                case <-ctx.Done():
+                       z.waitingOnReceive.Store(false)
                        return nil, ctx.Err()
                }
        }
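
The Receive() loop above pairs with the new conn field on message (see the impl_message.go hunk below): every message remembers the connection it arrived on, and anything delivered over a connection that has since been replaced is logged and dropped instead of returned. A rough, self-contained sketch of that check, with simplified stand-in types rather than the library's internal.Connection:

    // Sketch of the stale-connection check in zeroQueueConsumer.Receive;
    // simplified stand-ins, not the library's actual types.
    package main

    import "fmt"

    type conn struct{ id string }

    func (c *conn) ID() string { return c.id }

    type message struct {
        payload string
        conn    *conn // connection the message arrived on
    }

    // receive returns the first message that arrived on the current
    // connection, discarding anything delivered before a reconnect.
    func receive(current *conn, ch <-chan message) message {
        for m := range ch {
            if m.conn.ID() == current.ID() {
                return m
            }
            fmt.Printf("discarding %q from old connection %s\n", m.payload, m.conn.ID())
        }
        return message{}
    }

    func main() {
        old, cur := &conn{id: "c1"}, &conn{id: "c2"}
        ch := make(chan message, 2)
        ch <- message{"stale", old}
        ch <- message{"fresh", cur}
        close(ch)
        fmt.Println(receive(cur, ch).payload) // prints: fresh
    }
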
diff --git a/pulsar/consumer_zero_queue_test.go b/pulsar/consumer_zero_queue_test.go
index 444751a1..0b3e8760 100644
--- a/pulsar/consumer_zero_queue_test.go
+++ b/pulsar/consumer_zero_queue_test.go
@@ -243,6 +243,148 @@ func TestReconnectConsumer(t *testing.T) {
        defer c.Terminate(ctx)
 }
 
+func TestReconnectedBrokerSendPermits(t *testing.T) {
+       req := testcontainers.ContainerRequest{
+               Name:         "pulsar-test",
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               HostConfigModifier: func(config *container.HostConfig) {
+                       config.PortBindings = map[nat.Port][]nat.PortBinding{
+                               "6650/tcp": {{HostIP: "0.0.0.0", HostPort: "6659"}},
+                               "8080/tcp": {{HostIP: "0.0.0.0", HostPort: "8089"}},
+                       }
+               },
+               Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+               Reuse:            true,
+       })
+       require.NoError(t, err, "Failed to start the pulsar container")
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+       sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
+       client, err := NewClient(ClientOptions{
+               URL:    endpoint,
+               Logger: plog.NewLoggerWithSlog(sLogger),
+       })
+       assert.Nil(t, err)
+       adminEndpoint, err := c.PortEndpoint(context.Background(), "8080", "http")
+       assert.Nil(t, err)
+       admin, err := pulsaradmin.NewClient(&config.Config{
+               WebServiceURL: adminEndpoint,
+       })
+       assert.Nil(t, err)
+
+       topic := newTopicName()
+       var consumer Consumer
+       require.Eventually(t, func() bool {
+               consumer, err = client.Subscribe(ConsumerOptions{
+                       Topic:                   topic,
+                       SubscriptionName:        "my-sub",
+                       EnableZeroQueueConsumer: true,
+                       Type:                    Shared, // using Shared subscription type to support unack subscription stats
+               })
+               return err == nil
+       }, 30*time.Second, 1*time.Second)
+       ctx := context.Background()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               })
+               assert.Nil(t, err)
+               log.Printf("send message: %s", msg.String())
+       }
+
+       log.Println("unloading topic")
+       topicName, err := utils.GetTopicName(topic)
+       assert.Nil(t, err)
+       err = admin.Topics().Unload(*topicName)
+       assert.Nil(t, err)
+       log.Println("unloaded topic")
+       zc, ok := consumer.(*zeroQueueConsumer)
+       assert.True(t, ok)
+       // wait for reconnect
+       require.EventuallyWithT(t, func(c *assert.CollectT) {
+               reconnectCount := zc.pc.reconnectCount.Load()
+               require.Equal(c, reconnectCount, int32(1))
+       }, 30*time.Second, 1*time.Second)
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       assert.Nil(t, err)
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               expectProperties := map[string]string{
+                       "key-1": "pulsar-1",
+               }
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+               assert.Equal(t, "pulsar", msg.Key())
+               assert.Equal(t, expectProperties, msg.Properties())
+               // ack message
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
+               log.Printf("receive message: %s", msg.ID().String())
+       }
+       //      send one more message and we do not manually receive it
+       _, err = producer.Send(ctx, &ProducerMessage{
+               Payload: []byte(fmt.Sprintf("hello-%d", 10)),
+               Key:     "pulsar",
+               Properties: map[string]string{
+                       "key-1": "pulsar-1",
+               },
+       })
+       assert.Nil(t, err)
+       //      wait for broker send messages to consumer and topic stats update finish
+       option := utils.GetStatsOptions{
+               GetPreciseBacklog: true,
+       }
+       require.EventuallyWithT(t, func(c *assert.CollectT) {
+               topicStats, err := admin.Topics().GetStatsWithOptionWithContext(ctx, *topicName, option)
+               require.Nil(c, err)
+               for _, subscriptionStats := range topicStats.Subscriptions {
+                       require.Equal(c, subscriptionStats.MsgBacklog, int64(1))
+                       require.Equal(c, subscriptionStats.Consumers[0].UnAckedMessages, 0)
+               }
+       }, 30*time.Second, 1*time.Second)
+
+       // ack
+       msg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+       err = consumer.Ack(msg)
+       assert.Nil(t, err)
+
+       // check topic stats
+       require.EventuallyWithT(t, func(c *assert.CollectT) {
+               topicStats, err := admin.Topics().GetStatsWithOptionWithContext(ctx, *topicName, option)
+               require.Nil(c, err)
+               for _, subscriptionStats := range topicStats.Subscriptions {
+                       require.Equal(c, subscriptionStats.MsgBacklog, int64(0))
+                       require.Equal(c, subscriptionStats.Consumers[0].UnAckedMessages, 0)
+               }
+       }, 30*time.Second, 1*time.Second)
+
+}
+
 func TestUnloadTopicBeforeConsume(t *testing.T) {
 
        sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index f32cc7fc..4f314c3d 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -26,6 +26,8 @@ import (
        "sync/atomic"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/internal"
+
        "google.golang.org/protobuf/proto"
 
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
@@ -313,6 +315,7 @@ type message struct {
        encryptionContext   *EncryptionContext
        index               *uint64
        brokerPublishTime   *time.Time
+       conn                internal.Connection
 }
 
 func (msg *message) Topic() string {
@@ -394,6 +397,10 @@ func (msg *message) size() int {
        return len(msg.payLoad)
 }
 
+func (msg *message) getConn() internal.Connection {
+       return msg.conn
+}
+
 func newAckTracker(size uint) *ackTracker {
        batchIDs := bitset.New(size)
        for i := uint(0); i < size; i++ {
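
For context, enabling the zero-queue consumer that this patch hardens looks roughly like the sketch below. The broker URL, topic, and subscription name are illustrative; EnableZeroQueueConsumer is the real option, as exercised in the test above.

    package main

    import (
        "context"
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL: "pulsar://localhost:6650", // illustrative broker address
        })
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // A zero-size receiver queue: the client requests one permit per
        // Receive() call, which is the flow this patch makes reconnect-safe.
        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
            Topic:                   "my-topic",
            SubscriptionName:        "my-sub",
            EnableZeroQueueConsumer: true,
        })
        if err != nil {
            log.Fatal(err)
        }
        defer consumer.Close()

        msg, err := consumer.Receive(context.Background())
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("received: %s", msg.Payload())
        if err := consumer.Ack(msg); err != nil {
            log.Fatal(err)
        }
    }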
