This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 28f61d2 [fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema (#1055)
28f61d2 is described below
commit 28f61d2f9e00279028e4c6e3558e5aa2665d25cd
Author: Jiaqi Shen <[email protected]>
AuthorDate: Thu Jul 20 15:51:41 2023 +0800
[fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema (#1055)
### Motivation
The producer memory limit has some problems when `EnableChunking=true` or `Schema` is set.
- When `Schema` is set, the actual message payload is `msg.Value`, so `len(msg.Payload)` may be 0 and the memory cannot be reserved accurately.
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L479-L494
- In chunking, if the producer hits the memory limit, it should release the memory reserved for the **chunks that have not been sent out yet** (see the sketch after this list). But the calculation of this release is not accurate: it should be `uncompressedPayloadSize - int64(lhs)` instead of `uncompressedPayloadSize - int64(rhs)`.
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L662-L664
- In chunking, if `internalSingleSend` fails, it should release the memory for that **single chunk** only. But currently the memory for all chunks is released repeatedly.
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L838-L843
- When the producer receives the receipt from the broker, it should release the memory **it reserved before sending**. But it releases the wrong size when `chunking` or `schema` is used.
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L1221-L1230
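
To make the chunk accounting concrete, here is a minimal, self-contained sketch (the `memLimit` type and all names are hypothetical stand-ins, not the client's real API) of why the release on a failed chunk enqueue must be `uncompressedSize - lhs`, i.e. the current chunk plus everything after it, rather than `uncompressedSize - rhs`:

```go
package main

import "fmt"

// memLimit is a hypothetical stand-in for the client's memory limiter,
// used only to illustrate the accounting; it is not the real API.
type memLimit struct{ reserved int64 }

func (m *memLimit) Reserve(n int64) { m.reserved += n }
func (m *memLimit) Release(n int64) { m.reserved -= n }

func main() {
	const uncompressedSize = int64(2500) // the whole payload is reserved up front
	const chunkSize = int64(1024)

	lim := &memLimit{}
	lim.Reserve(uncompressedSize)

	// Walk the chunks the way internalSend does: [lhs, rhs) is the slice
	// of the payload carried by the current chunk.
	for lhs := int64(0); lhs < uncompressedSize; lhs += chunkSize {
		rhs := lhs + chunkSize
		if rhs > uncompressedSize {
			rhs = uncompressedSize
		}

		// Pretend the second chunk cannot be enqueued.
		if lhs == chunkSize {
			// The current chunk and everything after it will never be sent,
			// so release uncompressedSize-lhs (the fix), not
			// uncompressedSize-rhs (the old code), which leaked the
			// current chunk's reservation.
			lim.Release(uncompressedSize - lhs)
			break
		}
		// A chunk that was sent keeps its reservation until the broker
		// receipt arrives; the receipt handler then releases rhs-lhs.
	}

	fmt.Println("still reserved:", lim.reserved) // 1024: the first (already sent) chunk
}
```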
### Modifications
- Fix all the memory limit problems related to `chunking` and `schema` (a rough sketch of the new reserve/release flow is given below)
- Add unit tests to cover these scenarios
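
As a rough sketch of the reserve/release flow the fix moves towards (all types and names below are hypothetical stand-ins, not the actual client API): memory is reserved for the size that is really sent, i.e. the schema-encoded value rather than `len(msg.Payload)`, the amount is recorded on the request, and every release path gives back exactly that recorded amount:

```go
package main

import "fmt"

// Hypothetical stand-ins for this sketch; the real producer uses its
// publish semaphore and the client-level memory limiter.
type sendRequest struct{ reservedMem int64 }

type producer struct {
	memUsed  int64
	memLimit int64
}

// reserveMem mirrors the idea behind the new canReserveMem: it is called with
// the size that will actually be sent (the schema-encoded payload) and records
// the amount on the request so that later release paths are exact.
func (p *producer) reserveMem(sr *sendRequest, size int64) bool {
	if p.memUsed+size > p.memLimit {
		return false // surfaced to the caller as a "memory buffer is full" error
	}
	p.memUsed += size
	sr.reservedMem += size
	return true
}

// release is what the receipt, timeout and error paths call: always
// sr.reservedMem, never len(msg.Payload), which is 0 when only
// msg.Value plus a Schema is set.
func (p *producer) release(sr *sendRequest) {
	p.memUsed -= sr.reservedMem
	sr.reservedMem = 0
}

func main() {
	p := &producer{memLimit: 100}
	sr := &sendRequest{}

	encodedSize := int64(6) // size of schema.Encode(msg.Value), not len(msg.Payload)
	if p.reserveMem(sr, encodedSize) {
		// ... message is sent here ...
		p.release(sr) // on receipt (or failure) release exactly what was reserved
	}
	fmt.Println("memory in use after receipt:", p.memUsed) // 0
}
```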
---------
Co-authored-by: shenjiaqi.2769 <[email protected]>
---
pulsar/producer_partition.go | 55 +++++++++------
pulsar/producer_test.go | 159 +++++++++++++++++++++++++++++++++++++++++++
2 files changed, 192 insertions(+), 22 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e74fd98..5daf54c 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -477,21 +477,19 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
// read payload from message
uncompressedPayload := msg.Payload
- uncompressedPayloadSize := int64(len(uncompressedPayload))
var schemaPayload []byte
var err error
// The block chan must be closed when returned with exception
defer request.stopBlock()
- if !p.canAddToQueue(request, uncompressedPayloadSize) {
+ if !p.canAddToQueue(request) {
return
}
if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
- p.releaseSemaphoreAndMem(uncompressedPayloadSize)
runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return
@@ -510,7 +508,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
if uncompressedPayload == nil && schema != nil {
schemaPayload, err = schema.Encode(msg.Value)
if err != nil {
- p.releaseSemaphoreAndMem(uncompressedPayloadSize)
runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
return
@@ -526,7 +523,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
if schemaVersion == nil {
schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
if err != nil {
- p.releaseSemaphoreAndMem(uncompressedPayloadSize)
p.log.WithError(err).Error("get schema version fail")
runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
return
@@ -537,6 +533,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
uncompressedSize := len(uncompressedPayload)
+ // try to reserve memory for uncompressedPayload
+ if !p.canReserveMem(request, int64(uncompressedSize)) {
+ return
+ }
+
deliverAt := msg.DeliverAt
if msg.DeliverAfter.Nanoseconds() > 0 {
deliverAt = time.Now().Add(msg.DeliverAfter)
@@ -586,7 +587,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
// if msg is too large and chunking is disabled
if checkSize > maxMessageSize && !p.options.EnableChunking {
- p.releaseSemaphoreAndMem(uncompressedPayloadSize)
+ p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(request.callback, nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
WithField("size", checkSize).
@@ -605,7 +606,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
} else {
payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
if payloadChunkSize <= 0 {
- p.releaseSemaphoreAndMem(uncompressedPayloadSize)
+ p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(request.callback, nil, msg, errMetaTooLarge)
p.log.WithError(errMetaTooLarge).
WithField("metadata size", proto.Size(mm)).
@@ -652,10 +653,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
uuid: uuid,
chunkRecorder: cr,
transaction: request.transaction,
+ reservedMem: int64(rhs - lhs),
}
// the permit of first chunk has acquired
- if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
- p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(rhs))
+ if chunkID != 0 && !p.canAddToQueue(nsr) {
+ p.releaseSemaphoreAndMem(int64(uncompressedSize - lhs))
return
}
p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
@@ -680,7 +682,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
// after flushing try again to add the current payload
if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
- p.releaseSemaphoreAndMem(uncompressedPayloadSize)
+ p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(request.callback, nil, request.msg, errFailAddToBatch)
p.log.WithField("size", uncompressedSize).
WithField("properties", msg.Properties).
@@ -832,7 +834,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
if err != nil {
runCallback(request.callback, nil, request.msg, err)
- p.releaseSemaphoreAndMem(int64(len(msg.Payload)))
+ p.releaseSemaphoreAndMem(request.reservedMem)
p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value)
return
}
@@ -971,7 +973,7 @@ func (p *partitionProducer) failTimeoutMessages() {
sr := i.(*sendRequest)
if sr.msg != nil {
size := len(sr.msg.Payload)
- p.releaseSemaphoreAndMem(int64(size))
+ p.releaseSemaphoreAndMem(sr.reservedMem)
p.metrics.MessagesPending.Dec()
p.metrics.BytesPending.Sub(float64(size))
p.metrics.PublishErrorsTimeout.Inc()
@@ -1208,7 +1210,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
for idx, i := range pi.sendRequests {
sr := i.(*sendRequest)
atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
- p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
+ p.releaseSemaphoreAndMem(sr.reservedMem)
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
p.metrics.MessagesPublished.Inc()
p.metrics.MessagesPending.Dec()
@@ -1352,6 +1354,7 @@ type sendRequest struct {
uuid string
chunkRecorder *chunkRecorder
transaction *transaction
+ reservedMem int64
}
// stopBlock can be invoked multiple times safety
@@ -1401,31 +1404,39 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
p.client.memLimit.ReleaseMemory(size)
}
-func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool {
+func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
if p.options.DisableBlockIfQueueFull {
if !p.publishSemaphore.TryAcquire() {
runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull)
return false
}
- if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) {
+ } else {
+ if !p.publishSemaphore.Acquire(sr.ctx) {
+ runCallback(sr.callback, nil, sr.msg, errContextExpired)
+ return false
+ }
+ }
+ p.metrics.MessagesPending.Inc()
+ return true
+}
+
+func (p *partitionProducer) canReserveMem(sr *sendRequest, size int64) bool {
+ if p.options.DisableBlockIfQueueFull {
+ if !p.client.memLimit.TryReserveMemory(size) {
p.publishSemaphore.Release()
runCallback(sr.callback, nil, sr.msg, errMemoryBufferIsFull)
return false
}
} else {
- if !p.publishSemaphore.Acquire(sr.ctx) {
- runCallback(sr.callback, nil, sr.msg, errContextExpired)
- return false
- }
- if !p.client.memLimit.ReserveMemory(sr.ctx, uncompressedPayloadSize) {
+ if !p.client.memLimit.ReserveMemory(sr.ctx, size) {
p.publishSemaphore.Release()
runCallback(sr.callback, nil, sr.msg, errContextExpired)
return false
}
}
- p.metrics.MessagesPending.Inc()
- p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))
+ sr.reservedMem += size
+ p.metrics.BytesPending.Add(float64(size))
return true
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index adbdc71..be9885f 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1939,6 +1939,165 @@ func TestMemLimitRejectProducerMessages(t *testing.T) {
assert.NoError(t, err)
}
+func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) {
+
+ c, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ MemoryLimitBytes: 100 * 6,
+ })
+ assert.NoError(t, err)
+ defer c.Close()
+
+ schema := NewAvroSchema(`{"fields":
+ [
+ {"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]}
+ ],
+ "name":"MyAvro","namespace":"schemaNotFoundTestCase","type":"record"}`, nil)
+
+ topicName := newTopicName()
+ producer1, _ := c.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ DisableBlockIfQueueFull: true,
+ DisableBatching: false,
+ BatchingMaxPublishDelay: 100 * time.Second,
+ SendTimeout: 2 * time.Second,
+ })
+
+ producer2, _ := c.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ DisableBlockIfQueueFull: true,
+ DisableBatching: false,
+ BatchingMaxPublishDelay: 100 * time.Second,
+ SendTimeout: 2 * time.Second,
+ })
+
+ // the size of encoded value is 6 bytes
+ value := map[string]interface{}{
+ "id": 0,
+ "name": map[string]interface{}{
+ "string": "abc",
+ },
+ }
+
+ n := 101
+ for i := 0; i < n/2; i++ {
+ producer1.SendAsync(context.Background(), &ProducerMessage{
+ Value: value,
+ Schema: schema,
+ }, func(id MessageID, message *ProducerMessage, e error) {})
+
+ producer2.SendAsync(context.Background(), &ProducerMessage{
+ Value: value,
+ Schema: schema,
+ }, func(id MessageID, message *ProducerMessage, e error) {})
+ }
+ // Last message in order to reach the limit
+ producer1.SendAsync(context.Background(), &ProducerMessage{
+ Value: value,
+ Schema: schema,
+ }, func(id MessageID, message *ProducerMessage, e error) {})
+ time.Sleep(100 * time.Millisecond)
+ assert.Equal(t, int64(n*6), c.(*client).memLimit.CurrentUsage())
+
+ _, err = producer1.Send(context.Background(), &ProducerMessage{
+ Value: value,
+ Schema: schema,
+ })
+ assert.Error(t, err)
+ assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+ _, err = producer2.Send(context.Background(), &ProducerMessage{
+ Value: value,
+ Schema: schema,
+ })
+ assert.Error(t, err)
+ assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+ // flush pending msg
+ err = producer1.Flush()
+ assert.NoError(t, err)
+ err = producer2.Flush()
+ assert.NoError(t, err)
+ assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())
+
+ _, err = producer1.Send(context.Background(), &ProducerMessage{
+ Value: value,
+ Schema: schema,
+ })
+ assert.NoError(t, err)
+ _, err = producer2.Send(context.Background(), &ProducerMessage{
+ Value: value,
+ Schema: schema,
+ })
+ assert.NoError(t, err)
+}
+
+func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {
+
+ c, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ MemoryLimitBytes: 5 * 1024,
+ })
+ assert.NoError(t, err)
+ defer c.Close()
+
+ topicName := newTopicName()
+ producer1, _ := c.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ DisableBlockIfQueueFull: true,
+ DisableBatching: true,
+ EnableChunking: true,
+ SendTimeout: 2 * time.Second,
+ })
+
+ producer2, _ := c.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ DisableBlockIfQueueFull: true,
+ DisableBatching: false,
+ BatchingMaxPublishDelay: 100 * time.Millisecond,
+ SendTimeout: 2 * time.Second,
+ })
+
+ producer2.SendAsync(context.Background(), &ProducerMessage{
+ Payload: make([]byte, 5*1024+1),
+ }, func(id MessageID, message *ProducerMessage, e error) {
+ if e != nil {
+ t.Fatal(e)
+ }
+ })
+
+ time.Sleep(50 * time.Millisecond)
+ assert.Equal(t, int64(5*1024+1), c.(*client).memLimit.CurrentUsage())
+
+ _, err = producer1.Send(context.Background(), &ProducerMessage{
+ Payload: make([]byte, 1),
+ })
+ assert.Error(t, err)
+ assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
+
+ // wait all the mem have been released
+ retryAssert(t, 10, 200, func() {}, func(t assert.TestingT) bool {
+ return assert.Equal(t, 0, int(c.(*client).memLimit.CurrentUsage()))
+ })
+
+ producer3, _ := c.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ DisableBlockIfQueueFull: true,
+ DisableBatching: true,
+ EnableChunking: true,
+ MaxPendingMessages: 1,
+ ChunkMaxMessageSize: 1024,
+ SendTimeout: 2 * time.Second,
+ })
+
+ // producer3 will reserve 2*1024 bytes and then release 1024 bytes (the second chunk)
+ // because it reaches MaxPendingMessages in chunking
+ _, _ = producer3.Send(context.Background(), &ProducerMessage{
+ Payload: make([]byte, 2*1024),
+ })
+ assert.Equal(t, int64(1024), c.(*client).memLimit.CurrentUsage())
+}
+
func TestMemLimitContextCancel(t *testing.T) {
c, err := NewClient(ClientOptions{