This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c3922b8 [issue 513] fix batch size limit validation (#528)
c3922b8 is described below
commit c3922b82cd0e58b69670a28a96abdf88f61e0f91
Author: Ming <[email protected]>
AuthorDate: Mon Oct 25 07:56:54 2021 -0400
[issue 513] fix batch size limit validation (#528)
* fix batch size limit validation
* respect either of one batch size and number of messages limit
* correct hasSpace logic
* fix key-based batch builder
---
pulsar/internal/batch_builder.go | 4 ++--
pulsar/internal/key_based_batch_builder.go | 4 ++--
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 92d6249..d08af53 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -157,7 +157,7 @@ func (bc *batchContainer) IsFull() bool {
func (bc *batchContainer) hasSpace(payload []byte) bool {
msgSize := uint32(len(payload))
- return bc.numMessages > 0 && (bc.buffer.ReadableBytes()+msgSize) >
uint32(bc.maxBatchSize)
+ return bc.numMessages+1 < bc.maxMessages ||
(bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize)
}
// Add will add single message to batch.
@@ -174,7 +174,7 @@ func (bc *batchContainer) Add(
// There's already a message with cluster replication list.
need to flush before next
// message can be sent
return false
- } else if bc.hasSpace(payload) {
+ } else if !bc.hasSpace(payload) {
// The current batch is full. Producer has to call Flush() to
return false
}
diff --git a/pulsar/internal/key_based_batch_builder.go
b/pulsar/internal/key_based_batch_builder.go
index 940aa9f..24d564b 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -117,7 +117,7 @@ func (bc *keyBasedBatchContainer) IsMultiBatches() bool {
func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool {
msgSize := uint32(len(payload))
- return bc.numMessages > 0 && (bc.buffer.ReadableBytes()+msgSize) >
uint32(bc.maxBatchSize)
+ return bc.numMessages+1 < bc.maxMessages ||
(bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize)
}
// Add will add single message to key-based batch with message key.
@@ -134,7 +134,7 @@ func (bc *keyBasedBatchContainer) Add(
// There's already a message with cluster replication list.
need to flush before next
// message can be sent
return false
- } else if bc.hasSpace(payload) {
+ } else if !bc.hasSpace(payload) {
// The current batch is full. Producer has to call Flush() to
return false
}