This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 06f26935 improve: use chan *message instead of chan []*message as 
queueCh (#1283)
06f26935 is described below

commit 06f2693583cca6855c5731f5268d32d1758240d3
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Oct 8 17:43:06 2024 +0800

    improve: use chan *message instead of chan []*message as queueCh (#1283)
    
    ### Motivation
    
    We currently use `chan []*message` as queueCh, staging messages in a slice 
before sending them to the parent consumer; this can result in excessive 
memory usage.
    
    This PR optimizes this by sending messages individually over the channel, 
potentially reducing overall memory overhead.
    
    
    ### Modifications
    
    - Use `chan *message` instead of `chan []*message`.
    - Fix test.
---
 pulsar/consumer_partition.go      | 101 +++++++++++++++++---------------------
 pulsar/consumer_partition_test.go |  41 +++++++++-------
 2 files changed, 67 insertions(+), 75 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 823e0e87..831f763a 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -154,7 +154,7 @@ type partitionConsumer struct {
 
        // the size of the queue channel for buffering messages
        maxQueueSize    int32
-       queueCh         chan []*message
+       queueCh         chan *message
        startMessageID  atomicMessageID
        lastDequeuedMsg *trackingMessageID
 
@@ -338,7 +338,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
                partitionIdx:         int32(options.partitionIdx),
                eventsCh:             make(chan interface{}, 10),
                maxQueueSize:         int32(options.receiverQueueSize),
-               queueCh:              make(chan []*message, 
options.receiverQueueSize),
+               queueCh:              make(chan *message, 
options.receiverQueueSize),
                startMessageID:       atomicMessageID{msgID: 
options.startMessageID},
                connectedCh:          make(chan struct{}),
                messageCh:            messageCh,
@@ -1057,37 +1057,33 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                        return fmt.Errorf("discarding message on decryption 
error :%v", err)
                case crypto.ConsumerCryptoFailureActionConsume:
                        pc.log.Warnf("consuming encrypted message due to error 
in decryption :%v", err)
-                       messages := []*message{
-                               {
-                                       publishTime:  
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
-                                       eventTime:    
timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
-                                       key:          msgMeta.GetPartitionKey(),
-                                       producerName: msgMeta.GetProducerName(),
-                                       properties:   
internal.ConvertToStringMap(msgMeta.GetProperties()),
-                                       topic:        pc.topic,
-                                       msgID: newMessageID(
-                                               int64(pbMsgID.GetLedgerId()),
-                                               int64(pbMsgID.GetEntryId()),
-                                               pbMsgID.GetBatchIndex(),
-                                               pc.partitionIdx,
-                                               pbMsgID.GetBatchSize(),
-                                       ),
-                                       payLoad:             
headersAndPayload.ReadableSlice(),
-                                       schema:              pc.options.schema,
-                                       replicationClusters: 
msgMeta.GetReplicateTo(),
-                                       replicatedFrom:      
msgMeta.GetReplicatedFrom(),
-                                       redeliveryCount:     
response.GetRedeliveryCount(),
-                                       encryptionContext:   
createEncryptionContext(msgMeta),
-                                       orderingKey:         
string(msgMeta.OrderingKey),
-                               },
-                       }
-
                        if pc.options.autoReceiverQueueSize {
                                pc.incomingMessages.Inc()
                                pc.markScaleIfNeed()
                        }
 
-                       pc.queueCh <- messages
+                       pc.queueCh <- &message{
+                               publishTime:  
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+                               eventTime:    
timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+                               key:          msgMeta.GetPartitionKey(),
+                               producerName: msgMeta.GetProducerName(),
+                               properties:   
internal.ConvertToStringMap(msgMeta.GetProperties()),
+                               topic:        pc.topic,
+                               msgID: newMessageID(
+                                       int64(pbMsgID.GetLedgerId()),
+                                       int64(pbMsgID.GetEntryId()),
+                                       pbMsgID.GetBatchIndex(),
+                                       pc.partitionIdx,
+                                       pbMsgID.GetBatchSize(),
+                               ),
+                               payLoad:             
headersAndPayload.ReadableSlice(),
+                               schema:              pc.options.schema,
+                               replicationClusters: msgMeta.GetReplicateTo(),
+                               replicatedFrom:      
msgMeta.GetReplicatedFrom(),
+                               redeliveryCount:     
response.GetRedeliveryCount(),
+                               encryptionContext:   
createEncryptionContext(msgMeta),
+                               orderingKey:         
string(msgMeta.OrderingKey),
+                       }
                        return nil
                }
        }
@@ -1123,7 +1119,6 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                numMsgs = int(msgMeta.GetNumMessagesInBatch())
        }
 
-       messages := make([]*message, 0)
        var ackTracker *ackTracker
        // are there multiple messages in this batch?
        if numMsgs > 1 {
@@ -1144,7 +1139,6 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
        pc.metrics.PrefetchedMessages.Add(float64(numMsgs))
 
        var (
-               bytesReceived   int
                skippedMessages int32
        )
        for i := 0; i < numMsgs; i++ {
@@ -1264,22 +1258,19 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                        Message:  msg,
                })
 
-               messages = append(messages, msg)
-               bytesReceived += msg.size()
-       }
+               if pc.options.autoReceiverQueueSize {
+                       pc.client.memLimit.ForceReserveMemory(int64(msg.size()))
+                       pc.incomingMessages.Add(int32(1))
+                       pc.markScaleIfNeed()
+               }
 
-       if pc.options.autoReceiverQueueSize {
-               pc.client.memLimit.ForceReserveMemory(int64(bytesReceived))
-               pc.incomingMessages.Add(int32(len(messages)))
-               pc.markScaleIfNeed()
+               pc.queueCh <- msg
        }
 
        if skippedMessages > 0 {
                pc.availablePermits.add(skippedMessages)
        }
 
-       // send messages to the dispatcher
-       pc.queueCh <- messages
        return nil
 }
 
