This is an automated email from the ASF dual-hosted git repository.

rfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c558ddf  [issue 791] allow to add at least one message on batch builder (#792)
c558ddf is described below

commit c558ddfa10e04b88d33bc6daacc9aa067d9f28dc
Author: ming <[email protected]>
AuthorDate: Wed Jun 22 20:51:14 2022 -0400

    [issue 791] allow to add at least one message on batch builder (#792)
    
    * allow to add at least one message on batch builder
    
    * add unit test for batching disabled
---
 pulsar/internal/batch_builder.go           |  4 ++++
 pulsar/internal/key_based_batch_builder.go |  5 ++++
 pulsar/producer_test.go                    | 38 ++++++++++++++++++++++++------
 3 files changed, 40 insertions(+), 7 deletions(-)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 9d18f26..fb7598e 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -157,6 +157,10 @@ func (bc *batchContainer) IsFull() bool {
 
 // hasSpace should return true if and only if the batch container can accommodate another message of length payload.
 func (bc *batchContainer) hasSpace(payload []byte) bool {
+       if bc.numMessages == 0 {
+               // allow to add at least one message when batching is disabled
+               return true
+       }
        msgSize := uint32(len(payload))
        return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
 }
diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go
index d09138c..667e855 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -117,6 +117,11 @@ func (bc *keyBasedBatchContainer) IsMultiBatches() bool {
 
 // hasSpace should return true if and only if the batch container can accommodate another message of length payload.
 func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool {
+       if bc.numMessages == 0 {
+               // allow to add at least one message
+               // and a single max message size is checked in the producer partition, therefore no need to validate batch size
+               return true
+       }
        msgSize := uint32(len(payload))
        return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
 }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 541c1fe..6b2b5d9 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -877,20 +877,44 @@ func TestMaxBatchSize(t *testing.T) {
        assert.NotNil(t, producer)
        defer producer.Close()
 
-       for bias := -1; bias <= 1; bias++ {
+       for bias := -1; bias <= 3; bias++ {
                payload := make([]byte, batchMaxMessageSize+bias)
                ID, err := producer.Send(context.Background(), &ProducerMessage{
                        Payload: payload,
                })
-               if bias <= 0 {
-                       assert.NoError(t, err)
-                       assert.NotNil(t, ID)
-               } else {
-                       assert.Equal(t, errFailAddToBatch, err)
-               }
+               // regardless max batch size, if the batch size limit is reached, batching is triggered to send messages
+               assert.NoError(t, err)
+               assert.NotNil(t, ID)
        }
 }
 
+func TestBatchingDisabled(t *testing.T) {
+       defaultMaxMessageSize := 128 * 1024
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       // when batching is disabled, the batching size has no effect
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           newTopicName(),
+               DisableBatching: true,
+               BatchingMaxSize: uint(defaultMaxBatchSize),
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       payload := make([]byte, defaultMaxMessageSize+100)
+       ID, err := producer.Send(context.Background(), &ProducerMessage{
+               Payload: payload,
+       })
+       // regardless max batch size, if the batch size limit is reached, batching is triggered to send messages
+       assert.NoError(t, err)
+       assert.NotNil(t, ID)
+}
+
 func TestMaxMessageSize(t *testing.T) {
        serverMaxMessageSize := 1024 * 1024
 

Reply via email to