pgier commented on code in PR #799:
URL: https://github.com/apache/pulsar-client-go/pull/799#discussion_r912050134
##########
perf/perf-producer.go:
##########
@@ -101,25 +100,26 @@ func produce(produceArgs *ProduceArgs, stop <-chan
struct{}) {
payload := make([]byte, produceArgs.MessageSize)
ch := make(chan float64)
-
- go func(stopCh <-chan struct{}) {
- var rateLimiter *rate.RateLimiter
- if produceArgs.Rate > 0 {
- rateLimiter = rate.New(produceArgs.Rate, time.Second)
+ rateLimitCh := make(chan time.Time, produceArgs.Rate)
Review Comment:
Since the buffer size here is proportional to the rate, should there be a
limit on the max size? Is there a way to do this with a fixed buffer size?
##########
perf/perf-producer.go:
##########
@@ -101,25 +100,26 @@ func produce(produceArgs *ProduceArgs, stop <-chan
struct{}) {
payload := make([]byte, produceArgs.MessageSize)
ch := make(chan float64)
-
- go func(stopCh <-chan struct{}) {
- var rateLimiter *rate.RateLimiter
- if produceArgs.Rate > 0 {
- rateLimiter = rate.New(produceArgs.Rate, time.Second)
+ rateLimitCh := make(chan time.Time, produceArgs.Rate)
+ go func(rateLimit int, interval time.Duration) {
+ for {
+ last := <-rateLimitCh
+ if rateLimit > 0 { // 0 is defined as no limit enforced
+ time.Sleep(interval - time.Since(last))
+ }
}
+ }(produceArgs.Rate, time.Second)
+ go func(stopCh <-chan struct{}) {
for {
select {
case <-stopCh:
return
default:
}
- if rateLimiter != nil {
- rateLimiter.Wait()
- }
-
start := time.Now()
+ rateLimitCh <- start
Review Comment:
I think this send on `rateLimitCh` should be skipped completely if no rate limit is set.
##########
perf/perf-producer.go:
##########
@@ -101,25 +100,26 @@ func produce(produceArgs *ProduceArgs, stop <-chan
struct{}) {
payload := make([]byte, produceArgs.MessageSize)
ch := make(chan float64)
-
- go func(stopCh <-chan struct{}) {
- var rateLimiter *rate.RateLimiter
- if produceArgs.Rate > 0 {
- rateLimiter = rate.New(produceArgs.Rate, time.Second)
+ rateLimitCh := make(chan time.Time, produceArgs.Rate)
+ go func(rateLimit int, interval time.Duration) {
+ for {
+ last := <-rateLimitCh
Review Comment:
`last` makes me think this is coming from the back of the queue; maybe
rename `last` to `oldest`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]