Copilot commented on code in PR #1457:
URL: https://github.com/apache/pulsar-client-go/pull/1457#discussion_r2745067307
##########
pulsar/producer_partition.go:
##########
@@ -523,9 +523,10 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
	}
	if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
+		// ProducerBlockedQuotaExceededException is a retryable exception,
+		// we only fail pending messages but continue trying to reconnect
		p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
Review Comment:
The log message states "stop reconnecting" but the code change actually
allows reconnection to continue (which is the intent of this PR). The log
message should be updated to accurately reflect that reconnection will
continue, for example: "Producer was blocked by quota exceed exception, failing
pending messages, will retry reconnecting"
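For illustration, a minimal sketch of the reworded warning, reusing only the condition and logger call already shown in the hunk above (the surrounding control flow is assumed):
```go
// Sketch only: same condition and log call as the hunk above, with the message
// reworded to match the new behavior of continuing to reconnect.
if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
	// ProducerBlockedQuotaExceededException is retryable: fail the pending
	// messages, but keep the reconnect loop running.
	p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, will retry reconnecting")
}
```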
##########
pulsar/producer_test.go:
##########
@@ -2952,3 +2953,135 @@ func TestPartitionUpdateFailed(t *testing.T) {
time.Sleep(time.Second * 1)
}
}
+
+type testReconnectBackoffPolicy struct {
+ curBackoff, minBackoff, maxBackoff time.Duration
+ retryTime int
+ lock sync.Mutex
+}
+
+func newTestReconnectBackoffPolicy(minBackoff, maxBackoff time.Duration) *testReconnectBackoffPolicy {
+ return &testReconnectBackoffPolicy{
+ curBackoff: 0,
+ minBackoff: minBackoff,
+ maxBackoff: maxBackoff,
+ }
+}
+
+func (b *testReconnectBackoffPolicy) Next() time.Duration {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+
+ // Double the delay each time
+ b.curBackoff += b.curBackoff
+ if b.curBackoff.Nanoseconds() < b.minBackoff.Nanoseconds() {
+ b.curBackoff = b.minBackoff
+ } else if b.curBackoff.Nanoseconds() > b.maxBackoff.Nanoseconds() {
+ b.curBackoff = b.maxBackoff
+ }
+ b.retryTime++
+ return b.curBackoff
+}
+func (b *testReconnectBackoffPolicy) IsMaxBackoffReached() bool {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+ return b.curBackoff >= b.maxBackoff
+}
+
+func (b *testReconnectBackoffPolicy) Reset() {
+}
+
+func (b *testReconnectBackoffPolicy) IsExpectedIntervalFrom() bool {
+ return true
Review Comment:
The IsExpectedIntervalFrom method signature is incorrect. It should accept a
time.Time parameter like the testBackoffPolicy in reader_test.go (line 862).
The current implementation returns a constant true value and doesn't validate
timing at all, making it useless for testing.
```suggestion
func (b *testReconnectBackoffPolicy) IsExpectedIntervalFrom(start time.Time) bool {
	// Verify that the elapsed time since 'start' is close to the current backoff interval.
b.lock.Lock()
expected := b.curBackoff
b.lock.Unlock()
// If no backoff is expected yet, do not enforce any interval.
if expected <= 0 {
return true
}
elapsed := time.Since(start)
// Allow some tolerance around the expected backoff interval (50%–150%).
lowerBound := expected / 2
upperBound := expected + expected/2
return elapsed >= lowerBound && elapsed <= upperBound
```
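For context, a hedged sketch of how a test could then exercise the corrected signature (the `start` variable and the assertion below are assumptions, not code from this PR):
```go
// Hypothetical usage: record when the backoff was requested, sleep for the
// returned delay, then check the elapsed time against the policy.
start := time.Now()
delay := bo.Next()
time.Sleep(delay)
assert.True(t, bo.IsExpectedIntervalFrom(start))
```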
##########
pulsar/producer_test.go:
##########
@@ -2952,3 +2953,135 @@ func TestPartitionUpdateFailed(t *testing.T) {
time.Sleep(time.Second * 1)
}
}
+
+type testReconnectBackoffPolicy struct {
+ curBackoff, minBackoff, maxBackoff time.Duration
+ retryTime int
+ lock sync.Mutex
+}
+
+func newTestReconnectBackoffPolicy(minBackoff, maxBackoff time.Duration) *testReconnectBackoffPolicy {
+ return &testReconnectBackoffPolicy{
+ curBackoff: 0,
+ minBackoff: minBackoff,
+ maxBackoff: maxBackoff,
+ }
+}
+
+func (b *testReconnectBackoffPolicy) Next() time.Duration {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+
+ // Double the delay each time
+ b.curBackoff += b.curBackoff
+ if b.curBackoff.Nanoseconds() < b.minBackoff.Nanoseconds() {
+ b.curBackoff = b.minBackoff
+ } else if b.curBackoff.Nanoseconds() > b.maxBackoff.Nanoseconds() {
+ b.curBackoff = b.maxBackoff
+ }
+ b.retryTime++
+ return b.curBackoff
+}
+func (b *testReconnectBackoffPolicy) IsMaxBackoffReached() bool {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+ return b.curBackoff >= b.maxBackoff
+}
+
+func (b *testReconnectBackoffPolicy) Reset() {
Review Comment:
The Reset method is empty but should reset curBackoff to 0 to match the
interface contract and the behavior of DefaultBackoff. While this may not
affect the test since Reset isn't called in the test scenario, it makes the
implementation incomplete and could lead to unexpected behavior if the backoff
policy is reused.
```suggestion
func (b *testReconnectBackoffPolicy) Reset() {
b.lock.Lock()
defer b.lock.Unlock()
b.curBackoff = 0
b.retryTime = 0
```
##########
pulsar/producer_test.go:
##########
@@ -2952,3 +2953,135 @@ func TestPartitionUpdateFailed(t *testing.T) {
time.Sleep(time.Second * 1)
}
}
+
+type testReconnectBackoffPolicy struct {
+ curBackoff, minBackoff, maxBackoff time.Duration
+ retryTime int
+ lock sync.Mutex
+}
+
+func newTestReconnectBackoffPolicy(minBackoff, maxBackoff time.Duration) *testReconnectBackoffPolicy {
+ return &testReconnectBackoffPolicy{
+ curBackoff: 0,
+ minBackoff: minBackoff,
+ maxBackoff: maxBackoff,
+ }
+}
+
+func (b *testReconnectBackoffPolicy) Next() time.Duration {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+
+ // Double the delay each time
+ b.curBackoff += b.curBackoff
+ if b.curBackoff.Nanoseconds() < b.minBackoff.Nanoseconds() {
+ b.curBackoff = b.minBackoff
+ } else if b.curBackoff.Nanoseconds() > b.maxBackoff.Nanoseconds() {
+ b.curBackoff = b.maxBackoff
+ }
+ b.retryTime++
+ return b.curBackoff
+}
+func (b *testReconnectBackoffPolicy) IsMaxBackoffReached() bool {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+ return b.curBackoff >= b.maxBackoff
+}
+
+func (b *testReconnectBackoffPolicy) Reset() {
+}
+
+func (b *testReconnectBackoffPolicy) IsExpectedIntervalFrom() bool {
+ return true
+}
+
+func TestProducerReconnectWhenBacklogQuotaExceed(t *testing.T) {
+	logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
+ slog.SetDefault(logger)
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ Logger: plog.NewLoggerWithSlog(logger),
+ })
+ defer client.Close()
+ namespace := "public/" + generateRandomName()
+ assert.NoError(t, err)
+ admin, err := pulsaradmin.NewClient(&config.Config{
+ WebServiceURL: adminURL,
+ })
+ assert.NoError(t, err)
+	// Step 1: Create namespace and configure 512KB backlog quota with producer_exception policy
+	// When subscription backlog stats refresh and reach the limit, producer will encounter BlockQuotaExceed exception
+ err = admin.Namespaces().CreateNamespace(namespace)
+ assert.NoError(t, err)
+ err = admin.Namespaces().SetBacklogQuota(
+ namespace,
+ utils.NewBacklogQuota(512*1024, -1, utils.ProducerException),
+ utils.DestinationStorage,
+ )
+ assert.NoError(t, err)
+
+ // Verify backlog quota configuration
+ quotaMap, err := admin.Namespaces().GetBacklogQuotaMap(namespace)
+ assert.NoError(t, err)
+ logger.Info(fmt.Sprintf("quotaMap = %v", quotaMap))
+
+ // Create test topic
+ topicName := namespace + "/test-topic"
+ tn, err := utils.GetTopicName(topicName)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*tn, 1)
+ assert.NoError(t, err)
+
+	// Step 2: Create consumer with small receiver queue size and earliest subscription position
+	// This ensures that once 512KB message is sent, producer will quickly reach backlog quota limit
+ _consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "my-sub",
+ Type: Exclusive,
+ ReceiverQueueSize: 1,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ })
+ assert.Nil(t, err)
+ defer _consumer.Close()
+
+	// Step 3: Create producer with custom backoff policy to reduce retry interval
+ bo := newTestReconnectBackoffPolicy(100*time.Millisecond, 1*time.Second)
+ _producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ DisableBatching: true,
+ SendTimeout: 5 * time.Minute,
+ BackOffPolicyFunc: func() backoff.Policy {
+ return bo
+ },
+ })
+ assert.NoError(t, err)
+ defer _producer.Close()
+
+	// Step 4: Start goroutine to continuously send 512KB messages and monitor statistics
+ go func() {
+ for {
+ // Producer sends 512KB message every 10 seconds
+			_producer.SendAsync(context.Background(), &ProducerMessage{
+				Payload: make([]byte, 512*1024),
+			}, func(msgId MessageID, _ *ProducerMessage, err error) {
+ if err != nil {
+					logger.Error("sendAsync fail", "time", time.Now().String(), "err", err.Error())
+ return
+ }
+				logger.Info("sendAsync success", "msgId", msgId.String(), "time", time.Now().String())
+ })
+
+ // Get topic statistics for debugging
+			stats, err := admin.Topics().GetPartitionedStats(*tn, false)
+ assert.NoError(t, err)
Review Comment:
The goroutine calls assert.NoError, which is not a reliable way to fail the test from here: fail-now style testing.T methods must be called from the goroutine running the test, and because this goroutine is never stopped it can keep using t after the test has already finished. Send errors through a channel and check them in the main test goroutine (for example with require.NoError there) instead.
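One hedged sketch of that pattern, assuming the test's existing `admin`, `tn`, `logger`, and `t` variables: the goroutine only forwards the error, and the test goroutine decides whether to fail.
```go
// Hypothetical sketch: forward errors from the background goroutine to the
// test goroutine over a channel instead of calling assert/require in it.
errCh := make(chan error, 1)

go func() {
	stats, err := admin.Topics().GetPartitionedStats(*tn, false)
	if err != nil {
		select {
		case errCh <- err: // hand the error to the test goroutine
		default: // an earlier error is already pending; drop this one
		}
		return
	}
	logger.Info("current stats", "stats", stats)
}()

// Back on the test goroutine it is safe to fail the test.
select {
case err := <-errCh:
	require.NoError(t, err)
default:
}
```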
##########
pulsar/producer_test.go:
##########
@@ -2952,3 +2953,135 @@ func TestPartitionUpdateFailed(t *testing.T) {
time.Sleep(time.Second * 1)
}
}
+
+type testReconnectBackoffPolicy struct {
+ curBackoff, minBackoff, maxBackoff time.Duration
+ retryTime int
+ lock sync.Mutex
+}
+
+func newTestReconnectBackoffPolicy(minBackoff, maxBackoff time.Duration) *testReconnectBackoffPolicy {
+ return &testReconnectBackoffPolicy{
+ curBackoff: 0,
+ minBackoff: minBackoff,
+ maxBackoff: maxBackoff,
+ }
+}
+
+func (b *testReconnectBackoffPolicy) Next() time.Duration {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+
+ // Double the delay each time
+ b.curBackoff += b.curBackoff
+ if b.curBackoff.Nanoseconds() < b.minBackoff.Nanoseconds() {
+ b.curBackoff = b.minBackoff
+ } else if b.curBackoff.Nanoseconds() > b.maxBackoff.Nanoseconds() {
+ b.curBackoff = b.maxBackoff
+ }
+ b.retryTime++
+ return b.curBackoff
+}
+func (b *testReconnectBackoffPolicy) IsMaxBackoffReached() bool {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+ return b.curBackoff >= b.maxBackoff
+}
+
+func (b *testReconnectBackoffPolicy) Reset() {
+}
+
+func (b *testReconnectBackoffPolicy) IsExpectedIntervalFrom() bool {
+ return true
+}
+
+func TestProducerReconnectWhenBacklogQuotaExceed(t *testing.T) {
+	logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
+ slog.SetDefault(logger)
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ Logger: plog.NewLoggerWithSlog(logger),
+ })
+ defer client.Close()
+ namespace := "public/" + generateRandomName()
+ assert.NoError(t, err)
+ admin, err := pulsaradmin.NewClient(&config.Config{
+ WebServiceURL: adminURL,
+ })
+ assert.NoError(t, err)
+	// Step 1: Create namespace and configure 512KB backlog quota with producer_exception policy
+	// When subscription backlog stats refresh and reach the limit, producer will encounter BlockQuotaExceed exception
+ err = admin.Namespaces().CreateNamespace(namespace)
+ assert.NoError(t, err)
+ err = admin.Namespaces().SetBacklogQuota(
+ namespace,
+ utils.NewBacklogQuota(512*1024, -1, utils.ProducerException),
+ utils.DestinationStorage,
+ )
+ assert.NoError(t, err)
+
+ // Verify backlog quota configuration
+ quotaMap, err := admin.Namespaces().GetBacklogQuotaMap(namespace)
+ assert.NoError(t, err)
+ logger.Info(fmt.Sprintf("quotaMap = %v", quotaMap))
+
+ // Create test topic
+ topicName := namespace + "/test-topic"
+ tn, err := utils.GetTopicName(topicName)
+ assert.NoError(t, err)
+ err = admin.Topics().Create(*tn, 1)
+ assert.NoError(t, err)
+
+	// Step 2: Create consumer with small receiver queue size and earliest subscription position
+	// This ensures that once 512KB message is sent, producer will quickly reach backlog quota limit
+ _consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "my-sub",
+ Type: Exclusive,
+ ReceiverQueueSize: 1,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ })
+ assert.Nil(t, err)
+ defer _consumer.Close()
+
+	// Step 3: Create producer with custom backoff policy to reduce retry interval
+ bo := newTestReconnectBackoffPolicy(100*time.Millisecond, 1*time.Second)
+ _producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ DisableBatching: true,
+ SendTimeout: 5 * time.Minute,
+ BackOffPolicyFunc: func() backoff.Policy {
+ return bo
+ },
+ })
+ assert.NoError(t, err)
+ defer _producer.Close()
+
+	// Step 4: Start goroutine to continuously send 512KB messages and monitor statistics
+ go func() {
+ for {
+ // Producer sends 512KB message every 10 seconds
+			_producer.SendAsync(context.Background(), &ProducerMessage{
+				Payload: make([]byte, 512*1024),
+			}, func(msgId MessageID, _ *ProducerMessage, err error) {
+ if err != nil {
+					logger.Error("sendAsync fail", "time", time.Now().String(), "err", err.Error())
+ return
+ }
+				logger.Info("sendAsync success", "msgId", msgId.String(), "time", time.Now().String())
+ })
+
+ // Get topic statistics for debugging
+			stats, err := admin.Topics().GetPartitionedStats(*tn, false)
+ assert.NoError(t, err)
+ logger.Info("current stats", "stats", stats)
+ time.Sleep(10 * time.Second)
+ }
+ }()
Review Comment:
The goroutine started at line 3061 runs an infinite loop and is never
stopped. This creates a goroutine leak that will continue running even after
the test completes. The goroutine should be controlled with a context, channel,
or some other mechanism to ensure it stops when the test ends. Additionally,
the infinite loop can continue sending messages after the test assertion
passes, potentially interfering with other tests.
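For illustration, a hedged sketch of one way to bound the sender loop with a context (names mirror the test above; the ticker interval and cancellation point are assumptions):
```go
// Hypothetical sketch: tie the sender loop to a context that is cancelled when
// the test returns, so the goroutine cannot outlive the test.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return // test finished; stop sending
		case <-ticker.C:
			_producer.SendAsync(ctx, &ProducerMessage{
				Payload: make([]byte, 512*1024),
			}, func(msgID MessageID, _ *ProducerMessage, err error) {
				if err != nil {
					logger.Error("sendAsync fail", "err", err.Error())
					return
				}
				logger.Info("sendAsync success", "msgId", msgID.String())
			})
		}
	}
}()
```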
##########
pulsar/producer_test.go:
##########
@@ -2952,3 +2953,135 @@ func TestPartitionUpdateFailed(t *testing.T) {
time.Sleep(time.Second * 1)
}
}
+
+type testReconnectBackoffPolicy struct {
+ curBackoff, minBackoff, maxBackoff time.Duration
+ retryTime int
+ lock sync.Mutex
+}
+
+func newTestReconnectBackoffPolicy(minBackoff, maxBackoff time.Duration) *testReconnectBackoffPolicy {
+ return &testReconnectBackoffPolicy{
+ curBackoff: 0,
+ minBackoff: minBackoff,
+ maxBackoff: maxBackoff,
+ }
+}
+
+func (b *testReconnectBackoffPolicy) Next() time.Duration {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+
+ // Double the delay each time
+ b.curBackoff += b.curBackoff
Review Comment:
The backoff calculation logic is incorrect. When curBackoff is 0, adding it
to itself (curBackoff += curBackoff) keeps it at 0, so it never increases. This
means the backoff will always stay at minBackoff and never reach maxBackoff.
The correct implementation should use multiplication (e.g., curBackoff *= 2) or
match the DefaultBackoff implementation from backoff.go which uses curBackoff
+= curBackoff but initializes to a non-zero value or handles the zero case
differently.
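A minimal sketch of a Next() that grows as intended, seeding from minBackoff before doubling (this mirrors the test helper above; it is not the DefaultBackoff code):
```go
// Sketch: start at minBackoff on the first call, double on each later call,
// and cap at maxBackoff, so the zero initial value is handled explicitly.
func (b *testReconnectBackoffPolicy) Next() time.Duration {
	b.lock.Lock()
	defer b.lock.Unlock()

	if b.curBackoff <= 0 {
		b.curBackoff = b.minBackoff
	} else {
		b.curBackoff *= 2
	}
	if b.curBackoff > b.maxBackoff {
		b.curBackoff = b.maxBackoff
	}
	b.retryTime++
	return b.curBackoff
}
```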
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]