This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new a013ff0 [Issue 833] Fix the availablePermits leak that could cause the consumer to get stuck. (#835)
a013ff0 is described below
commit a013ff0b7353fab87a7eb7599377bb06b46eb7b7
Author: Jiaqi Shen <[email protected]>
AuthorDate: Thu Oct 13 16:06:01 2022 +0800
[Issue 833] Fix the availablePermits leak that could cause the consumer to get stuck. (#835)
* fix: fix for issue833
* fix: fix for issue833 by modifying dispatcher()
---
pulsar/consumer_partition.go | 55 +++++++++++++++++++++--------
pulsar/consumer_test.go | 82 ++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 122 insertions(+), 15 deletions(-)
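As the diff below suggests, the leak came from permit accounting being spread across goroutines: in particular, the discard path in discardCorruptedMessage dropped a message without ever returning its permit, so a run of undecryptable messages could leave the consumer with no available permits and no pending flow request, stalling Receive() forever. The fix routes every permit update through a new buffered channel, availablePermitsCh, drained only inside the dispatcher() select loop, and makes the discard path send permitsInc like any other slot-freeing path. Below is a minimal, self-contained sketch of that pattern; sketchConsumer and flow are hypothetical stand-ins that do not exist in pulsar-client-go, and the sketch zeroes the counter in place where the real patch re-sends permitsReset through the channel.

package main

import (
    "fmt"
    "math"
    "time"
)

// permitsReq mirrors the request type added by the commit.
type permitsReq int32

const (
    permitsReset permitsReq = iota // zero the permit counter
    permitsInc                     // one message slot was freed
)

// sketchConsumer is a hypothetical stand-in for partitionConsumer.
type sketchConsumer struct {
    queueSize          int32
    availablePermits   int32
    availablePermitsCh chan permitsReq
}

// flow stands in for internalFlow, which in the real client sends a FLOW
// command to the broker asking for more messages.
func (c *sketchConsumer) flow(permits uint32) {
    fmt.Printf("FLOW permits=%d\n", permits)
}

// dispatcher is the only goroutine that reads or writes availablePermits,
// so no other synchronization is needed.
func (c *sketchConsumer) dispatcher(done <-chan struct{}) {
    for {
        select {
        case pr := <-c.availablePermitsCh:
            switch pr {
            case permitsInc:
                c.availablePermits++
                // Ask the broker for more once half the receiver queue is free.
                threshold := int32(math.Max(float64(c.queueSize/2), 1))
                if c.availablePermits >= threshold {
                    c.flow(uint32(c.availablePermits))
                    c.availablePermits = 0
                }
            case permitsReset:
                c.availablePermits = 0
            }
        case <-done:
            return
        }
    }
}

func main() {
    c := &sketchConsumer{
        queueSize:          4,
        availablePermitsCh: make(chan permitsReq, 10),
    }
    done := make(chan struct{})
    go c.dispatcher(done)

    // Every slot-freeing code path (normal dispatch, acking a discarded
    // message, ...) just sends permitsInc instead of touching the counter.
    for i := 0; i < 5; i++ {
        c.availablePermitsCh <- permitsInc
    }
    time.Sleep(100 * time.Millisecond)
    close(done)
}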
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 7ddff5e..5b61e7d 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -82,6 +82,15 @@ const (
noMessageEntry = -1
)
+type permitsReq int32
+
+const (
+ // reset the availablePermits of pc
+ permitsReset permitsReq = iota
+ // increase the availablePermits
+ permitsInc
+)
+
type partitionConsumerOpts struct {
topic string
consumerName string
@@ -128,7 +137,8 @@ type partitionConsumer struct {
messageCh chan ConsumerMessage
// the number of message slots available
- availablePermits int32
+ availablePermits int32
+ availablePermitsCh chan permitsReq
// the size of the queue channel for buffering messages
queueSize int32
@@ -224,6 +234,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
dlq: dlq,
metrics: metrics,
schemaInfoCache: newSchemaInfoCache(client, options.topic),
+ availablePermitsCh: make(chan permitsReq, 10),
}
pc.setConsumerState(consumerInit)
pc.log = client.log.SubLogger(log.Fields{
@@ -932,7 +943,7 @@ func (pc *partitionConsumer) dispatcher() {
messages = nil
// reset available permits
- pc.availablePermits = 0
+ pc.availablePermitsCh <- permitsReset
initialPermits := uint32(pc.queueSize)
pc.log.Debugf("dispatcher requesting initial
permits=%d", initialPermits)
@@ -955,19 +966,14 @@ func (pc *partitionConsumer) dispatcher() {
messages[0] = nil
messages = messages[1:]
- // TODO implement a better flow controller
- // send more permits if needed
- pc.availablePermits++
- flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
- if pc.availablePermits >= flowThreshold {
- availablePermits := pc.availablePermits
- requestedPermits := availablePermits
- pc.availablePermits = 0
+ pc.availablePermitsCh <- permitsInc
- pc.log.Debugf("requesting more permits=%d
available=%d", requestedPermits, availablePermits)
- if err :=
pc.internalFlow(uint32(requestedPermits)); err != nil {
- pc.log.WithError(err).Error("unable to
send permits")
- }
+ case pr := <-pc.availablePermitsCh:
+ switch pr {
+ case permitsInc:
+ pc.increasePermitsAndRequestMoreIfNeed()
+ case permitsReset:
+ pc.availablePermits = 0
}
case clearQueueCb := <-pc.clearQueueCh:
@@ -998,7 +1004,7 @@ func (pc *partitionConsumer) dispatcher() {
messages = nil
// reset available permits
- pc.availablePermits = 0
+ pc.availablePermitsCh <- permitsReset
initialPermits := uint32(pc.queueSize)
pc.log.Debugf("dispatcher requesting initial
permits=%d", initialPermits)
@@ -1438,6 +1444,25 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
if err != nil {
pc.log.Error("Connection was closed when request ack cmd")
}
+ pc.availablePermitsCh <- permitsInc
+}
+
+func (pc *partitionConsumer) increasePermitsAndRequestMoreIfNeed() {
+ // TODO implement a better flow controller
+ // send more permits if needed
+ flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
+ pc.availablePermits++
+ ap := pc.availablePermits
+ if ap >= flowThreshold {
+ availablePermits := ap
+ requestedPermits := ap
+ pc.availablePermitsCh <- permitsReset
+
+ pc.log.Debugf("requesting more permits=%d available=%d",
requestedPermits, availablePermits)
+ if err := pc.internalFlow(uint32(requestedPermits)); err != nil
{
+ pc.log.WithError(err).Error("unable to send permits")
+ }
+ }
}
// _setConn sets the internal connection field of this partition consumer atomically.
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index a180586..f574378 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -19,6 +19,7 @@ package pulsar
import (
"context"
+ "errors"
"fmt"
"io/ioutil"
"log"
@@ -3180,3 +3181,84 @@ func TestConsumerSeekByTimeOnPartitionedTopic(t *testing.T) {
consumer.Ack(msg)
}
}
+
+func TestAvailablePermitsLeak(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := fmt.Sprintf("my-topic-test-ap-leak-%v", time.Now().Nanosecond())
+
+ // 1. Producer with valid key name
+ p1, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Encryption: &ProducerEncryptionInfo{
+ KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", "crypto/testdata/pri_key_rsa.pem"),
+ Keys: []string{"client-rsa.pem"},
+ },
+ Schema: NewStringSchema(nil),
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ assert.NotNil(t, p1)
+
+ subscriptionName := "enc-failure-subcription"
+ totalMessages := 1000
+
+ // 2. KeyReader is not set on the consumer.
+ // Receive should fail since KeyReader is not set up,
+ // because the consumer's default behaviour is to fail receiving a message when decryption fails.
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: subscriptionName,
+ })
+ assert.Nil(t, err)
+
+ messageFormat := "my-message-%v"
+ for i := 0; i < totalMessages; i++ {
+ _, err := p1.Send(context.Background(), &ProducerMessage{
+ Value: fmt.Sprintf(messageFormat, i),
+ })
+ assert.Nil(t, err)
+ }
+
+ // 2. Set up another producer that sends a message without crypto.
+ // The consumer can receive it correctly.
+ p2, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Schema: NewStringSchema(nil),
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ assert.NotNil(t, p2)
+
+ _, err = p2.Send(context.Background(), &ProducerMessage{
+ Value: fmt.Sprintf(messageFormat, totalMessages),
+ })
+ assert.Nil(t, err)
+
+ // 3. Discard action on decryption failure. Create an availablePermits leak scenario.
+ consumer.Close()
+
+ consumer, err = client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: subscriptionName,
+ Decryption: &MessageDecryptionInfo{
+ ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionDiscard,
+ },
+ Schema: NewStringSchema(nil),
+ })
+ assert.Nil(t, err)
+ assert.NotNil(t, consumer)
+
+ // 4. If availablePermits does not leak, the consumer can get the last message, which is not encrypted.
+ // ctx3 will not exceed its deadline.
+ ctx3, cancel3 := context.WithTimeout(context.Background(), 15*time.Second)
+ _, err = consumer.Receive(ctx3)
+ cancel3()
+ assert.NotEqual(t, true, errors.Is(err, context.DeadlineExceeded),
+ "This means the permits are exhausted and consumer.Receive() will block forever.")
+}