This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 13843e9  [perf] Support batch index ACK and max number of messages in 
batch (#967)
13843e9 is described below

commit 13843e99201141257904a5157600079752665774
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Mar 1 10:52:43 2023 +0800

    [perf] Support batch index ACK and max number of messages in batch (#967)
    
    ### Motivation
    
    Currently, the perf tool cannot configure the maximum number of messages
    in a batch for the producer, or batch index ACK for the consumer.
    
    ### Modifications
    
    - Add the `--batching-num-messages` option to configure the maximum
      number of messages in a batch for the producer
    - To keep the code style consistent, change the `--batching-max-size`
      config from `int` to `uint`
    - Add the `--enable-batch-index-ack` option to enable batch index ACK
      for consumer.
---
 perf/perf-consumer.go | 13 ++++++++-----
 perf/perf-producer.go | 20 ++++++++++++--------
 2 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index 7fb8aab..6b6e411 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -31,9 +31,10 @@ import (
 
 // ConsumeArgs define the parameters required by consume
 type ConsumeArgs struct {
-       Topic             string
-       SubscriptionName  string
-       ReceiverQueueSize int
+       Topic               string
+       SubscriptionName    string
+       ReceiverQueueSize   int
+       EnableBatchIndexAck bool
 }
 
 func newConsumerCommand() *cobra.Command {
@@ -55,6 +56,7 @@ func newConsumerCommand() *cobra.Command {
        flags := cmd.Flags()
        flags.StringVarP(&consumeArgs.SubscriptionName, "subscription", "s", 
"sub", "Subscription name")
        flags.IntVarP(&consumeArgs.ReceiverQueueSize, "receiver-queue-size", 
"r", 1000, "Receiver queue size")
+       flags.BoolVar(&consumeArgs.EnableBatchIndexAck, 
"enable-batch-index-ack", false, "Whether to enable batch index ACK")
 
        return cmd
 }
@@ -74,8 +76,9 @@ func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) {
        defer client.Close()
 
        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
-               Topic:            consumeArgs.Topic,
-               SubscriptionName: consumeArgs.SubscriptionName,
+               Topic:                          consumeArgs.Topic,
+               SubscriptionName:               consumeArgs.SubscriptionName,
+               EnableBatchIndexAcknowledgment: consumeArgs.EnableBatchIndexAck,
        })
 
        if err != nil {
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index 0ee0083..9d062ae 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -32,12 +32,13 @@ import (
 
 // ProduceArgs define the parameters required by produce
 type ProduceArgs struct {
-       Topic              string
-       Rate               int
-       BatchingTimeMillis int
-       BatchingMaxSize    int
-       MessageSize        int
-       ProducerQueueSize  int
+       Topic               string
+       Rate                int
+       BatchingTimeMillis  int
+       BatchingMaxSize     uint
+       BatchingNumMessages uint
+       MessageSize         int
+       ProducerQueueSize   int
 }
 
 func newProducerCommand() *cobra.Command {
@@ -62,8 +63,10 @@ func newProducerCommand() *cobra.Command {
                "Publish rate. Set to 0 to go unthrottled")
        flags.IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1,
                "Batching grouping time in millis")
-       flags.IntVarP(&produceArgs.BatchingMaxSize, "batching-max-size", "", 
128,
+       flags.UintVar(&produceArgs.BatchingMaxSize, "batching-max-size", 128,
                "Max size of a batch (in KB)")
+       flags.UintVar(&produceArgs.BatchingNumMessages, 
"batching-num-messages", 1000,
+               "Maximum number of messages permitted in a batch")
        flags.IntVarP(&produceArgs.MessageSize, "size", "s", 1024,
                "Message size")
        flags.IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000,
@@ -88,7 +91,8 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
                Topic:                   produceArgs.Topic,
                MaxPendingMessages:      produceArgs.ProducerQueueSize,
                BatchingMaxPublishDelay: time.Millisecond * 
time.Duration(produceArgs.BatchingTimeMillis),
-               BatchingMaxSize:         uint(produceArgs.BatchingMaxSize * 
1024),
+               BatchingMaxSize:         produceArgs.BatchingMaxSize * 1024,
+               BatchingMaxMessages:     produceArgs.BatchingNumMessages,
        })
        if err != nil {
                log.Fatal(err)

Reply via email to