This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.18.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit a97691d5f455e7bee810e4300d8a338a53928f1e
Author: crossoverJie <[email protected]>
AuthorDate: Fri Dec 5 15:05:19 2025 +0800

    fix: enhance zero queue consumer reconnection handling and message permit management (#1443)

    Co-authored-by: Copilot <[email protected]>
    Co-authored-by: Zixuan Liu <[email protected]>
    (cherry picked from commit 41bc6f4d416d69cfd25ea90a8efa9c56d03cbde3)
---
 pulsar/consumer_partition.go       |  18 +++--
 pulsar/consumer_partition_test.go  |   3 +
 pulsar/consumer_zero_queue.go      |  24 ++++++-
 pulsar/consumer_zero_queue_test.go | 142 +++++++++++++++++++++++++++++++++++++
 pulsar/impl_message.go             |   7 ++
 5 files changed, 184 insertions(+), 10 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index bfd44b18..ab35b7a1 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -124,10 +124,11 @@ type partitionConsumerOpts struct {
     expireTimeOfIncompleteChunk time.Duration
     autoAckIncompleteChunk      bool
     // in failover mode, this callback will be called when consumer change
-    consumerEventListener   ConsumerEventListener
-    enableBatchIndexAck     bool
-    ackGroupingOptions      *AckGroupingOptions
-    enableZeroQueueConsumer bool
+    consumerEventListener      ConsumerEventListener
+    enableBatchIndexAck        bool
+    ackGroupingOptions         *AckGroupingOptions
+    enableZeroQueueConsumer    bool
+    zeroQueueReconnectedPolicy func(*partitionConsumer)
 }

 type ConsumerEventListener interface {
@@ -170,6 +171,7 @@ type partitionConsumer struct {
     currentQueueSize       uAtomic.Int32
     scaleReceiverQueueHint uAtomic.Bool
     incomingMessages       uAtomic.Int32
+    reconnectCount         uAtomic.Int32

     eventsCh    chan interface{}
     connectedCh chan struct{}
@@ -1393,6 +1395,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
                 orderingKey:       string(smm.OrderingKey),
                 index:             messageIndex,
                 brokerPublishTime: brokerPublishTime,
+                conn:              pc._getConn(),
             }
         } else {
             msg = &message{
@@ -1413,6 +1416,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
                 orderingKey:       string(msgMeta.GetOrderingKey()),
                 index:             messageIndex,
                 brokerPublishTime: brokerPublishTime,
+                conn:              pc._getConn(),
             }
         }

@@ -1541,6 +1545,7 @@ func createEncryptionContext(msgMeta *pb.MessageMetadata) *EncryptionContext {
 func (pc *partitionConsumer) ConnectionClosed(closeConsumer *pb.CommandCloseConsumer) {
     // Trigger reconnection in the consumer goroutine
     pc.log.Debug("connection closed and send to connectClosedCh")
+    pc.reconnectCount.Inc()
     var assignedBrokerURL string
     if closeConsumer != nil {
         assignedBrokerURL = pc.client.selectServiceURL(
@@ -1925,9 +1930,8 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
             // Successfully reconnected
             pc.log.Info("Reconnected consumer to broker")
             bo.Reset()
-            if pc.options.enableZeroQueueConsumer {
-                pc.log.Info("zeroQueueConsumer reconnect, reset availablePermits")
-                pc.availablePermits.inc()
+            if pc.options.enableZeroQueueConsumer && pc.options.zeroQueueReconnectedPolicy != nil {
+                pc.options.zeroQueueReconnectedPolicy(pc)
             }
             return struct{}{}, nil
         }
diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go
index 4594c148..31877693 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -37,6 +37,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
         metrics:   newTestMetrics(),
         decryptor: crypto.NewNoopDecryptor(),
     }
+    pc._setConn(dummyConnection{})
     pc.availablePermits = &availablePermits{pc: &pc}
     pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
         func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
@@ -76,6 +77,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
         metrics:   newTestMetrics(),
         decryptor: crypto.NewNoopDecryptor(),
     }
+    pc._setConn(dummyConnection{})
     pc.availablePermits = &availablePermits{pc: &pc}
     pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
         func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
@@ -112,6 +114,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
         metrics:   newTestMetrics(),
         decryptor: crypto.NewNoopDecryptor(),
     }
+    pc._setConn(dummyConnection{})
     pc.availablePermits = &availablePermits{pc: &pc}
     pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
         func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
diff --git a/pulsar/consumer_zero_queue.go b/pulsar/consumer_zero_queue.go
index 5b85df8e..97016c38 100644
--- a/pulsar/consumer_zero_queue.go
+++ b/pulsar/consumer_zero_queue.go
@@ -23,6 +23,8 @@ import (
     "sync"
     "time"

+    uAtomic "go.uber.org/atomic"
+
     "github.com/apache/pulsar-client-go/pulsar/internal"
     "github.com/apache/pulsar-client-go/pulsar/log"
     "github.com/pkg/errors"
@@ -36,6 +38,7 @@ type zeroQueueConsumer struct {
     pc                        *partitionConsumer
     consumerName              string
     disableForceTopicCreation bool
+    waitingOnReceive          uAtomic.Bool

     messageCh chan ConsumerMessage

@@ -71,11 +74,17 @@ func newZeroConsumer(client *client, options ConsumerOptions, topic string,
         return nil, err
     }
     opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, tn.Partition, zc.options)
-    conn, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
+    opts.zeroQueueReconnectedPolicy = func(pc *partitionConsumer) {
+        if zc.waitingOnReceive.Load() {
+            pc.log.Info("zeroQueueConsumer reconnect, reset availablePermits")
+            pc.availablePermits.inc()
+        }
+    }
+    pc, err := newPartitionConsumer(zc, zc.client, opts, zc.messageCh, zc.dlq, zc.metrics)
     if err != nil {
         return nil, err
     }
-    zc.pc = conn
+    zc.pc = pc

     return zc, nil
 }
@@ -119,17 +128,26 @@ func (z *zeroQueueConsumer) Receive(ctx context.Context) (Message, error) {
     }
     z.Lock()
     defer z.Unlock()
+    z.waitingOnReceive.Store(true)
     z.pc.availablePermits.inc()
     for {
         select {
         case <-z.closeCh:
+            z.waitingOnReceive.Store(false)
             return nil, newError(ConsumerClosed, "consumer closed")
         case cm, ok := <-z.messageCh:
             if !ok {
                 return nil, newError(ConsumerClosed, "consumer closed")
             }
-            return cm.Message, nil
+            message, ok := cm.Message.(*message)
+            if ok && message.getConn().ID() == z.pc._getConn().ID() {
+                z.waitingOnReceive.Store(false)
+                return cm.Message, nil
+            } else {
+                z.log.WithField("messageID", cm.Message.ID()).Warn("message from old connection discarded after reconnection")
+            }
         case <-ctx.Done():
+            z.waitingOnReceive.Store(false)
             return nil, ctx.Err()
         }
     }
diff --git a/pulsar/consumer_zero_queue_test.go b/pulsar/consumer_zero_queue_test.go
index 444751a1..0b3e8760 100644
--- a/pulsar/consumer_zero_queue_test.go
+++ b/pulsar/consumer_zero_queue_test.go
@@ -243,6 +243,148 @@ func TestReconnectConsumer(t *testing.T) {
     defer c.Terminate(ctx)
 }

+func TestReconnectedBrokerSendPermits(t *testing.T) {
+    req := testcontainers.ContainerRequest{
+        Name:         "pulsar-test",
+        Image:        getPulsarTestImage(),
+        ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+        WaitingFor:   wait.ForExposedPort(),
+        HostConfigModifier: func(config *container.HostConfig) {
+            config.PortBindings = map[nat.Port][]nat.PortBinding{
+                "6650/tcp": {{HostIP: "0.0.0.0", HostPort: "6659"}},
+                "8080/tcp": {{HostIP: "0.0.0.0", HostPort: "8089"}},
+            }
+        },
+        Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
+    }
+    c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+        ContainerRequest: req,
+        Started:          true,
+        Reuse:            true,
+    })
+    require.NoError(t, err, "Failed to start the pulsar container")
+    endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+    require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+    sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
+    client, err := NewClient(ClientOptions{
+        URL:    endpoint,
+        Logger: plog.NewLoggerWithSlog(sLogger),
+    })
+    assert.Nil(t, err)
+    adminEndpoint, err := c.PortEndpoint(context.Background(), "8080", "http")
+    assert.Nil(t, err)
+    admin, err := pulsaradmin.NewClient(&config.Config{
+        WebServiceURL: adminEndpoint,
+    })
+    assert.Nil(t, err)
+
+    topic := newTopicName()
+    var consumer Consumer
+    require.Eventually(t, func() bool {
+        consumer, err = client.Subscribe(ConsumerOptions{
+            Topic:                   topic,
+            SubscriptionName:        "my-sub",
+            EnableZeroQueueConsumer: true,
+            Type:                    Shared, // using Shared subscription type to support unack subscription stats
+        })
+        return err == nil
+    }, 30*time.Second, 1*time.Second)
+    ctx := context.Background()
+
+    // create producer
+    producer, err := client.CreateProducer(ProducerOptions{
+        Topic:           topic,
+        DisableBatching: false,
+    })
+    assert.Nil(t, err)
+
+    // send 10 messages
+    for i := 0; i < 10; i++ {
+        msg, err := producer.Send(ctx, &ProducerMessage{
+            Payload: []byte(fmt.Sprintf("hello-%d", i)),
+            Key:     "pulsar",
+            Properties: map[string]string{
+                "key-1": "pulsar-1",
+            },
+        })
+        assert.Nil(t, err)
+        log.Printf("send message: %s", msg.String())
+    }
+
+    log.Println("unloading topic")
+    topicName, err := utils.GetTopicName(topic)
+    assert.Nil(t, err)
+    err = admin.Topics().Unload(*topicName)
+    assert.Nil(t, err)
+    log.Println("unloaded topic")
+    zc, ok := consumer.(*zeroQueueConsumer)
+    assert.True(t, ok)
+    // wait for reconnect
+    require.EventuallyWithT(t, func(c *assert.CollectT) {
+        reconnectCount := zc.pc.reconnectCount.Load()
+        require.Equal(c, reconnectCount, int32(1))
+    }, 30*time.Second, 1*time.Second)
+
+    // receive 10 messages
+    for i := 0; i < 10; i++ {
+        msg, err := consumer.Receive(context.Background())
+        if err != nil {
+            assert.Nil(t, err)
+        }
+
+        expectMsg := fmt.Sprintf("hello-%d", i)
+        expectProperties := map[string]string{
+            "key-1": "pulsar-1",
+        }
+        assert.Equal(t, []byte(expectMsg), msg.Payload())
+        assert.Equal(t, "pulsar", msg.Key())
+        assert.Equal(t, expectProperties, msg.Properties())
+        // ack message
+        err = consumer.Ack(msg)
+        assert.Nil(t, err)
+        log.Printf("receive message: %s", msg.ID().String())
+    }
+    // send one more message and we do not manually receive it
+    _, err = producer.Send(ctx, &ProducerMessage{
+        Payload: []byte(fmt.Sprintf("hello-%d", 10)),
+        Key:     "pulsar",
+        Properties: map[string]string{
+            "key-1": "pulsar-1",
+        },
+    })
+    assert.Nil(t, err)
+    // wait for broker send messages to consumer and topic stats update finish
+    option := utils.GetStatsOptions{
+        GetPreciseBacklog: true,
+    }
+    require.EventuallyWithT(t, func(c *assert.CollectT) {
+        topicStats, err := admin.Topics().GetStatsWithOptionWithContext(ctx, *topicName, option)
+        require.Nil(c, err)
+        for _, subscriptionStats := range topicStats.Subscriptions {
+            require.Equal(c, subscriptionStats.MsgBacklog, int64(1))
+            require.Equal(c, subscriptionStats.Consumers[0].UnAckedMessages, 0)
+        }
+
+    }, 30*time.Second, 1*time.Second)
+
+    // ack
+    msg, err := consumer.Receive(context.Background())
+    assert.Nil(t, err)
+    err = consumer.Ack(msg)
+    assert.Nil(t, err)
+
+    // check topic stats
+    require.EventuallyWithT(t, func(c *assert.CollectT) {
+        topicStats, err := admin.Topics().GetStatsWithOptionWithContext(ctx, *topicName, option)
+        require.Nil(c, err)
+        for _, subscriptionStats := range topicStats.Subscriptions {
+            require.Equal(c, subscriptionStats.MsgBacklog, int64(0))
+            require.Equal(c, subscriptionStats.Consumers[0].UnAckedMessages, 0)
+        }
+    }, 30*time.Second, 1*time.Second)
+
+}
+
 func TestUnloadTopicBeforeConsume(t *testing.T) {
     sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index f32cc7fc..4f314c3d 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -26,6 +26,8 @@ import (
     "sync/atomic"
     "time"

+    "github.com/apache/pulsar-client-go/pulsar/internal"
+
     "google.golang.org/protobuf/proto"

     pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
@@ -313,6 +315,7 @@ type message struct {
     encryptionContext *EncryptionContext
     index             *uint64
     brokerPublishTime *time.Time
+    conn              internal.Connection
 }

 func (msg *message) Topic() string {
@@ -394,6 +397,10 @@ func (msg *message) size() int {
     return len(msg.payLoad)
 }

+func (msg *message) getConn() internal.Connection {
+    return msg.conn
+}
+
 func newAckTracker(size uint) *ackTracker {
     batchIDs := bitset.New(size)
     for i := uint(0); i < size; i++ {
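For readers who do not work in this part of the client: the commit only touches internals, and applications keep driving the zero queue consumer through the public Subscribe/Receive/Ack API. The following is a minimal, hypothetical usage sketch, not part of the commit; the broker URL, topic name, and subscription name are placeholders chosen for illustration, while ConsumerOptions.EnableZeroQueueConsumer, Receive, and Ack are the actual public surface exercised by the new test above.

package main

import (
    "context"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    // Placeholder broker URL, topic, and subscription for illustration only.
    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // EnableZeroQueueConsumer disables client-side prefetching: the consumer
    // requests one permit from the broker per Receive call, which is the
    // code path the reconnection fix above is concerned with.
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:                   "my-topic",
        SubscriptionName:        "my-sub",
        Type:                    pulsar.Shared,
        EnableZeroQueueConsumer: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    for {
        // Blocks until the broker delivers exactly one message for the permit.
        msg, err := consumer.Receive(context.Background())
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("got %q", string(msg.Payload()))
        if err := consumer.Ack(msg); err != nil {
            log.Println(err)
        }
    }
}

Because a zero-queue consumer keeps no local buffer, each Receive call corresponds to a single permit at the broker. The change above re-issues that permit after a reconnect only while a caller is actually blocked in Receive (tracked by waitingOnReceive), and discards a message that arrives on a stale connection instead of returning it.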
