This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 1fe9b62 [feat] support for consumer event listener (#904)
1fe9b62 is described below
commit 1fe9b624ca673e3a647780b7c6c4b2567089285a
Author: labuladong <[email protected]>
AuthorDate: Tue Dec 13 09:54:35 2022 +0800
[feat] support for consumer event listener (#904)
### Motivation
Support notifying consumers of active-consumer-change events.
### Modifications
Add a listener option to consumers and invoke its callback on
`BaseCommand_ACTIVE_CONSUMER_CHANGE`.
---
pulsar/consumer.go | 3 +
pulsar/consumer_impl.go | 1 +
pulsar/consumer_partition.go | 20 ++++++
pulsar/consumer_test.go | 160 ++++++++++++++++++++++++++++++++++++++----
pulsar/internal/connection.go | 13 ++++
5 files changed, 185 insertions(+), 12 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index ffef786..0a515e0 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -120,6 +120,9 @@ type ConsumerOptions struct {
// Default is `Latest`
SubscriptionInitialPosition
+ // EventListener will be called when the active consumer changes (in
failover subscription type)
+ EventListener ConsumerEventListener
+
// DLQ represents the configuration for Dead Letter Queue consumer
policy.
// eg. route the message to topic X after N failed attempts at
processing it
// By default is nil and there's no DLQ
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index a402a4d..707c13c 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -376,6 +376,7 @@ func (c *consumer) internalTopicSubscribeToPartitions()
error {
maxPendingChunkedMessage:
c.options.MaxPendingChunkedMessage,
expireTimeOfIncompleteChunk:
c.options.ExpireTimeOfIncompleteChunk,
autoAckIncompleteChunk:
c.options.AutoAckIncompleteChunk,
+ consumerEventListener:
c.options.EventListener,
}
cons, err := newPartitionConsumer(c, c.client, opts,
c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 4149a91..46f13d8 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -112,6 +112,13 @@ type partitionConsumerOpts struct {
maxPendingChunkedMessage int
expireTimeOfIncompleteChunk time.Duration
autoAckIncompleteChunk bool
+ // in failover mode, this callback will be called when the active consumer changes
+ consumerEventListener ConsumerEventListener
+}
+
+type ConsumerEventListener interface {
+ BecameActive(consumer Consumer, topicName string, partition int32)
+ BecameInactive(consumer Consumer, topicName string, partition int32)
}
type partitionConsumer struct {
@@ -161,6 +168,19 @@ type partitionConsumer struct {
unAckChunksTracker *unAckChunksTracker
}
+func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
+ listener := pc.options.consumerEventListener
+ if listener == nil {
+ // didn't set a listener
+ return
+ }
+ if isActive {
+ listener.BecameActive(pc.parentConsumer, pc.topic,
pc.partitionIdx)
+ } else {
+ listener.BecameInactive(pc.parentConsumer, pc.topic,
pc.partitionIdx)
+ }
+}
+
type availablePermits struct {
permits int32
pc *partitionConsumer
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 95462ba..58a524a 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -25,6 +25,7 @@ import (
"log"
"net/http"
"strconv"
+ "sync"
"sync/atomic"
"testing"
"time"
@@ -321,7 +322,7 @@ func TestConsumerKeyShared(t *testing.T) {
assert.NotEqual(t, 0, receivedConsumer1)
assert.NotEqual(t, 0, receivedConsumer2)
- fmt.Printf("TestConsumerKeyShared received messages consumer1: %d
consumser2: %d\n",
+ t.Logf("TestConsumerKeyShared received messages consumer1: %d
consumser2: %d\n",
receivedConsumer1, receivedConsumer2)
assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)
}
@@ -375,7 +376,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
assert.Nil(t, err)
msgs = append(msgs, string(msg.Payload()))
- fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+ t.Logf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
@@ -384,6 +385,141 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
assert.Equal(t, len(msgs), 10)
}
+type TestActiveConsumerListener struct {
+ t *testing.T
+ lock sync.RWMutex
+ nameToPartitions map[string]map[int32]struct{}
+}
+
+func (l *TestActiveConsumerListener) getConsumerCount() int {
+ l.lock.RLock()
+ defer l.lock.RUnlock()
+ return len(l.nameToPartitions)
+}
+
+func (l *TestActiveConsumerListener) getPartitionCount(consumerName string)
int {
+ l.lock.RLock()
+ defer l.lock.RUnlock()
+ return len(l.nameToPartitions[consumerName])
+}
+
+func (l *TestActiveConsumerListener) BecameActive(consumer Consumer, topicName
string, partition int32) {
+ l.t.Logf("%s become active on %s - %d\n", consumer.Name(), topicName,
partition)
+ l.lock.Lock()
+ defer l.lock.Unlock()
+ partitionSet := l.nameToPartitions[consumer.Name()]
+ if partitionSet == nil {
+ partitionSet = map[int32]struct{}{}
+ }
+ partitionSet[partition] = struct{}{}
+ l.nameToPartitions[consumer.Name()] = partitionSet
+}
+
+func (l *TestActiveConsumerListener) BecameInactive(consumer Consumer,
topicName string, partition int32) {
+ l.t.Logf("%s become inactive on %s - %d\n", consumer.Name(), topicName,
partition)
+ l.lock.Lock()
+ defer l.lock.Unlock()
+ partitionSet := l.nameToPartitions[consumer.Name()]
+ if _, ok := partitionSet[partition]; ok {
+ delete(partitionSet, partition)
+ if len(partitionSet) == 0 {
+ delete(l.nameToPartitions, consumer.Name())
+ }
+ }
+}
+
+func allConsume(consumers []Consumer) {
+ for i := 0; i < len(consumers); i++ {
+ ctx, cancel := context.WithTimeout(context.Background(),
2*time.Second)
+ defer cancel()
+ consumers[i].Receive(ctx)
+ }
+}
+
+func TestPartitionTopic_ActiveConsumerChanged(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ testURL := adminURL + "/" + "admin/v2/persistent/public/default/" +
randomName + "/partitions"
+
+ makeHTTPCall(t, http.MethodPut, testURL, "3")
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ ctx := context.Background()
+ for i := 0; i < 10; i++ {
+ _, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.Nil(t, err)
+ }
+
+ var consumers []Consumer
+ listener := &TestActiveConsumerListener{
+ t: t,
+ nameToPartitions: map[string]map[int32]struct{}{},
+ }
+ for i := 0; i < 1; i++ {
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Name: fmt.Sprintf("consumer-%d", i),
+ SubscriptionName: "my-sub",
+ Type: Failover,
+ EventListener: listener,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+ consumers = append(consumers, consumer)
+ }
+
+ allConsume(consumers)
+ // first consumer will get 3 partitions
+ assert.Equal(t, 1, listener.getConsumerCount())
+ assert.Equal(t, 3, listener.getPartitionCount(consumers[0].Name()))
+
+ // 1 partition per consumer
+ for i := 1; i < 3; i++ {
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Name: fmt.Sprintf("consumer-%d", i),
+ SubscriptionName: "my-sub",
+ Type: Failover,
+ EventListener: listener,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+ consumers = append(consumers, consumer)
+ }
+ allConsume(consumers)
+ assert.Equal(t, 3, listener.getConsumerCount())
+ for _, c := range consumers {
+ assert.Equal(t, 1, listener.getPartitionCount(c.Name()))
+ }
+
+ consumers[0].Close()
+ // wait for the broker to reassign active consumers
+ time.Sleep(time.Second * 3)
+ allConsume(consumers)
+
+ // a closed consumer won't get notified
+ assert.Equal(t, 3, listener.getConsumerCount())
+ // remaining consumers will cover all partitions
+ assert.Equal(t, 3,
listener.getPartitionCount(consumers[1].Name())+listener.getPartitionCount(consumers[2].Name()))
+ for _, c := range consumers {
+ c.Close()
+ }
+}
+
func TestPartitionTopicsConsumerPubSubEncryption(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
@@ -443,7 +579,7 @@ func TestPartitionTopicsConsumerPubSubEncryption(t
*testing.T) {
assert.Nil(t, err)
msgs = append(msgs, string(msg.Payload()))
- fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+ t.Logf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
@@ -536,7 +672,7 @@ func TestConsumerShared(t *testing.T) {
readMsgs++
payload := string(cm.Message.Payload())
messages[payload] = struct{}{}
- fmt.Printf("consumer1 msg id is: %v, value is: %s\n",
cm.Message.ID(), payload)
+ t.Logf("consumer1 msg id is: %v, value is: %s\n",
cm.Message.ID(), payload)
consumer1.Ack(cm.Message)
case cm, ok := <-consumer2.Chan():
if !ok {
@@ -545,7 +681,7 @@ func TestConsumerShared(t *testing.T) {
readMsgs++
payload := string(cm.Message.Payload())
messages[payload] = struct{}{}
- fmt.Printf("consumer2 msg id is: %v, value is: %s\n",
cm.Message.ID(), payload)
+ t.Logf("consumer2 msg id is: %v, value is: %s\n",
cm.Message.ID(), payload)
consumer2.Ack(cm.Message)
}
}
@@ -1764,7 +1900,7 @@ func TestConsumerAddTopicPartitions(t *testing.T) {
assert.Nil(t, err)
msgs = append(msgs, string(msg.Payload()))
- fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+ t.Logf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
@@ -2105,11 +2241,11 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t
*testing.T) {
assert.Equal(t, len(consumer1Keys)*MsgBatchCount, receivedConsumer1)
assert.Equal(t, len(consumer2Keys)*MsgBatchCount, receivedConsumer2)
- fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received
messages consumer1: %d consumser2: %d\n",
+ t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages
consumer1: %d consumser2: %d\n",
receivedConsumer1, receivedConsumer2)
assert.Equal(t, 300, receivedConsumer1+receivedConsumer2)
- fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received
messages keys consumer1: %v consumser2: %v\n",
+ t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages
keys consumer1: %v consumser2: %v\n",
consumer1Keys, consumer2Keys)
}
@@ -2287,7 +2423,7 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) {
assert.NotEqual(t, 0, receivedConsumer1)
assert.NotEqual(t, 0, receivedConsumer2)
- fmt.Printf(
+ t.Logf(
"TestConsumerKeySharedWithOrderingKey received messages
consumer1: %d consumser2: %d\n",
receivedConsumer1, receivedConsumer2,
)
@@ -2623,7 +2759,7 @@ func
TestProducerConsumerRedeliveryOfFailedEncryptedMessages(t *testing.T) {
Value: fmt.Sprintf(message, i),
})
assert.Nil(t, err)
- fmt.Printf("Sent : %v\n", mid)
+ t.Logf("Sent : %v\n", mid)
}
// Consuming from consumer 2 and 3
@@ -2656,7 +2792,7 @@ func
TestProducerConsumerRedeliveryOfFailedEncryptedMessages(t *testing.T) {
assert.Nil(t, err)
messageMap[*receivedMsg] = struct{}{}
cryptoConsumer.Ack(m)
- fmt.Printf("Received : %v\n", m.ID())
+ t.Logf("Received : %v\n", m.ID())
}
// check if all messages were received
@@ -2904,7 +3040,7 @@ func
TestBatchMessageReceiveWithCompressionAndRSAEcnryption(t *testing.T) {
for i := 0; i < numOfMessages; i++ {
msg, err := consumer.Receive(ctx)
- fmt.Printf("received : %v\n", string(msg.Payload()))
+ t.Logf("received : %v\n", string(msg.Payload()))
assert.Nil(t, err)
consumer.Ack(msg)
count++
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index fc1b349..fc97455 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -85,6 +85,8 @@ type Connection interface {
type ConsumerHandler interface {
MessageReceived(response *pb.CommandMessage, headersAndPayload Buffer)
error
+ ActiveConsumerChanged(isActive bool)
+
// ConnectionClosed close the TCP connection.
ConnectionClosed()
}
@@ -576,6 +578,7 @@ func (c *connection) internalReceivedCommand(cmd
*pb.BaseCommand, headersAndPayl
c.handlePong()
case pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
+ c.handleActiveConsumerChange(cmd.GetActiveConsumerChange())
default:
c.log.Errorf("Received invalid command type: %s", cmd.Type)
@@ -857,6 +860,16 @@ func (c *connection) handleCloseConsumer(closeConsumer
*pb.CommandCloseConsumer)
}
}
+func (c *connection) handleActiveConsumerChange(consumerChange
*pb.CommandActiveConsumerChange) {
+ consumerID := consumerChange.GetConsumerId()
+ isActive := consumerChange.GetIsActive()
+ if consumer, ok := c.consumerHandler(consumerID); ok {
+ consumer.ActiveConsumerChanged(isActive)
+ } else {
+ c.log.WithField("consumerID", consumerID).Warnf("Consumer not
found while active consumer change")
+ }
+}
+
func (c *connection) handleCloseProducer(closeProducer
*pb.CommandCloseProducer) {
c.log.Infof("Broker notification of Closed producer: %d",
closeProducer.GetProducerId())
producerID := closeProducer.GetProducerId()