This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 06f26935 improve: use chan *message instead of chan []*message as
queueCh (#1283)
06f26935 is described below
commit 06f2693583cca6855c5731f5268d32d1758240d3
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Oct 8 17:43:06 2024 +0800
improve: use chan *message instead of chan []*message as queueCh (#1283)
### Motivation
We currently use `chan []*message` as queueCh, using a slice to stage
messages before sending them to the parent consumer; this results in
excessive memory use.
This PR optimizes this path, potentially reducing the overall memory overhead.
### Modifications
- Use `chan *message` instead of `chan []*message`.
- Fix test.
---
pulsar/consumer_partition.go | 101 +++++++++++++++++---------------------
pulsar/consumer_partition_test.go | 41 +++++++++-------
2 files changed, 67 insertions(+), 75 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 823e0e87..831f763a 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -154,7 +154,7 @@ type partitionConsumer struct {
// the size of the queue channel for buffering messages
maxQueueSize int32
- queueCh chan []*message
+ queueCh chan *message
startMessageID atomicMessageID
lastDequeuedMsg *trackingMessageID
@@ -338,7 +338,7 @@ func newPartitionConsumer(parent Consumer, client *client,
options *partitionCon
partitionIdx: int32(options.partitionIdx),
eventsCh: make(chan interface{}, 10),
maxQueueSize: int32(options.receiverQueueSize),
- queueCh: make(chan []*message,
options.receiverQueueSize),
+ queueCh: make(chan *message,
options.receiverQueueSize),
startMessageID: atomicMessageID{msgID:
options.startMessageID},
connectedCh: make(chan struct{}),
messageCh: messageCh,
@@ -1057,37 +1057,33 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
return fmt.Errorf("discarding message on decryption
error :%v", err)
case crypto.ConsumerCryptoFailureActionConsume:
pc.log.Warnf("consuming encrypted message due to error
in decryption :%v", err)
- messages := []*message{
- {
- publishTime:
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
- eventTime:
timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
- key: msgMeta.GetPartitionKey(),
- producerName: msgMeta.GetProducerName(),
- properties:
internal.ConvertToStringMap(msgMeta.GetProperties()),
- topic: pc.topic,
- msgID: newMessageID(
- int64(pbMsgID.GetLedgerId()),
- int64(pbMsgID.GetEntryId()),
- pbMsgID.GetBatchIndex(),
- pc.partitionIdx,
- pbMsgID.GetBatchSize(),
- ),
- payLoad:
headersAndPayload.ReadableSlice(),
- schema: pc.options.schema,
- replicationClusters:
msgMeta.GetReplicateTo(),
- replicatedFrom:
msgMeta.GetReplicatedFrom(),
- redeliveryCount:
response.GetRedeliveryCount(),
- encryptionContext:
createEncryptionContext(msgMeta),
- orderingKey:
string(msgMeta.OrderingKey),
- },
- }
-
if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Inc()
pc.markScaleIfNeed()
}
- pc.queueCh <- messages
+ pc.queueCh <- &message{
+ publishTime:
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+ eventTime:
timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+ key: msgMeta.GetPartitionKey(),
+ producerName: msgMeta.GetProducerName(),
+ properties:
internal.ConvertToStringMap(msgMeta.GetProperties()),
+ topic: pc.topic,
+ msgID: newMessageID(
+ int64(pbMsgID.GetLedgerId()),
+ int64(pbMsgID.GetEntryId()),
+ pbMsgID.GetBatchIndex(),
+ pc.partitionIdx,
+ pbMsgID.GetBatchSize(),
+ ),
+ payLoad:
headersAndPayload.ReadableSlice(),
+ schema: pc.options.schema,
+ replicationClusters: msgMeta.GetReplicateTo(),
+ replicatedFrom:
msgMeta.GetReplicatedFrom(),
+ redeliveryCount:
response.GetRedeliveryCount(),
+ encryptionContext:
createEncryptionContext(msgMeta),
+ orderingKey:
string(msgMeta.OrderingKey),
+ }
return nil
}
}
@@ -1123,7 +1119,6 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
numMsgs = int(msgMeta.GetNumMessagesInBatch())
}
- messages := make([]*message, 0)
var ackTracker *ackTracker
// are there multiple messages in this batch?
if numMsgs > 1 {
@@ -1144,7 +1139,6 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
pc.metrics.PrefetchedMessages.Add(float64(numMsgs))
var (
- bytesReceived int
skippedMessages int32
)
for i := 0; i < numMsgs; i++ {
@@ -1264,22 +1258,19 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
Message: msg,
})
- messages = append(messages, msg)
- bytesReceived += msg.size()
- }
+ if pc.options.autoReceiverQueueSize {
+ pc.client.memLimit.ForceReserveMemory(int64(msg.size()))
+ pc.incomingMessages.Add(int32(1))
+ pc.markScaleIfNeed()
+ }
- if pc.options.autoReceiverQueueSize {
- pc.client.memLimit.ForceReserveMemory(int64(bytesReceived))
- pc.incomingMessages.Add(int32(len(messages)))
- pc.markScaleIfNeed()
+ pc.queueCh <- msg
}
if skippedMessages > 0 {
pc.availablePermits.add(skippedMessages)
}
- // send messages to the dispatcher
- pc.queueCh <- messages
return nil
}
@@ -1435,20 +1426,19 @@ func (pc *partitionConsumer) dispatcher() {
defer func() {
pc.log.Debug("exiting dispatch loop")
}()
- var messages []*message
+ var queueMsg *message
for {
- var queueCh chan []*message
+ var queueCh chan *message
var messageCh chan ConsumerMessage
var nextMessage ConsumerMessage
var nextMessageSize int
- // are there more messages to send?
- if len(messages) > 0 {
+ if queueMsg != nil {
nextMessage = ConsumerMessage{
Consumer: pc.parentConsumer,
- Message: messages[0],
+ Message: queueMsg,
}
- nextMessageSize = messages[0].size()
+ nextMessageSize = queueMsg.size()
if pc.dlq.shouldSendToDlq(&nextMessage) {
// pass the message to the DLQ router
@@ -1460,7 +1450,7 @@ func (pc *partitionConsumer) dispatcher() {
}
pc.metrics.PrefetchedMessages.Dec()
-
pc.metrics.PrefetchedBytes.Sub(float64(len(messages[0].payLoad)))
+
pc.metrics.PrefetchedBytes.Sub(float64(len(queueMsg.payLoad)))
} else {
queueCh = pc.queueCh
}
@@ -1475,7 +1465,7 @@ func (pc *partitionConsumer) dispatcher() {
}
pc.log.Debug("dispatcher received connection event")
- messages = nil
+ queueMsg = nil
// reset available permits
pc.availablePermits.reset()
@@ -1493,19 +1483,16 @@ func (pc *partitionConsumer) dispatcher() {
pc.log.WithError(err).Error("unable to send
initial permits to broker")
}
- case msgs, ok := <-queueCh:
+ case msg, ok := <-queueCh:
if !ok {
return
}
- // we only read messages here after the consumer has
processed all messages
- // in the previous batch
- messages = msgs
+
+ queueMsg = msg
// if the messageCh is nil or the messageCh is full this will
not be selected
case messageCh <- nextMessage:
- // allow this message to be garbage collected
- messages[0] = nil
- messages = messages[1:]
+ queueMsg = nil
pc.availablePermits.inc()
@@ -1528,14 +1515,14 @@ func (pc *partitionConsumer) dispatcher() {
if m == nil {
break
} else if nextMessageInQueue == nil {
- nextMessageInQueue =
toTrackingMessageID(m[0].msgID)
+ nextMessageInQueue =
toTrackingMessageID(m.msgID)
}
if pc.options.autoReceiverQueueSize {
- pc.incomingMessages.Sub(int32(len(m)))
+ pc.incomingMessages.Sub(int32(1))
}
}
- messages = nil
+ queueMsg = nil
clearQueueCb(nextMessageInQueue)
}
diff --git a/pulsar/consumer_partition_test.go
b/pulsar/consumer_partition_test.go
index 21280fb7..746e17ac 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -30,7 +30,7 @@ import (
func TestSingleMessageIDNoAckTracker(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
- queueCh: make(chan []*message, 1),
+ queueCh: make(chan *message, 1),
eventsCh: eventsCh,
compressionProviders: sync.Map{},
options: &partitionConsumerOpts{},
@@ -47,13 +47,12 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
}
// ensure the tracker was set on the message id
- messages := <-pc.queueCh
- for _, m := range messages {
- assert.Nil(t, m.ID().(*trackingMessageID).tracker)
- }
+ message := <-pc.queueCh
+ id := message.ID().(*trackingMessageID)
+ assert.Nil(t, id.tracker)
// ack the message id
- pc.AckID(messages[0].msgID.(*trackingMessageID))
+ pc.AckID(id)
select {
case <-eventsCh:
@@ -69,7 +68,7 @@ func newTestMetrics() *internal.LeveledMetrics {
func TestBatchMessageIDNoAckTracker(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
- queueCh: make(chan []*message, 1),
+ queueCh: make(chan *message, 1),
eventsCh: eventsCh,
compressionProviders: sync.Map{},
options: &partitionConsumerOpts{},
@@ -86,13 +85,12 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
}
// ensure the tracker was set on the message id
- messages := <-pc.queueCh
- for _, m := range messages {
- assert.Nil(t, m.ID().(*trackingMessageID).tracker)
- }
+ message := <-pc.queueCh
+ id := message.ID().(*trackingMessageID)
+ assert.Nil(t, id.tracker)
// ack the message id
- err := pc.AckID(messages[0].msgID.(*trackingMessageID))
+ err := pc.AckID(id)
assert.Nil(t, err)
select {
@@ -105,7 +103,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
func TestBatchMessageIDWithAckTracker(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
- queueCh: make(chan []*message, 1),
+ queueCh: make(chan *message, 10),
eventsCh: eventsCh,
compressionProviders: sync.Map{},
options: &partitionConsumerOpts{},
@@ -122,14 +120,21 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
}
// ensure the tracker was set on the message id
- messages := <-pc.queueCh
- for _, m := range messages {
- assert.NotNil(t, m.ID().(*trackingMessageID).tracker)
+ var messageIDs []*trackingMessageID
+ for i := 0; i < 10; i++ {
+ select {
+ case m := <-pc.queueCh:
+ id := m.ID().(*trackingMessageID)
+ assert.NotNil(t, id.tracker)
+ messageIDs = append(messageIDs, id)
+ default:
+ break
+ }
}
// ack all message ids except the last one
for i := 0; i < 9; i++ {
- err := pc.AckID(messages[i].msgID.(*trackingMessageID))
+ err := pc.AckID(messageIDs[i])
assert.Nil(t, err)
}
@@ -140,7 +145,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
}
// ack last message
- err := pc.AckID(messages[9].msgID.(*trackingMessageID))
+ err := pc.AckID(messageIDs[9])
assert.Nil(t, err)
select {