This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 576e61b8 golang: Added rate limiting for SimpleConsumer's Receive 
requests by clientID dimension (#1141)
576e61b8 is described below

commit 576e61b8624b2dc09f1112649ff12dd44ff0d6d2
Author: guyinyou <[email protected]>
AuthorDate: Wed Dec 17 10:25:45 2025 +0800

    golang: Added rate limiting for SimpleConsumer's Receive requests by 
clientID dimension (#1141)
    
    * golang: Added rate limiting for SimpleConsumer's Receive requests by 
clientID dimension
    
    Change-Id: I0968728e0fa1532c264b786c5f7e5f4234dcc017
    
    * default maxReceiveConcurrency set 20
    
    Change-Id: I27a15654b51cede7453ea99d058a613cd4729600
    
    ---------
    
    Co-authored-by: guyinyou <[email protected]>
---
 golang/receive_rate_limiter.go      |  81 +++++++++++
 golang/receive_rate_limiter_test.go | 278 ++++++++++++++++++++++++++++++++++++
 golang/simple_consumer.go           |   9 ++
 golang/simple_consumer_options.go   |  12 +-
 4 files changed, 379 insertions(+), 1 deletion(-)

diff --git a/golang/receive_rate_limiter.go b/golang/receive_rate_limiter.go
new file mode 100644
index 00000000..a7cb1c37
--- /dev/null
+++ b/golang/receive_rate_limiter.go
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package golang
+
+import (
+       "context"
+       "sync"
+)
+
+type receiveRateLimiter struct {
+       mu             sync.Mutex
+       cond           *sync.Cond
+       maxConcurrency int
+       currentCount   int
+}
+
+func newReceiveRateLimiter(maxConcurrency int) *receiveRateLimiter {
+       if maxConcurrency <= 0 {
+               maxConcurrency = 10 // default 10 concurrent requests
+       }
+       rl := &receiveRateLimiter{
+               maxConcurrency: maxConcurrency,
+       }
+       rl.cond = sync.NewCond(&rl.mu)
+       return rl
+}
+
+func (rl *receiveRateLimiter) acquire(ctx context.Context) error {
+       rl.mu.Lock()
+       defer rl.mu.Unlock()
+
+       for rl.currentCount >= rl.maxConcurrency {
+               if ctx.Err() != nil {
+                       return ctx.Err()
+               }
+
+               waitDone := make(chan struct{})
+               go func() {
+                       select {
+                       case <-ctx.Done():
+                               rl.cond.Broadcast()
+                       case <-waitDone:
+                       }
+               }()
+
+               rl.cond.Wait()
+               close(waitDone)
+
+               if ctx.Err() != nil {
+                       return ctx.Err()
+               }
+       }
+
+       rl.currentCount++
+       return nil
+}
+
+func (rl *receiveRateLimiter) release() {
+       rl.mu.Lock()
+       defer rl.mu.Unlock()
+
+       if rl.currentCount > 0 {
+               rl.currentCount--
+               rl.cond.Signal()
+       }
+}
diff --git a/golang/receive_rate_limiter_test.go 
b/golang/receive_rate_limiter_test.go
new file mode 100644
index 00000000..0fa8248b
--- /dev/null
+++ b/golang/receive_rate_limiter_test.go
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package golang
+
+import (
+       "context"
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestReceiveRateLimiter_BasicLimit(t *testing.T) {
+       maxConcurrency := 3
+       limiter := newReceiveRateLimiter(maxConcurrency)
+
+       // Test basic limit: should be able to acquire maxConcurrency permits
+       ctx := context.Background()
+       for i := 0; i < maxConcurrency; i++ {
+               err := limiter.acquire(ctx)
+               assert.NoError(t, err, "should be able to acquire permit")
+       }
+
+       // Try to acquire the (maxConcurrency+1)th permit, should be blocked
+       acquired := int32(0)
+       go func() {
+               err := limiter.acquire(ctx)
+               if err == nil {
+                       atomic.StoreInt32(&acquired, 1)
+                       limiter.release()
+               }
+       }()
+
+       // Wait a short time to confirm it was not immediately acquired
+       time.Sleep(100 * time.Millisecond)
+       assert.Equal(t, int32(0), atomic.LoadInt32(&acquired), "should not 
acquire permit immediately")
+
+       // Release one permit, should wake up the waiting goroutine
+       limiter.release()
+       time.Sleep(100 * time.Millisecond)
+       assert.Equal(t, int32(1), atomic.LoadInt32(&acquired), "should acquire 
permit after release")
+}
+
+func TestReceiveRateLimiter_ConcurrentLimit(t *testing.T) {
+       maxConcurrency := 5
+       limiter := newReceiveRateLimiter(maxConcurrency)
+       ctx := context.Background()
+
+       // Start many goroutines to request permits concurrently
+       totalGoroutines := 20
+       acquiredCount := int32(0)
+       activeCount := int32(0)
+       maxActive := int32(0)
+       var wg sync.WaitGroup
+       var mu sync.Mutex
+
+       for i := 0; i < totalGoroutines; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       err := limiter.acquire(ctx)
+                       if err != nil {
+                               return
+                       }
+
+                       // Record current active count
+                       current := atomic.AddInt32(&activeCount, 1)
+                       mu.Lock()
+                       if current > maxActive {
+                               maxActive = current
+                       }
+                       mu.Unlock()
+
+                       // Simulate some work
+                       time.Sleep(50 * time.Millisecond)
+
+                       atomic.AddInt32(&acquiredCount, 1)
+                       atomic.AddInt32(&activeCount, -1)
+                       limiter.release()
+               }()
+       }
+
+       wg.Wait()
+
+       // Verify: all goroutines should successfully acquire permits
+       assert.Equal(t, int32(totalGoroutines), acquiredCount, "all goroutines 
should successfully acquire permits")
+
+       // Verify: concurrent active count should not exceed maxConcurrency
+       assert.LessOrEqual(t, maxActive, int32(maxConcurrency), "concurrent 
active count should not exceed limit")
+}
+
+func TestReceiveRateLimiter_ContextCancel(t *testing.T) {
+       maxConcurrency := 2
+       limiter := newReceiveRateLimiter(maxConcurrency)
+
+       // Acquire all permits first
+       ctx := context.Background()
+       for i := 0; i < maxConcurrency; i++ {
+               err := limiter.acquire(ctx)
+               assert.NoError(t, err)
+       }
+
+       // Create a context that will be cancelled
+       cancelCtx, cancel := context.WithCancel(context.Background())
+
+       // Try to acquire permit, should be blocked
+       acquired := int32(0)
+       errChan := make(chan error, 1)
+       go func() {
+               err := limiter.acquire(cancelCtx)
+               errChan <- err
+               if err == nil {
+                       atomic.StoreInt32(&acquired, 1)
+                       limiter.release()
+               }
+       }()
+
+       // Wait a short time to confirm it was blocked
+       time.Sleep(100 * time.Millisecond)
+       assert.Equal(t, int32(0), atomic.LoadInt32(&acquired), "should be 
blocked")
+
+       // Cancel context
+       cancel()
+
+       // Wait for goroutine to return
+       select {
+       case err := <-errChan:
+               assert.Error(t, err, "should return context cancellation error")
+               assert.Equal(t, context.Canceled, err, "error should be 
context.Canceled")
+       case <-time.After(1 * time.Second):
+               t.Fatal("should return immediately after context cancellation")
+       }
+
+       // Release previous permits
+       limiter.release()
+       limiter.release()
+}
+
+func TestReceiveRateLimiter_ReleaseWithoutAcquire(t *testing.T) {
+       limiter := newReceiveRateLimiter(5)
+       ctx := context.Background()
+
+       // Release without acquiring, should not panic
+       assert.NotPanics(t, func() {
+               limiter.release()
+       })
+
+       // Should be able to acquire permit normally after release
+       err := limiter.acquire(ctx)
+       assert.NoError(t, err, "should be able to acquire permit normally")
+       limiter.release()
+}
+
+func TestReceiveRateLimiter_MultipleRelease(t *testing.T) {
+       maxConcurrency := 3
+       limiter := newReceiveRateLimiter(maxConcurrency)
+       ctx := context.Background()
+
+       // Acquire one permit
+       err := limiter.acquire(ctx)
+       assert.NoError(t, err)
+
+       // Release multiple times, should not panic
+       limiter.release()
+       assert.NotPanics(t, func() {
+               limiter.release()
+               limiter.release()
+       })
+
+       // Should be able to acquire maxConcurrency permits (because the extra 
releases were ignored and the count stayed at 0)
+       for i := 0; i < maxConcurrency; i++ {
+               err := limiter.acquire(ctx)
+               assert.NoError(t, err, "should be able to acquire permit")
+       }
+}
+
+func TestReceiveRateLimiter_ZeroMaxConcurrency(t *testing.T) {
+       // Test that when maxConcurrency is 0 or negative, default value 10 
should be used
+       limiter := newReceiveRateLimiter(0)
+       ctx := context.Background()
+
+       // Should be able to acquire 10 permits
+       for i := 0; i < 10; i++ {
+               err := limiter.acquire(ctx)
+               assert.NoError(t, err, "should be able to acquire permit")
+       }
+
+       // The 11th should be blocked
+       acquired := int32(0)
+       go func() {
+               err := limiter.acquire(ctx)
+               if err == nil {
+                       atomic.StoreInt32(&acquired, 1)
+                       limiter.release()
+               }
+       }()
+
+       time.Sleep(100 * time.Millisecond)
+       assert.Equal(t, int32(0), atomic.LoadInt32(&acquired), "should be 
blocked")
+
+       // Cleanup
+       for i := 0; i < 10; i++ {
+               limiter.release()
+       }
+}
+
+func TestReceiveRateLimiter_StressTest(t *testing.T) {
+       maxConcurrency := 10
+       limiter := newReceiveRateLimiter(maxConcurrency)
+       ctx := context.Background()
+
+       // Stress test: large number of concurrent requests
+       totalRequests := 1000
+       successCount := int32(0)
+       var wg sync.WaitGroup
+       startTime := time.Now()
+
+       for i := 0; i < totalRequests; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       err := limiter.acquire(ctx)
+                       if err != nil {
+                               return
+                       }
+                       atomic.AddInt32(&successCount, 1)
+                       time.Sleep(1 * time.Millisecond) // Simulate work
+                       limiter.release()
+               }()
+       }
+
+       wg.Wait()
+       duration := time.Since(startTime)
+
+       // Verify all requests succeeded
+       assert.Equal(t, int32(totalRequests), successCount, "all requests 
should succeed")
+
+       // Verify the limit is enforced: without it, 1000 requests of ~1ms each 
would all run at once and finish quickly
+       // With only 10 permits, 1000 x 1ms of work should take roughly 100ms or more
+       // This is only a coarse check (threshold 50ms); actual time depends on scheduling
+       t.Logf("Completed %d requests in %v", totalRequests, duration)
+       assert.Greater(t, duration, 50*time.Millisecond, "with rate limiting, 
it should take some time")
+}
+
+func TestReceiveRateLimiter_SequentialAcquireRelease(t *testing.T) {
+       limiter := newReceiveRateLimiter(3)
+       ctx := context.Background()
+
+       // Sequential acquire and release
+       for i := 0; i < 10; i++ {
+               err := limiter.acquire(ctx)
+               assert.NoError(t, err, "should be able to acquire permit")
+               limiter.release()
+       }
+
+       // Verify can acquire again
+       err := limiter.acquire(ctx)
+       assert.NoError(t, err, "should be able to acquire permit again")
+       limiter.release()
+}
+
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index 2da42b13..4fa0b0fd 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -59,6 +59,7 @@ type defaultSimpleConsumer struct {
        subscriptionExpressionsLock  sync.RWMutex
        subscriptionExpressions      *map[string]*FilterExpression
        subTopicRouteDataResultCache sync.Map
+       receiveRateLimiter           *receiveRateLimiter
 }
 
 func (sc *defaultSimpleConsumer) SetRequestTimeout(timeout time.Duration) {
@@ -318,6 +319,13 @@ func (sc *defaultSimpleConsumer) Receive(ctx 
context.Context, maxMessageNum int3
        if err != nil {
                return nil, err
        }
+
+       // Apply rate limiting
+       if err = sc.receiveRateLimiter.acquire(ctx); err != nil {
+               return nil, fmt.Errorf("failed to acquire rate limit permit: 
%w", err)
+       }
+       defer sc.receiveRateLimiter.release()
+
        request := sc.wrapReceiveMessageRequest(int(maxMessageNum), 
selectMessageQueue, filterExpression, invisibleDuration)
        timeout := sc.scOpts.awaitDuration + sc.cli.opts.timeout
        return sc.receiveMessage(ctx, request, selectMessageQueue, timeout)
@@ -365,6 +373,7 @@ var NewSimpleConsumer = func(config *Config, opts 
...SimpleConsumerOption) (Simp
 
                awaitDuration:           scOpts.awaitDuration,
                subscriptionExpressions: &scOpts.subscriptionExpressions,
+               receiveRateLimiter:      
newReceiveRateLimiter(scOpts.maxReceiveConcurrency),
        }
 
        sc.cli.initTopics = make([]string, 0)
diff --git a/golang/simple_consumer_options.go 
b/golang/simple_consumer_options.go
index 95c8751a..42556d58 100644
--- a/golang/simple_consumer_options.go
+++ b/golang/simple_consumer_options.go
@@ -29,10 +29,12 @@ type simpleConsumerOptions struct {
        subscriptionExpressions map[string]*FilterExpression
        awaitDuration           time.Duration
        clientFunc              NewClientFunc
+       maxReceiveConcurrency   int
 }
 
 var defaultSimpleConsumerOptions = simpleConsumerOptions{
-       clientFunc: NewClient,
+       clientFunc:            NewClient,
+       maxReceiveConcurrency: 20, // default 20 concurrent Receive requests
 }
 
 // A ConsumerOption sets options such as tag, etc.
@@ -78,6 +80,14 @@ func WithSimpleAwaitDuration(awaitDuration time.Duration) 
SimpleConsumerOption {
        })
 }
 
+// WithSimpleReceiveRateLimit sets the maximum concurrency for Receive 
requests (by clientID dimension)
+// maxConcurrency: maximum number of concurrent Receive requests allowed, 
default is 20 (values <= 0 fall back to 10)
+func WithSimpleReceiveRateLimit(maxConcurrency int) SimpleConsumerOption {
+       return newFuncSimpleConsumerOption(func(o *simpleConsumerOptions) {
+               o.maxReceiveConcurrency = maxConcurrency
+       })
+}
+
 var _ = ClientSettings(&simpleConsumerSettings{})
 
 type simpleConsumerSettings struct {

Reply via email to