This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 62203d7  Expose BatchingMaxSize from ProducerOptions (#280)
62203d7 is described below

commit 62203d7bf6e0ac82ffff0d4fed66520094b66406
Author: dferstay <[email protected]>
AuthorDate: Sat Jun 13 08:40:15 2020 -0700

    Expose BatchingMaxSize from ProducerOptions (#280)
    
    Previously, the producer maximum batch size was hard-coded to 128 KB.
    
    Now, the producer maximum batch size is exposed via ProducerOptions
    and defaults to 128 KB.
    
    Co-authored-by: Daniel Ferstay <[email protected]>
---
 pulsar/internal/batch_builder.go | 22 +++++++++++++++-------
 pulsar/producer.go               |  7 ++++++-
 pulsar/producer_partition.go     |  4 ++--
 pulsar/producer_test.go          |  2 +-
 4 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 3b54eba..35cdf71 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -29,10 +29,9 @@ import (
 )
 
 const (
-       // MaxBatchSize will be the largest size for a batch sent from this 
particular producer.
-       // This is used as a baseline to allocate a new buffer that can hold 
the entire batch
-       // without needing costly re-allocations.
-       MaxBatchSize = 128 * 1024
+       // DefaultMaxBatchSize init default for maximum number of bytes per 
batch
+       DefaultMaxBatchSize = 128 * 1024
+
        // DefaultMaxMessagesPerBatch init default num of entries in per batch.
        DefaultMaxMessagesPerBatch = 1000
 )
@@ -47,6 +46,11 @@ type BatchBuilder struct {
        // Max number of message allowed in the batch
        maxMessages uint
 
+       // The largest size for a batch sent from this particular producer.
+       // This is used as a baseline to allocate a new buffer that can hold 
the entire batch
+       // without needing costly re-allocations.
+       maxBatchSize uint
+
        producerName string
        producerID   uint64
 
@@ -58,15 +62,19 @@ type BatchBuilder struct {
 }
 
 // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a 
new batch message container.
-func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64,
+func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, 
producerID uint64,
        compressionType pb.CompressionType) (*BatchBuilder, error) {
        if maxMessages == 0 {
                maxMessages = DefaultMaxMessagesPerBatch
        }
+       if maxBatchSize == 0 {
+               maxBatchSize = DefaultMaxBatchSize
+       }
        bb := &BatchBuilder{
                buffer:       NewBuffer(4096),
                numMessages:  0,
                maxMessages:  maxMessages,
+               maxBatchSize: maxBatchSize,
                producerName: producerName,
                producerID:   producerID,
                cmdSend: baseCommand(pb.BaseCommand_SEND,
@@ -93,12 +101,12 @@ func NewBatchBuilder(maxMessages uint, producerName 
string, producerID uint64,
 
 // IsFull check if the size in the current batch exceeds the maximum size 
allowed by the batch
 func (bb *BatchBuilder) IsFull() bool {
-       return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > 
MaxBatchSize
+       return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > 
uint32(bb.maxBatchSize)
 }
 
 func (bb *BatchBuilder) hasSpace(payload []byte) bool {
        msgSize := uint32(len(payload))
-       return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > 
MaxBatchSize
+       return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > 
uint32(bb.maxBatchSize)
 }
 
 // Add will add single message to batch.
diff --git a/pulsar/producer.go b/pulsar/producer.go
index a951426..2d630f1 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -109,8 +109,13 @@ type ProducerOptions struct {
 
        // BatchingMaxMessages set the maximum number of messages permitted in 
a batch. (default: 1000)
        // If set to a value greater than 1, messages will be queued until this 
threshold is reached or
-       // batch interval has elapsed.
+       // BatchingMaxSize (see below) has been reached or the batch interval 
has elapsed.
        BatchingMaxMessages uint
+
+       // BatchingMaxSize sets the maximum number of bytes permitted in a 
batch. (default 128 KB)
+       // If set to a value greater than 1, messages will be queued until this 
threshold is reached or
+       // BatchingMaxMessages (see above) has been reached or the batch 
interval has elapsed.
+       BatchingMaxSize uint
 }
 
 // Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 2eb3696..0f3d198 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -151,8 +151,8 @@ func (p *partitionProducer) grabCnx() error {
 
        p.producerName = res.Response.ProducerSuccess.GetProducerName()
        if p.batchBuilder == nil {
-               p.batchBuilder, err = 
internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.producerName,
-                       p.producerID, 
pb.CompressionType(p.options.CompressionType))
+               p.batchBuilder, err = 
internal.NewBatchBuilder(p.options.BatchingMaxMessages, 
p.options.BatchingMaxSize,
+                       p.producerName, p.producerID, 
pb.CompressionType(p.options.CompressionType))
                if err != nil {
                        return err
                }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 9f76873..2c028c2 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -661,7 +661,7 @@ func TestBatchMessageFlushing(t *testing.T) {
        }
        defer producer.Close()
 
-       maxBytes := internal.MaxBatchSize
+       maxBytes := internal.DefaultMaxBatchSize
        genbytes := func(n int) []byte {
                c := []byte("a")[0]
                bytes := make([]byte, n)

Reply via email to