This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 13843e9 [perf] Support batch index ACK and max number of messages in
batch (#967)
13843e9 is described below
commit 13843e99201141257904a5157600079752665774
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Mar 1 10:52:43 2023 +0800
[perf] Support batch index ACK and max number of messages in batch (#967)
### Motivation
Currently, the perf tool does not support configuring the maximum number of
messages in a batch for the producer, or batch index ACK for the consumer.
### Modifications
- Add the `--batching-num-messages` option to configure the maximum number
of messages in a batch for the producer
- To keep the code style consistent, change the `--batching-max-size`
config from `int` to `uint`
- Add the `--enable-batch-index-ack` option to enable batch index ACK
for consumer.
---
perf/perf-consumer.go | 13 ++++++++-----
perf/perf-producer.go | 20 ++++++++++++--------
2 files changed, 20 insertions(+), 13 deletions(-)
diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index 7fb8aab..6b6e411 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -31,9 +31,10 @@ import (
// ConsumeArgs define the parameters required by consume
type ConsumeArgs struct {
- Topic string
- SubscriptionName string
- ReceiverQueueSize int
+ Topic string
+ SubscriptionName string
+ ReceiverQueueSize int
+ EnableBatchIndexAck bool
}
func newConsumerCommand() *cobra.Command {
@@ -55,6 +56,7 @@ func newConsumerCommand() *cobra.Command {
flags := cmd.Flags()
flags.StringVarP(&consumeArgs.SubscriptionName, "subscription", "s",
"sub", "Subscription name")
flags.IntVarP(&consumeArgs.ReceiverQueueSize, "receiver-queue-size",
"r", 1000, "Receiver queue size")
+ flags.BoolVar(&consumeArgs.EnableBatchIndexAck,
"enable-batch-index-ack", false, "Whether to enable batch index ACK")
return cmd
}
@@ -74,8 +76,9 @@ func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) {
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
- Topic: consumeArgs.Topic,
- SubscriptionName: consumeArgs.SubscriptionName,
+ Topic: consumeArgs.Topic,
+ SubscriptionName: consumeArgs.SubscriptionName,
+ EnableBatchIndexAcknowledgment: consumeArgs.EnableBatchIndexAck,
})
if err != nil {
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index 0ee0083..9d062ae 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -32,12 +32,13 @@ import (
// ProduceArgs define the parameters required by produce
type ProduceArgs struct {
- Topic string
- Rate int
- BatchingTimeMillis int
- BatchingMaxSize int
- MessageSize int
- ProducerQueueSize int
+ Topic string
+ Rate int
+ BatchingTimeMillis int
+ BatchingMaxSize uint
+ BatchingNumMessages uint
+ MessageSize int
+ ProducerQueueSize int
}
func newProducerCommand() *cobra.Command {
@@ -62,8 +63,10 @@ func newProducerCommand() *cobra.Command {
"Publish rate. Set to 0 to go unthrottled")
flags.IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1,
"Batching grouping time in millis")
- flags.IntVarP(&produceArgs.BatchingMaxSize, "batching-max-size", "",
128,
+ flags.UintVar(&produceArgs.BatchingMaxSize, "batching-max-size", 128,
"Max size of a batch (in KB)")
+ flags.UintVar(&produceArgs.BatchingNumMessages,
"batching-num-messages", 1000,
+ "Maximum number of messages permitted in a batch")
flags.IntVarP(&produceArgs.MessageSize, "size", "s", 1024,
"Message size")
flags.IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000,
@@ -88,7 +91,8 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
Topic: produceArgs.Topic,
MaxPendingMessages: produceArgs.ProducerQueueSize,
BatchingMaxPublishDelay: time.Millisecond *
time.Duration(produceArgs.BatchingTimeMillis),
- BatchingMaxSize: uint(produceArgs.BatchingMaxSize *
1024),
+ BatchingMaxSize: produceArgs.BatchingMaxSize * 1024,
+ BatchingMaxMessages: produceArgs.BatchingNumMessages,
})
if err != nil {
log.Fatal(err)