This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c3922b8  [issue 513] fix batch size limit validation (#528)
c3922b8 is described below

commit c3922b82cd0e58b69670a28a96abdf88f61e0f91
Author: Ming <[email protected]>
AuthorDate: Mon Oct 25 07:56:54 2021 -0400

    [issue 513] fix batch size limit validation (#528)
    
    * fix batch size limit validation
    
    * respect either of one batch size and number of messages limit
    
    * correct hasSpace logic
    
    * fix key-based batch builder
---
 pulsar/internal/batch_builder.go           | 4 ++--
 pulsar/internal/key_based_batch_builder.go | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 92d6249..d08af53 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -157,7 +157,7 @@ func (bc *batchContainer) IsFull() bool {
 
 func (bc *batchContainer) hasSpace(payload []byte) bool {
        msgSize := uint32(len(payload))
-       return bc.numMessages > 0 && (bc.buffer.ReadableBytes()+msgSize) > 
uint32(bc.maxBatchSize)
+       return bc.numMessages+1 < bc.maxMessages || 
(bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize)
 }
 
 // Add will add single message to batch.
@@ -174,7 +174,7 @@ func (bc *batchContainer) Add(
                // There's already a message with cluster replication list. 
need to flush before next
                // message can be sent
                return false
-       } else if bc.hasSpace(payload) {
+       } else if !bc.hasSpace(payload) {
                // The current batch is full. Producer has to call Flush() to
                return false
        }
diff --git a/pulsar/internal/key_based_batch_builder.go 
b/pulsar/internal/key_based_batch_builder.go
index 940aa9f..24d564b 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -117,7 +117,7 @@ func (bc *keyBasedBatchContainer) IsMultiBatches() bool {
 
 func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool {
        msgSize := uint32(len(payload))
-       return bc.numMessages > 0 && (bc.buffer.ReadableBytes()+msgSize) > 
uint32(bc.maxBatchSize)
+       return bc.numMessages+1 < bc.maxMessages || 
(bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize)
 }
 
 // Add will add single message to key-based batch with message key.
@@ -134,7 +134,7 @@ func (bc *keyBasedBatchContainer) Add(
                // There's already a message with cluster replication list. 
need to flush before next
                // message can be sent
                return false
-       } else if bc.hasSpace(payload) {
+       } else if !bc.hasSpace(payload) {
                // The current batch is full. Producer has to call Flush() to
                return false
        }

Reply via email to