@@ -1435,20 +1426,19 @@ func (pc *partitionConsumer) dispatcher() {
        defer func() {
                pc.log.Debug("exiting dispatch loop")
        }()
-       var messages []*message
+       var queueMsg *message
        for {
-               var queueCh chan []*message
+               var queueCh chan *message
                var messageCh chan ConsumerMessage
                var nextMessage ConsumerMessage
                var nextMessageSize int
 
-               // are there more messages to send?
-               if len(messages) > 0 {
+               if queueMsg != nil {
                        nextMessage = ConsumerMessage{
                                Consumer: pc.parentConsumer,
-                               Message:  messages[0],
+                               Message:  queueMsg,
                        }
-                       nextMessageSize = messages[0].size()
+                       nextMessageSize = queueMsg.size()
 
                        if pc.dlq.shouldSendToDlq(&nextMessage) {
                                // pass the message to the DLQ router
@@ -1460,7 +1450,7 @@ func (pc *partitionConsumer) dispatcher() {
                        }
 
                        pc.metrics.PrefetchedMessages.Dec()
-                       
pc.metrics.PrefetchedBytes.Sub(float64(len(messages[0].payLoad)))
+                       
pc.metrics.PrefetchedBytes.Sub(float64(len(queueMsg.payLoad)))
                } else {
                        queueCh = pc.queueCh
                }
@@ -1475,7 +1465,7 @@ func (pc *partitionConsumer) dispatcher() {
                        }
                        pc.log.Debug("dispatcher received connection event")
 
-                       messages = nil
+                       queueMsg = nil
 
                        // reset available permits
                        pc.availablePermits.reset()
@@ -1493,19 +1483,16 @@ func (pc *partitionConsumer) dispatcher() {
                                pc.log.WithError(err).Error("unable to send 
initial permits to broker")
                        }
 
-               case msgs, ok := <-queueCh:
+               case msg, ok := <-queueCh:
                        if !ok {
                                return
                        }
-                       // we only read messages here after the consumer has 
processed all messages
-                       // in the previous batch
-                       messages = msgs
+
+                       queueMsg = msg
 
                // if the messageCh is nil or the messageCh is full this will 
not be selected
                case messageCh <- nextMessage:
-                       // allow this message to be garbage collected
-                       messages[0] = nil
-                       messages = messages[1:]
+                       queueMsg = nil
 
                        pc.availablePermits.inc()
 
@@ -1528,14 +1515,14 @@ func (pc *partitionConsumer) dispatcher() {
                                if m == nil {
                                        break
                                } else if nextMessageInQueue == nil {
-                                       nextMessageInQueue = 
toTrackingMessageID(m[0].msgID)
+                                       nextMessageInQueue = 
toTrackingMessageID(m.msgID)
                                }
                                if pc.options.autoReceiverQueueSize {
-                                       pc.incomingMessages.Sub(int32(len(m)))
+                                       pc.incomingMessages.Sub(int32(1))
                                }
                        }
 
-                       messages = nil
+                       queueMsg = nil
 
                        clearQueueCb(nextMessageInQueue)
                }
diff --git a/pulsar/consumer_partition_test.go 
b/pulsar/consumer_partition_test.go
index 21280fb7..746e17ac 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -30,7 +30,7 @@ import (
 func TestSingleMessageIDNoAckTracker(t *testing.T) {
        eventsCh := make(chan interface{}, 1)
        pc := partitionConsumer{
-               queueCh:              make(chan []*message, 1),
+               queueCh:              make(chan *message, 1),
                eventsCh:             eventsCh,
                compressionProviders: sync.Map{},
                options:              &partitionConsumerOpts{},
@@ -47,13 +47,12 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
        }
 
        // ensure the tracker was set on the message id
-       messages := <-pc.queueCh
-       for _, m := range messages {
-               assert.Nil(t, m.ID().(*trackingMessageID).tracker)
-       }
+       message := <-pc.queueCh
+       id := message.ID().(*trackingMessageID)
+       assert.Nil(t, id.tracker)
 
        // ack the message id
-       pc.AckID(messages[0].msgID.(*trackingMessageID))
+       pc.AckID(id)
 
        select {
        case <-eventsCh:
@@ -69,7 +68,7 @@ func newTestMetrics() *internal.LeveledMetrics {
 func TestBatchMessageIDNoAckTracker(t *testing.T) {
        eventsCh := make(chan interface{}, 1)
        pc := partitionConsumer{
-               queueCh:              make(chan []*message, 1),
+               queueCh:              make(chan *message, 1),
                eventsCh:             eventsCh,
                compressionProviders: sync.Map{},
                options:              &partitionConsumerOpts{},
@@ -86,13 +85,12 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
        }
 
        // ensure the tracker was set on the message id
-       messages := <-pc.queueCh
-       for _, m := range messages {
-               assert.Nil(t, m.ID().(*trackingMessageID).tracker)
-       }
+       message := <-pc.queueCh
+       id := message.ID().(*trackingMessageID)
+       assert.Nil(t, id.tracker)
 
        // ack the message id
-       err := pc.AckID(messages[0].msgID.(*trackingMessageID))
+       err := pc.AckID(id)
        assert.Nil(t, err)
 
        select {
@@ -105,7 +103,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
 func TestBatchMessageIDWithAckTracker(t *testing.T) {
        eventsCh := make(chan interface{}, 1)
        pc := partitionConsumer{
-               queueCh:              make(chan []*message, 1),
+               queueCh:              make(chan *message, 10),
                eventsCh:             eventsCh,
                compressionProviders: sync.Map{},
                options:              &partitionConsumerOpts{},
@@ -122,14 +120,21 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
        }
 
        // ensure the tracker was set on the message id
-       messages := <-pc.queueCh
-       for _, m := range messages {
-               assert.NotNil(t, m.ID().(*trackingMessageID).tracker)
+       var messageIDs []*trackingMessageID
+       for i := 0; i < 10; i++ {
+               select {
+               case m := <-pc.queueCh:
+                       id := m.ID().(*trackingMessageID)
+                       assert.NotNil(t, id.tracker)
+                       messageIDs = append(messageIDs, id)
+               default:
+                       break
+               }
        }
 
        // ack all message ids except the last one
        for i := 0; i < 9; i++ {
-               err := pc.AckID(messages[i].msgID.(*trackingMessageID))
+               err := pc.AckID(messageIDs[i])
                assert.Nil(t, err)
        }
 
@@ -140,7 +145,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
        }
 
        // ack last message
-       err := pc.AckID(messages[9].msgID.(*trackingMessageID))
+       err := pc.AckID(messageIDs[9])
        assert.Nil(t, err)
 
        select {

Reply via email to