This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a8e7f3  [issue #752] replace go-rate rate limiter with a buffered 
channel implementation (#799)
6a8e7f3 is described below

commit 6a8e7f39aac100a285a2c190186e38b73a5c9d34
Author: ming <[email protected]>
AuthorDate: Wed Jul 13 14:31:57 2022 -0400

    [issue #752] replace go-rate rate limiter with a buffered channel 
implementation (#799)
    
    * replace go-rate rate limiter with channel implementation
    
    * fix linter
    
    * update based on review comments
    
    * stop ratelimiter goroutine if the rate is unthrottled
---
 go.mod                |  1 -
 go.sum                |  2 --
 perf/perf-producer.go | 23 +++++++++++++----------
 3 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/go.mod b/go.mod
index ec6d810..5c2c33a 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,6 @@ require (
        github.com/AthenZ/athenz v1.10.39
        github.com/DataDog/zstd v1.5.0
        github.com/apache/pulsar-client-go/oauth2 
v0.0.0-20220120090717-25e59572242e
-       github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0
        github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
        github.com/davecgh/go-spew v1.1.1
        github.com/gogo/protobuf v1.3.2
diff --git a/go.sum b/go.sum
index 4eb4433..9f2ac9c 100644
--- a/go.sum
+++ b/go.sum
@@ -60,8 +60,6 @@ github.com/armon/circbuf 
v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC
 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod 
h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
 github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod 
h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
 github.com/aws/aws-sdk-go v1.32.6/go.mod 
h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
-github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0 
h1:0b2vaepXIfMsG++IsjHiI2p4bxALD1Y2nQKGMR5zDQM=
-github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0/go.mod 
h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod 
h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod 
h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index 3ffa7c0..0ee0083 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -22,7 +22,6 @@ import (
        "encoding/json"
        "time"
 
-       "github.com/beefsack/go-rate"
        "github.com/bmizerany/perks/quantile"
        "github.com/spf13/cobra"
 
@@ -101,13 +100,18 @@ func produce(produceArgs *ProduceArgs, stop <-chan 
struct{}) {
        payload := make([]byte, produceArgs.MessageSize)
 
        ch := make(chan float64)
-
-       go func(stopCh <-chan struct{}) {
-               var rateLimiter *rate.RateLimiter
-               if produceArgs.Rate > 0 {
-                       rateLimiter = rate.New(produceArgs.Rate, time.Second)
+       rateLimitCh := make(chan time.Time, produceArgs.Rate)
+       go func(rateLimit int, interval time.Duration) {
+               if rateLimit <= 0 { // 0 as no limit enforced
+                       return
+               }
+               for {
+                       oldest := <-rateLimitCh
+                       time.Sleep(interval - time.Since(oldest))
                }
+       }(produceArgs.Rate, time.Second)
 
+       go func(stopCh <-chan struct{}) {
                for {
                        select {
                        case <-stopCh:
@@ -115,11 +119,10 @@ func produce(produceArgs *ProduceArgs, stop <-chan 
struct{}) {
                        default:
                        }
 
-                       if rateLimiter != nil {
-                               rateLimiter.Wait()
-                       }
-
                        start := time.Now()
+                       if produceArgs.Rate > 0 {
+                               rateLimitCh <- start
+                       }
 
                        producer.SendAsync(ctx, &pulsar.ProducerMessage{
                                Payload: payload,

Reply via email to