This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fe9b62  [feat] support for consumer event listener (#904)
1fe9b62 is described below

commit 1fe9b624ca673e3a647780b7c6c4b2567089285a
Author: labuladong <[email protected]>
AuthorDate: Tue Dec 13 09:54:35 2022 +0800

    [feat] support for consumer event listener (#904)
    
    ### Motivation
    
    Support consumer active events.
    
    ### Modifications
    
    Add an option to consumers, and add a callback on `BaseCommand_ACTIVE_CONSUMER_CHANGE`.
---
 pulsar/consumer.go            |   3 +
 pulsar/consumer_impl.go       |   1 +
 pulsar/consumer_partition.go  |  20 ++++++
 pulsar/consumer_test.go       | 160 ++++++++++++++++++++++++++++++++++++++----
 pulsar/internal/connection.go |  13 ++++
 5 files changed, 185 insertions(+), 12 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index ffef786..0a515e0 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -120,6 +120,9 @@ type ConsumerOptions struct {
        // Default is `Latest`
        SubscriptionInitialPosition
 
+       // EventListener will be called when active consumer changed (in failover subscription type)
+       EventListener ConsumerEventListener
+
        // DLQ represents the configuration for Dead Letter Queue consumer policy.
        // eg. route the message to topic X after N failed attempts at processing it
        // By default is nil and there's no DLQ
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index a402a4d..707c13c 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -376,6 +376,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
                                maxPendingChunkedMessage:    c.options.MaxPendingChunkedMessage,
                                expireTimeOfIncompleteChunk: c.options.ExpireTimeOfIncompleteChunk,
                                autoAckIncompleteChunk:      c.options.AutoAckIncompleteChunk,
+                               consumerEventListener:       c.options.EventListener,
                        }
                        cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
                        ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 4149a91..46f13d8 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -112,6 +112,13 @@ type partitionConsumerOpts struct {
        maxPendingChunkedMessage    int
        expireTimeOfIncompleteChunk time.Duration
        autoAckIncompleteChunk      bool
+       // in failover mode, this callback will be called when consumer change
+       consumerEventListener ConsumerEventListener
+}
+
+type ConsumerEventListener interface {
+       BecameActive(consumer Consumer, topicName string, partition int32)
+       BecameInactive(consumer Consumer, topicName string, partition int32)
 }
 
 type partitionConsumer struct {
@@ -161,6 +168,19 @@ type partitionConsumer struct {
        unAckChunksTracker *unAckChunksTracker
 }
 
+func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
+       listener := pc.options.consumerEventListener
+       if listener == nil {
+               // didn't set a listener
+               return
+       }
+       if isActive {
+               listener.BecameActive(pc.parentConsumer, pc.topic, pc.partitionIdx)
+       } else {
+               listener.BecameInactive(pc.parentConsumer, pc.topic, pc.partitionIdx)
+       }
+}
+
 type availablePermits struct {
        permits int32
        pc      *partitionConsumer
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 95462ba..58a524a 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -25,6 +25,7 @@ import (
        "log"
        "net/http"
        "strconv"
+       "sync"
        "sync/atomic"
        "testing"
        "time"
@@ -321,7 +322,7 @@ func TestConsumerKeyShared(t *testing.T) {
        assert.NotEqual(t, 0, receivedConsumer1)
        assert.NotEqual(t, 0, receivedConsumer2)
 
-       fmt.Printf("TestConsumerKeyShared received messages consumer1: %d 
consumser2: %d\n",
+       t.Logf("TestConsumerKeyShared received messages consumer1: %d 
consumser2: %d\n",
                receivedConsumer1, receivedConsumer2)
        assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)
 }
@@ -375,7 +376,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
                assert.Nil(t, err)
                msgs = append(msgs, string(msg.Payload()))
 
-               fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+               t.Logf("Received message msgId: %#v -- content: '%s'\n",
                        msg.ID(), string(msg.Payload()))
 
                consumer.Ack(msg)
@@ -384,6 +385,141 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
        assert.Equal(t, len(msgs), 10)
 }
 
+type TestActiveConsumerListener struct {
+       t                *testing.T
+       lock             sync.RWMutex
+       nameToPartitions map[string]map[int32]struct{}
+}
+
+func (l *TestActiveConsumerListener) getConsumerCount() int {
+       l.lock.RLock()
+       defer l.lock.RUnlock()
+       return len(l.nameToPartitions)
+}
+
+func (l *TestActiveConsumerListener) getPartitionCount(consumerName string) 
int {
+       l.lock.RLock()
+       defer l.lock.RUnlock()
+       return len(l.nameToPartitions[consumerName])
+}
+
+func (l *TestActiveConsumerListener) BecameActive(consumer Consumer, topicName 
string, partition int32) {
+       l.t.Logf("%s become active on %s - %d\n", consumer.Name(), topicName, 
partition)
+       l.lock.Lock()
+       defer l.lock.Unlock()
+       partitionSet := l.nameToPartitions[consumer.Name()]
+       if partitionSet == nil {
+               partitionSet = map[int32]struct{}{}
+       }
+       partitionSet[partition] = struct{}{}
+       l.nameToPartitions[consumer.Name()] = partitionSet
+}
+
+func (l *TestActiveConsumerListener) BecameInactive(consumer Consumer, 
topicName string, partition int32) {
+       l.t.Logf("%s become inactive on %s - %d\n", consumer.Name(), topicName, 
partition)
+       l.lock.Lock()
+       defer l.lock.Unlock()
+       partitionSet := l.nameToPartitions[consumer.Name()]
+       if _, ok := partitionSet[partition]; ok {
+               delete(partitionSet, partition)
+               if len(partitionSet) == 0 {
+                       delete(l.nameToPartitions, consumer.Name())
+               }
+       }
+}
+
+func allConsume(consumers []Consumer) {
+       for i := 0; i < len(consumers); i++ {
+               ctx, cancel := context.WithTimeout(context.Background(), 
2*time.Second)
+               defer cancel()
+               consumers[i].Receive(ctx)
+       }
+}
+
+func TestPartitionTopic_ActiveConsumerChanged(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       randomName := newTopicName()
+       topic := "persistent://public/default/" + randomName
+       testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + 
randomName + "/partitions"
+
+       makeHTTPCall(t, http.MethodPut, testURL, "3")
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       ctx := context.Background()
+       for i := 0; i < 10; i++ {
+               _, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               assert.Nil(t, err)
+       }
+
+       var consumers []Consumer
+       listener := &TestActiveConsumerListener{
+               t:                t,
+               nameToPartitions: map[string]map[int32]struct{}{},
+       }
+       for i := 0; i < 1; i++ {
+               consumer, err := client.Subscribe(ConsumerOptions{
+                       Topic:            topic,
+                       Name:             fmt.Sprintf("consumer-%d", i),
+                       SubscriptionName: "my-sub",
+                       Type:             Failover,
+                       EventListener:    listener,
+               })
+               assert.Nil(t, err)
+               defer consumer.Close()
+               consumers = append(consumers, consumer)
+       }
+
+       allConsume(consumers)
+       // first consumer will get 3 partitions
+       assert.Equal(t, 1, listener.getConsumerCount())
+       assert.Equal(t, 3, listener.getPartitionCount(consumers[0].Name()))
+
+       // 1 partition per consumer
+       for i := 1; i < 3; i++ {
+               consumer, err := client.Subscribe(ConsumerOptions{
+                       Topic:            topic,
+                       Name:             fmt.Sprintf("consumer-%d", i),
+                       SubscriptionName: "my-sub",
+                       Type:             Failover,
+                       EventListener:    listener,
+               })
+               assert.Nil(t, err)
+               defer consumer.Close()
+               consumers = append(consumers, consumer)
+       }
+       allConsume(consumers)
+       assert.Equal(t, 3, listener.getConsumerCount())
+       for _, c := range consumers {
+               assert.Equal(t, 1, listener.getPartitionCount(c.Name()))
+       }
+
+       consumers[0].Close()
+       // wait broker reschedule active consumers
+       time.Sleep(time.Second * 3)
+       allConsume(consumers)
+
+       // close consumer won't get notify
+       assert.Equal(t, 3, listener.getConsumerCount())
+       // residual consumers will cover all partitions
+       assert.Equal(t, 3, 
listener.getPartitionCount(consumers[1].Name())+listener.getPartitionCount(consumers[2].Name()))
+       for _, c := range consumers {
+               c.Close()
+       }
+}
+
 func TestPartitionTopicsConsumerPubSubEncryption(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
@@ -443,7 +579,7 @@ func TestPartitionTopicsConsumerPubSubEncryption(t 
*testing.T) {
                assert.Nil(t, err)
                msgs = append(msgs, string(msg.Payload()))
 
-               fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+               t.Logf("Received message msgId: %#v -- content: '%s'\n",
                        msg.ID(), string(msg.Payload()))
 
                consumer.Ack(msg)
@@ -536,7 +672,7 @@ func TestConsumerShared(t *testing.T) {
                        readMsgs++
                        payload := string(cm.Message.Payload())
                        messages[payload] = struct{}{}
-                       fmt.Printf("consumer1 msg id is: %v, value is: %s\n", 
cm.Message.ID(), payload)
+                       t.Logf("consumer1 msg id is: %v, value is: %s\n", 
cm.Message.ID(), payload)
                        consumer1.Ack(cm.Message)
                case cm, ok := <-consumer2.Chan():
                        if !ok {
@@ -545,7 +681,7 @@ func TestConsumerShared(t *testing.T) {
                        readMsgs++
                        payload := string(cm.Message.Payload())
                        messages[payload] = struct{}{}
-                       fmt.Printf("consumer2 msg id is: %v, value is: %s\n", 
cm.Message.ID(), payload)
+                       t.Logf("consumer2 msg id is: %v, value is: %s\n", 
cm.Message.ID(), payload)
                        consumer2.Ack(cm.Message)
                }
        }
@@ -1764,7 +1900,7 @@ func TestConsumerAddTopicPartitions(t *testing.T) {
                assert.Nil(t, err)
                msgs = append(msgs, string(msg.Payload()))
 
-               fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+               t.Logf("Received message msgId: %#v -- content: '%s'\n",
                        msg.ID(), string(msg.Payload()))
 
                consumer.Ack(msg)
@@ -2105,11 +2241,11 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t 
*testing.T) {
        assert.Equal(t, len(consumer1Keys)*MsgBatchCount, receivedConsumer1)
        assert.Equal(t, len(consumer2Keys)*MsgBatchCount, receivedConsumer2)
 
-       fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received 
messages consumer1: %d consumser2: %d\n",
+       t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages 
consumer1: %d consumser2: %d\n",
                receivedConsumer1, receivedConsumer2)
        assert.Equal(t, 300, receivedConsumer1+receivedConsumer2)
 
-       fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received 
messages keys consumer1: %v consumser2: %v\n",
+       t.Logf("TestKeyBasedBatchProducerConsumerKeyShared received messages 
keys consumer1: %v consumser2: %v\n",
                consumer1Keys, consumer2Keys)
 }
 
@@ -2287,7 +2423,7 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) {
        assert.NotEqual(t, 0, receivedConsumer1)
        assert.NotEqual(t, 0, receivedConsumer2)
 
-       fmt.Printf(
+       t.Logf(
                "TestConsumerKeySharedWithOrderingKey received messages 
consumer1: %d consumser2: %d\n",
                receivedConsumer1, receivedConsumer2,
        )
@@ -2623,7 +2759,7 @@ func 
TestProducerConsumerRedeliveryOfFailedEncryptedMessages(t *testing.T) {
                        Value: fmt.Sprintf(message, i),
                })
                assert.Nil(t, err)
-               fmt.Printf("Sent : %v\n", mid)
+               t.Logf("Sent : %v\n", mid)
        }
 
        // Consuming from consumer 2 and 3
@@ -2656,7 +2792,7 @@ func 
TestProducerConsumerRedeliveryOfFailedEncryptedMessages(t *testing.T) {
                assert.Nil(t, err)
                messageMap[*receivedMsg] = struct{}{}
                cryptoConsumer.Ack(m)
-               fmt.Printf("Received : %v\n", m.ID())
+               t.Logf("Received : %v\n", m.ID())
        }
 
        // check if all messages were received
@@ -2904,7 +3040,7 @@ func 
TestBatchMessageReceiveWithCompressionAndRSAEcnryption(t *testing.T) {
 
        for i := 0; i < numOfMessages; i++ {
                msg, err := consumer.Receive(ctx)
-               fmt.Printf("received : %v\n", string(msg.Payload()))
+               t.Logf("received : %v\n", string(msg.Payload()))
                assert.Nil(t, err)
                consumer.Ack(msg)
                count++
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index fc1b349..fc97455 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -85,6 +85,8 @@ type Connection interface {
 type ConsumerHandler interface {
        MessageReceived(response *pb.CommandMessage, headersAndPayload Buffer) error
 
+       ActiveConsumerChanged(isActive bool)
+
        // ConnectionClosed close the TCP connection.
        ConnectionClosed()
 }
@@ -576,6 +578,7 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
                c.handlePong()
 
        case pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
+               c.handleActiveConsumerChange(cmd.GetActiveConsumerChange())
 
        default:
                c.log.Errorf("Received invalid command type: %s", cmd.Type)
@@ -857,6 +860,16 @@ func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer)
        }
 }
 
+func (c *connection) handleActiveConsumerChange(consumerChange *pb.CommandActiveConsumerChange) {
+       consumerID := consumerChange.GetConsumerId()
+       isActive := consumerChange.GetIsActive()
+       if consumer, ok := c.consumerHandler(consumerID); ok {
+               consumer.ActiveConsumerChanged(isActive)
+       } else {
+               c.log.WithField("consumerID", consumerID).Warnf("Consumer not found while active consumer change")
+       }
+}
+
 func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer) {
        c.log.Infof("Broker notification of Closed producer: %d", closeProducer.GetProducerId())
        producerID := closeProducer.GetProducerId()

Reply via email to