This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 28f61d2  [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema (#1055)
28f61d2 is described below

commit 28f61d2f9e00279028e4c6e3558e5aa2665d25cd
Author: Jiaqi Shen <[email protected]>
AuthorDate: Thu Jul 20 15:51:41 2023 +0800

    [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema (#1055)
    
    ### Motivation
    
    The producer memory limit has several problems when `EnableChunking=true` or a `Schema` is set.
    - When a `Schema` is set, the actual message payload comes from `msg.Value`, so `len(msg.Payload)` may be 0 and the memory cannot be reserved accurately (sketched below).
    
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L479-L494
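
    A minimal sketch of this first problem (Go; `message`, `encoder` and `bytesToReserve` are hypothetical stand-ins for `ProducerMessage`, `Schema` and the reservation step, not the client's real types): the size to reserve is only known after the schema has encoded `msg.Value`, so reserving `len(msg.Payload)` up front reserves 0 bytes for schema messages.

    ```go
    package memsketch

    // message and encoder are simplified stand-ins for ProducerMessage and Schema.
    type message struct {
        Payload []byte      // often nil when Value plus a Schema is used
        Value   interface{} // encoded lazily by the producer
    }

    type encoder interface {
        Encode(v interface{}) ([]byte, error)
    }

    // bytesToReserve shows why the reservation has to happen after encoding:
    // for a schema message len(msg.Payload) is 0, but the wire payload is not.
    func bytesToReserve(msg *message, schema encoder) (int64, error) {
        payload := msg.Payload
        if payload == nil && schema != nil {
            encoded, err := schema.Encode(msg.Value)
            if err != nil {
                return 0, err
            }
            payload = encoded
        }
        return int64(len(payload)), nil
    }
    ```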
    
    - In chunking, when the producer hits the memory limit partway through, only the memory for the **chunks that have already been sent out** should stay reserved, and the memory for the rest of the payload should be released. The calculation for this release is not accurate: it should be `uncompressedPayloadSize - int64(lhs)` instead of `uncompressedPayloadSize - int64(rhs)` (sketched below).
    
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L662-L664
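
    Continuing the sketch above (same hypothetical package; `lhs`/`rhs` mirror the slice bounds in `internalSend`): if the uncompressed payload is 300 bytes, the chunk size is 100 bytes, and queueing fails at the second chunk (`lhs = 100`, `rhs = 200`), then 200 bytes must be released, not 100; the first 100 bytes stay reserved until the already-sent chunk's receipt arrives.

    ```go
    // memToReleaseOnChunkQueueFailure: bytes [0, lhs) belong to chunks already
    // handed to the connection and are released later by their send receipts;
    // bytes [lhs, total) were never sent and must be released right away.
    // Subtracting rhs instead leaks the failed chunk's rhs-lhs bytes.
    func memToReleaseOnChunkQueueFailure(totalSize, lhs int64) int64 {
        return totalSize - lhs
    }
    ```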
    
    - In chunking, if `internalSingleSend` fails, the producer should release the memory for that **single chunk** only. But currently it releases the memory for all chunks on each failure, so the same bytes can be released repeatedly (sketched below).
    
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L838-L843
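
    Sketched under the same assumptions (`memLimiter` and `chunkSendRequest` are stand-ins; `reservedMem` mirrors the field this patch adds to `sendRequest`): each chunk's request remembers only its own reservation, so a failed single send gives back `rhs - lhs` bytes rather than the whole payload again.

    ```go
    // memLimiter is a stand-in for the client-wide memory limit controller.
    type memLimiter interface {
        ReleaseMemory(size int64)
    }

    // chunkSendRequest carries the reservation made for one chunk only.
    type chunkSendRequest struct {
        reservedMem int64 // rhs - lhs for this chunk
    }

    // onSingleSendFailure releases exactly what this chunk reserved; releasing
    // the full payload length would hand back the same bytes once per failed chunk.
    func onSingleSendFailure(limit memLimiter, req *chunkSendRequest) {
        limit.ReleaseMemory(req.reservedMem)
    }
    ```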
    
    - When the producer receives the send receipt from the broker, it should release the memory **it reserved before sending**. But it releases the wrong size when `chunking` or a `Schema` is used (sketched below).
    
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L1221-L1230
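
    And on the receipt path, continuing the same sketch: the release has to use the size that was actually reserved (the encoded size for schema messages, the chunk's share for chunked messages), which is what `reservedMem` records.

    ```go
    // onSendReceipt releases what was reserved before sending. The message's
    // payload length is 0 for a schema message and covers the whole payload for
    // a chunked one, so releasing it here is wrong in both cases.
    func onSendReceipt(limit memLimiter, sr *chunkSendRequest) {
        limit.ReleaseMemory(sr.reservedMem)
    }
    ```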
    
    ### Modifications
    
    - Fix all the memory limit problems related to `chunking` and `schema` (the resulting flow is sketched after this list)
    - Add unit tests to cover these scenarios
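
    Putting the earlier sketches together, the intended flow roughly looks like this (simplified; the method bodies are stand-ins, error handling and callbacks are omitted, and only the names `canAddToQueue`, `canReserveMem` and `reservedMem` correspond to the real code):

    ```go
    // producerSketch reduces partitionProducer to the two gates this patch
    // separates: canAddToQueue only takes a pending-queue permit, while
    // canReserveMem reserves memory once the real payload size is known and
    // records it on the request so every later release is exact.
    type producerSketch struct {
        schema encoder
    }

    func (p *producerSketch) canAddToQueue(sr *chunkSendRequest) bool {
        return true // stand-in: acquire the publish semaphore here
    }

    func (p *producerSketch) canReserveMem(sr *chunkSendRequest, size int64) bool {
        sr.reservedMem += size // remembered for the matching release
        return true            // stand-in: reserve size bytes from the client memLimit
    }

    func (p *producerSketch) sendSketch(sr *chunkSendRequest, msg *message) bool {
        if !p.canAddToQueue(sr) { // queue permit first, no memory reserved yet
            return false
        }
        size, err := bytesToReserve(msg, p.schema) // encoded size for schema messages
        if err != nil {
            return false
        }
        if !p.canReserveMem(sr, size) {
            return false
        }
        // ... build batches or chunks and send; receipts, timeouts and failed
        // single sends all release sr.reservedMem instead of len(msg.Payload).
        return true
    }
    ```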
    
    ---------
    
    Co-authored-by: shenjiaqi.2769 <[email protected]>
---
 pulsar/producer_partition.go |  55 +++++++++------
 pulsar/producer_test.go      | 159 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 192 insertions(+), 22 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e74fd98..5daf54c 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -477,21 +477,19 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 
        // read payload from message
        uncompressedPayload := msg.Payload
-       uncompressedPayloadSize := int64(len(uncompressedPayload))
 
        var schemaPayload []byte
        var err error
 
        // The block chan must be closed when returned with exception
        defer request.stopBlock()
-       if !p.canAddToQueue(request, uncompressedPayloadSize) {
+       if !p.canAddToQueue(request) {
                return
        }
 
        if p.options.DisableMultiSchema {
                if msg.Schema != nil && p.options.Schema != nil &&
                        msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
-                       p.releaseSemaphoreAndMem(uncompressedPayloadSize)
                        runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
                        p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
                        return
@@ -510,7 +508,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
                if uncompressedPayload == nil && schema != nil {
                        schemaPayload, err = schema.Encode(msg.Value)
                        if err != nil {
-                               p.releaseSemaphoreAndMem(uncompressedPayloadSize)
                                runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
                                p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
                                return
@@ -526,7 +523,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
                if schemaVersion == nil {
                        schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
                        if err != nil {
-                               p.releaseSemaphoreAndMem(uncompressedPayloadSize)
                                p.log.WithError(err).Error("get schema version fail")
                                runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
                                return
@@ -537,6 +533,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 
        uncompressedSize := len(uncompressedPayload)
 
+       // try to reserve memory for uncompressedPayload
+       if !p.canReserveMem(request, int64(uncompressedSize)) {
+               return
+       }
+
        deliverAt := msg.DeliverAt
        if msg.DeliverAfter.Nanoseconds() > 0 {
                deliverAt = time.Now().Add(msg.DeliverAfter)
@@ -586,7 +587,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 
        // if msg is too large and chunking is disabled
        if checkSize > maxMessageSize && !p.options.EnableChunking {
-               p.releaseSemaphoreAndMem(uncompressedPayloadSize)
+               p.releaseSemaphoreAndMem(int64(uncompressedSize))
                runCallback(request.callback, nil, request.msg, errMessageTooLarge)
                p.log.WithError(errMessageTooLarge).
                        WithField("size", checkSize).
@@ -605,7 +606,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
        } else {
                payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
                if payloadChunkSize <= 0 {
-                       p.releaseSemaphoreAndMem(uncompressedPayloadSize)
+                       p.releaseSemaphoreAndMem(int64(uncompressedSize))
                        runCallback(request.callback, nil, msg, errMetaTooLarge)
                        p.log.WithError(errMetaTooLarge).
                                WithField("metadata size", proto.Size(mm)).
@@ -652,10 +653,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
                                        uuid:             uuid,
                                        chunkRecorder:    cr,
                                        transaction:      request.transaction,
+                                       reservedMem:      int64(rhs - lhs),
                                }
                                // the permit of first chunk has acquired
-                               if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
-                                       p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(rhs))
+                               if chunkID != 0 && !p.canAddToQueue(nsr) {
+                                       p.releaseSemaphoreAndMem(int64(uncompressedSize - lhs))
                                        return
                                }
                                p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
@@ -680,7 +682,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
                        // after flushing try again to add the current payload
                        if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
                                multiSchemaEnabled); !ok {
-                               p.releaseSemaphoreAndMem(uncompressedPayloadSize)
+                               p.releaseSemaphoreAndMem(int64(uncompressedSize))
                                runCallback(request.callback, nil, request.msg, errFailAddToBatch)
                                p.log.WithField("size", uncompressedSize).
                                        WithField("properties", msg.Properties).
@@ -832,7 +834,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
 
        if err != nil {
                runCallback(request.callback, nil, request.msg, err)
-               p.releaseSemaphoreAndMem(int64(len(msg.Payload)))
+               p.releaseSemaphoreAndMem(request.reservedMem)
                p.log.WithError(err).Errorf("Single message serialize failed 
%s", msg.Value)
                return
        }
@@ -971,7 +973,7 @@ func (p *partitionProducer) failTimeoutMessages() {
                                sr := i.(*sendRequest)
                                if sr.msg != nil {
                                        size := len(sr.msg.Payload)
-                                       p.releaseSemaphoreAndMem(int64(size))
+                                       p.releaseSemaphoreAndMem(sr.reservedMem)
                                        p.metrics.MessagesPending.Dec()
                                        p.metrics.BytesPending.Sub(float64(size))
                                        p.metrics.PublishErrorsTimeout.Inc()
@@ -1208,7 +1210,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
                for idx, i := range pi.sendRequests {
                        sr := i.(*sendRequest)
                        atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
-                       p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
+                       p.releaseSemaphoreAndMem(sr.reservedMem)
                        p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
                        p.metrics.MessagesPublished.Inc()
                        p.metrics.MessagesPending.Dec()
@@ -1352,6 +1354,7 @@ type sendRequest struct {
        uuid             string
        chunkRecorder    *chunkRecorder
        transaction      *transaction
+       reservedMem      int64
 }
 
 // stopBlock can be invoked multiple times safety
@@ -1401,31 +1404,39 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
        p.client.memLimit.ReleaseMemory(size)
 }
 
-func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool {
+func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
        if p.options.DisableBlockIfQueueFull {
                if !p.publishSemaphore.TryAcquire() {
                        runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull)
                        return false
                }
-               if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) {
+       } else {
+               if !p.publishSemaphore.Acquire(sr.ctx) {
+                       runCallback(sr.callback, nil, sr.msg, errContextExpired)
+                       return false
+               }
+       }
+       p.metrics.MessagesPending.Inc()
+       return true
+}
+
+func (p *partitionProducer) canReserveMem(sr *sendRequest, size int64) bool {
+       if p.options.DisableBlockIfQueueFull {
+               if !p.client.memLimit.TryReserveMemory(size) {
                        p.publishSemaphore.Release()
                        runCallback(sr.callback, nil, sr.msg, errMemoryBufferIsFull)
                        return false
                }
 
        } else {
-               if !p.publishSemaphore.Acquire(sr.ctx) {
-                       runCallback(sr.callback, nil, sr.msg, errContextExpired)
-                       return false
-               }
-               if !p.client.memLimit.ReserveMemory(sr.ctx, uncompressedPayloadSize) {
+               if !p.client.memLimit.ReserveMemory(sr.ctx, size) {
                        p.publishSemaphore.Release()
                        runCallback(sr.callback, nil, sr.msg, errContextExpired)
                        return false
                }
        }
-       p.metrics.MessagesPending.Inc()
-       p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))
+       sr.reservedMem += size
+       p.metrics.BytesPending.Add(float64(size))
        return true
 }
 
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index adbdc71..be9885f 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1939,6 +1939,165 @@ func TestMemLimitRejectProducerMessages(t *testing.T) {
        assert.NoError(t, err)
 }
 
+func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) {
+
+       c, err := NewClient(ClientOptions{
+               URL:              serviceURL,
+               MemoryLimitBytes: 100 * 6,
+       })
+       assert.NoError(t, err)
+       defer c.Close()
+
+       schema := NewAvroSchema(`{"fields":
+       [
+               {"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]}
+       ],
+       "name":"MyAvro","namespace":"schemaNotFoundTestCase","type":"record"}`, nil)
+
+       topicName := newTopicName()
+       producer1, _ := c.CreateProducer(ProducerOptions{
+               Topic:                   topicName,
+               DisableBlockIfQueueFull: true,
+               DisableBatching:         false,
+               BatchingMaxPublishDelay: 100 * time.Second,
+               SendTimeout:             2 * time.Second,
+       })
+
+       producer2, _ := c.CreateProducer(ProducerOptions{
+               Topic:                   topicName,
+               DisableBlockIfQueueFull: true,
+               DisableBatching:         false,
+               BatchingMaxPublishDelay: 100 * time.Second,
+               SendTimeout:             2 * time.Second,
+       })
+
+       // the size of encoded value is 6 bytes
+       value := map[string]interface{}{
+               "id": 0,
+               "name": map[string]interface{}{
+                       "string": "abc",
+               },
+       }
+
+       n := 101
+       for i := 0; i < n/2; i++ {
+               producer1.SendAsync(context.Background(), &ProducerMessage{
+                       Value:  value,
+                       Schema: schema,
+               }, func(id MessageID, message *ProducerMessage, e error) {})
+
+               producer2.SendAsync(context.Background(), &ProducerMessage{
+                       Value:  value,
+                       Schema: schema,
+               }, func(id MessageID, message *ProducerMessage, e error) {})
+       }
+       // Last message in order to reach the limit
+       producer1.SendAsync(context.Background(), &ProducerMessage{
+               Value:  value,
+               Schema: schema,
+       }, func(id MessageID, message *ProducerMessage, e error) {})
+       time.Sleep(100 * time.Millisecond)
+       assert.Equal(t, int64(n*6), c.(*client).memLimit.CurrentUsage())
+
+       _, err = producer1.Send(context.Background(), &ProducerMessage{
+               Value:  value,
+               Schema: schema,
+       })
+       assert.Error(t, err)
+       assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+       _, err = producer2.Send(context.Background(), &ProducerMessage{
+               Value:  value,
+               Schema: schema,
+       })
+       assert.Error(t, err)
+       assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+       // flush pending msg
+       err = producer1.Flush()
+       assert.NoError(t, err)
+       err = producer2.Flush()
+       assert.NoError(t, err)
+       assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())
+
+       _, err = producer1.Send(context.Background(), &ProducerMessage{
+               Value:  value,
+               Schema: schema,
+       })
+       assert.NoError(t, err)
+       _, err = producer2.Send(context.Background(), &ProducerMessage{
+               Value:  value,
+               Schema: schema,
+       })
+       assert.NoError(t, err)
+}
+
+func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {
+
+       c, err := NewClient(ClientOptions{
+               URL:              serviceURL,
+               MemoryLimitBytes: 5 * 1024,
+       })
+       assert.NoError(t, err)
+       defer c.Close()
+
+       topicName := newTopicName()
+       producer1, _ := c.CreateProducer(ProducerOptions{
+               Topic:                   topicName,
+               DisableBlockIfQueueFull: true,
+               DisableBatching:         true,
+               EnableChunking:          true,
+               SendTimeout:             2 * time.Second,
+       })
+
+       producer2, _ := c.CreateProducer(ProducerOptions{
+               Topic:                   topicName,
+               DisableBlockIfQueueFull: true,
+               DisableBatching:         false,
+               BatchingMaxPublishDelay: 100 * time.Millisecond,
+               SendTimeout:             2 * time.Second,
+       })
+
+       producer2.SendAsync(context.Background(), &ProducerMessage{
+               Payload: make([]byte, 5*1024+1),
+       }, func(id MessageID, message *ProducerMessage, e error) {
+               if e != nil {
+                       t.Fatal(e)
+               }
+       })
+
+       time.Sleep(50 * time.Millisecond)
+       assert.Equal(t, int64(5*1024+1), c.(*client).memLimit.CurrentUsage())
+
+       _, err = producer1.Send(context.Background(), &ProducerMessage{
+               Payload: make([]byte, 1),
+       })
+       assert.Error(t, err)
+       assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+       // wait all the mem have been released
+       retryAssert(t, 10, 200, func() {}, func(t assert.TestingT) bool {
+               return assert.Equal(t, 0, int(c.(*client).memLimit.CurrentUsage()))
+       })
+
+       producer3, _ := c.CreateProducer(ProducerOptions{
+               Topic:                   topicName,
+               DisableBlockIfQueueFull: true,
+               DisableBatching:         true,
+               EnableChunking:          true,
+               MaxPendingMessages:      1,
+               ChunkMaxMessageSize:     1024,
+               SendTimeout:             2 * time.Second,
+       })
+
+       // producer3 will reserve 2*1024 bytes and then release 1024 bytes (the second chunk)
+       // because it reaches MaxPendingMessages in chunking
+       _, _ = producer3.Send(context.Background(), &ProducerMessage{
+               Payload: make([]byte, 2*1024),
+       })
+       assert.Equal(t, int64(1024), c.(*client).memLimit.CurrentUsage())
+}
+
 func TestMemLimitContextCancel(t *testing.T) {
 
        c, err := NewClient(ClientOptions{
