This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b434511  [Issue 52]Add interceptor (#314)
b434511 is described below

commit b4345119bc9b0ba36b2fa132a4f5674d40e3cabd
Author: Lijingfeng <[email protected]>
AuthorDate: Tue Jul 14 10:24:59 2020 +0800

    [Issue 52]Add interceptor (#314)
    
    ### Motivation
    Add A chain of interceptors for Producer and Consumer as an option, these 
interceptors will be called at some points, it can be used for tracing, 
metrics, and so on.
    
    ### Modifications
    Add two files for interceptor definition.
    Call interceptor's methods at appropriate position.
    * review test
    
    Co-authored-by: lijingfeng <[email protected]>
---
 pulsar/consumer.go                |   3 +
 pulsar/consumer_impl.go           |   5 ++
 pulsar/consumer_interceptor.go    |  51 +++++++++++++
 pulsar/consumer_partition.go      |  14 ++++
 pulsar/consumer_partition_test.go |   3 +
 pulsar/consumer_test.go           | 146 ++++++++++++++++++++++++++++++++++++++
 pulsar/producer.go                |   3 +
 pulsar/producer_impl.go           |   4 ++
 pulsar/producer_interceptor.go    |  43 +++++++++++
 pulsar/producer_partition.go      |  10 ++-
 pulsar/producer_test.go           | 102 ++++++++++++++++++++++++++
 11 files changed, 382 insertions(+), 2 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 8d3c771..c1fe454 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -137,6 +137,9 @@ type ConsumerOptions struct {
 
        // Mark the subscription as replicated to keep it in sync across 
clusters
        ReplicateSubscriptionState bool
+
+       // A chain of interceptors, These interceptors will be called at some 
points defined in ConsumerInterceptor interface.
+       Interceptors ConsumerInterceptors
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index e3db670..ef93037 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -94,6 +94,10 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                options.ReceiverQueueSize = 1000
        }
 
+       if options.Interceptors == nil {
+               options.Interceptors = defaultConsumerInterceptors
+       }
+
        if options.Name == "" {
                options.Name = generateRandomName()
        }
@@ -262,6 +266,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
                                startMessageID:             messageID{},
                                subscriptionMode:           durable,
                                readCompacted:              
c.options.ReadCompacted,
+                               interceptors:               
c.options.Interceptors,
                        }
                        cons, err := newPartitionConsumer(c, c.client, opts, 
c.messageCh, c.dlq)
                        ch <- ConsumerError{
diff --git a/pulsar/consumer_interceptor.go b/pulsar/consumer_interceptor.go
new file mode 100644
index 0000000..db46b78
--- /dev/null
+++ b/pulsar/consumer_interceptor.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+type ConsumerInterceptor interface {
+       // BeforeConsume This is called just before the message is send to 
Consumer's ConsumerMessage channel.
+       BeforeConsume(message ConsumerMessage)
+
+       // OnAcknowledge This is called consumer sends the acknowledgment to 
the broker.
+       OnAcknowledge(consumer Consumer, msgID MessageID)
+
+       // OnNegativeAcksSend This method will be called when a redelivery from 
a negative acknowledge occurs.
+       OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID)
+}
+
+type ConsumerInterceptors []ConsumerInterceptor
+
+func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage) {
+       for i := range x {
+               x[i].BeforeConsume(message)
+       }
+}
+
+func (x ConsumerInterceptors) OnAcknowledge(consumer Consumer, msgID 
MessageID) {
+       for i := range x {
+               x[i].OnAcknowledge(consumer, msgID)
+       }
+}
+
+func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIDs 
[]MessageID) {
+       for i := range x {
+               x[i].OnNegativeAcksSend(consumer, msgIDs)
+       }
+}
+
+var defaultConsumerInterceptors = make(ConsumerInterceptors, 0)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index acf897d..0c723c8 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -114,6 +114,7 @@ type partitionConsumerOpts struct {
        subscriptionMode           subscriptionMode
        readCompacted              bool
        disableForceTopicCreation  bool
+       interceptors               ConsumerInterceptors
 }
 
 type partitionConsumer struct {
@@ -274,6 +275,8 @@ func (pc *partitionConsumer) AckID(msgID messageID) {
                        msgID: msgID,
                }
                pc.eventsCh <- req
+
+               pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
        }
 }
 
@@ -284,6 +287,12 @@ func (pc *partitionConsumer) NackID(msgID messageID) {
 
 func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
        pc.eventsCh <- &redeliveryRequest{msgIds}
+
+       iMsgIds := make([]MessageID, len(msgIds))
+       for i := range iMsgIds {
+               iMsgIds[i] = &msgIds[i]
+       }
+       pc.options.interceptors.OnNegativeAcksSend(pc.parentConsumer, iMsgIds)
 }
 
 func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
@@ -498,6 +507,11 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                        }
                }
 
+               pc.options.interceptors.BeforeConsume(ConsumerMessage{
+                       Consumer: pc.parentConsumer,
+                       Message:  msg,
+               })
+
                messages = append(messages, msg)
        }
 
diff --git a/pulsar/consumer_partition_test.go 
b/pulsar/consumer_partition_test.go
index 0fcbdc5..01831a4 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -34,6 +34,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
                queueCh:              make(chan []*message, 1),
                eventsCh:             eventsCh,
                compressionProviders: 
make(map[pb.CompressionType]compression.Provider),
+               options:              &partitionConsumerOpts{},
        }
 
        headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
@@ -63,6 +64,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
                queueCh:              make(chan []*message, 1),
                eventsCh:             eventsCh,
                compressionProviders: 
make(map[pb.CompressionType]compression.Provider),
+               options:              &partitionConsumerOpts{},
        }
 
        headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
@@ -92,6 +94,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
                queueCh:              make(chan []*message, 1),
                eventsCh:             eventsCh,
                compressionProviders: 
make(map[pb.CompressionType]compression.Provider),
+               options:              &partitionConsumerOpts{},
        }
 
        headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 4031f7d..a3a22b6 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -23,6 +23,7 @@ import (
        "log"
        "net/http"
        "strconv"
+       "sync/atomic"
        "testing"
        "time"
 
@@ -1343,6 +1344,151 @@ func TestProducerName(t *testing.T) {
        }
 }
 
+type noopConsumerInterceptor struct{}
+
+func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+
+func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID 
MessageID) {}
+
+func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs 
[]MessageID) {}
+
+// copyPropertyInterceptor copy all keys in message properties map and add a 
suffix
+type copyPropertyInterceptor struct {
+       suffix string
+}
+
+func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) {
+       properties := message.Properties()
+       copy := make(map[string]string, len(properties))
+       for k, v := range properties {
+               copy[k+x.suffix] = v
+       }
+       for ck, v := range copy {
+               properties[ck] = v
+       }
+}
+
+func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID 
MessageID) {}
+
+func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs 
[]MessageID) {}
+
+type metricConsumerInterceptor struct {
+       ackn  int32
+       nackn int32
+}
+
+func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+
+func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID 
MessageID) {
+       atomic.AddInt32(&x.ackn, 1)
+}
+
+func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, 
msgIDs []MessageID) {
+       atomic.AddInt32(&x.nackn, int32(len(msgIDs)))
+}
+
+func TestConsumerWithInterceptors(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+
+       metric := &metricConsumerInterceptor{}
+
+       // create consumer
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:               topic,
+               SubscriptionName:    "my-sub",
+               Type:                Exclusive,
+               NackRedeliveryDelay: time.Second, // for testing nack
+               Interceptors: ConsumerInterceptors{
+                       noopConsumerInterceptor{},
+                       copyPropertyInterceptor{suffix: "-copy"},
+                       metric,
+               },
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               if _, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               }); err != nil {
+                       log.Fatal(err)
+               }
+       }
+
+       var nackIds []MessageID
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               expectProperties := map[string]string{
+                       "key-1":      "pulsar-1",
+                       "key-1-copy": "pulsar-1", // check properties copy by 
interceptor
+               }
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+               assert.Equal(t, "pulsar", msg.Key())
+               assert.Equal(t, expectProperties, msg.Properties())
+
+               // ack message
+               if i%2 == 0 {
+                       consumer.Ack(msg)
+               } else {
+                       nackIds = append(nackIds, msg.ID())
+               }
+       }
+       assert.Equal(t, int32(5), atomic.LoadInt32(&metric.ackn))
+
+       for i := range nackIds {
+               consumer.NackID(nackIds[i])
+       }
+
+       // receive 5 nack messages
+       for i := 0; i < 5; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i*2+1)
+               expectProperties := map[string]string{
+                       "key-1":      "pulsar-1",
+                       "key-1-copy": "pulsar-1", // check properties copy by 
interceptor
+               }
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+               assert.Equal(t, "pulsar", msg.Key())
+               assert.Equal(t, expectProperties, msg.Properties())
+
+               // ack message
+               consumer.Ack(msg)
+       }
+
+       assert.Equal(t, int32(5), atomic.LoadInt32(&metric.nackn))
+}
+
 func TestConsumerName(t *testing.T) {
        assert := assert.New(t)
 
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 7d44a56..cb38e3e 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -135,6 +135,9 @@ type ProducerOptions struct {
        // If set to a value greater than 1, messages will be queued until this 
threshold is reached or
        // BatchingMaxMessages (see above) has been reached or the batch 
interval has elapsed.
        BatchingMaxSize uint
+
+       // A chain of interceptors, These interceptors will be called at some 
points defined in ProducerInterceptor interface
+       Interceptors ProducerInterceptors
 }
 
 // Producer is used to publish messages on a topic
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 01c5d76..4ee0d8d 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -97,6 +97,10 @@ func newProducer(client *client, options *ProducerOptions) 
(*producer, error) {
                batchingMaxPublishDelay = defaultBatchingMaxPublishDelay
        }
 
+       if options.Interceptors == nil {
+               options.Interceptors = defaultProducerInterceptors
+       }
+
        if options.MessageRouter == nil {
                internalRouter := internal.NewDefaultRouter(
                        internal.NewSystemClock(),
diff --git a/pulsar/producer_interceptor.go b/pulsar/producer_interceptor.go
new file mode 100644
index 0000000..cb2cc15
--- /dev/null
+++ b/pulsar/producer_interceptor.go
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+type ProducerInterceptor interface {
+       // BeforeSend This is called before send the message to the brokers. 
This method is allowed to modify the
+       BeforeSend(producer Producer, message *ProducerMessage)
+
+       // OnSendAcknowledgement This method is called when the message sent to 
the broker has been acknowledged,
+       // or when sending the message fails.
+       OnSendAcknowledgement(producer Producer, message *ProducerMessage, 
msgID MessageID)
+}
+
+type ProducerInterceptors []ProducerInterceptor
+
+func (x ProducerInterceptors) BeforeSend(producer Producer, message 
*ProducerMessage) {
+       for i := range x {
+               x[i].BeforeSend(producer, message)
+       }
+}
+
+func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message 
*ProducerMessage, msgID MessageID) {
+       for i := range x {
+               x[i].OnSendAcknowledgement(producer, message, msgID)
+       }
+}
+
+var defaultProducerInterceptors = make(ProducerInterceptors, 0)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b8dc13d..3832d69 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -444,6 +444,7 @@ func (p *partitionProducer) internalSendAsync(ctx 
context.Context, msg *Producer
                flushImmediately: flushImmediately,
                publishTime:      time.Now(),
        }
+       p.options.Interceptors.BeforeSend(p, msg)
 
        messagesPending.Inc()
        bytesPending.Add(float64(len(sr.msg.Payload)))
@@ -488,14 +489,19 @@ func (p *partitionProducer) ReceivedSendReceipt(response 
*pb.CommandSendReceipt)
                        bytesPending.Sub(payloadSize)
                }
 
-               if sr.callback != nil {
+               if sr.callback != nil || len(p.options.Interceptors) > 0 {
                        msgID := newMessageID(
                                int64(response.MessageId.GetLedgerId()),
                                int64(response.MessageId.GetEntryId()),
                                int32(idx),
                                p.partitionIdx,
                        )
-                       sr.callback(msgID, sr.msg, nil)
+
+                       if sr.callback != nil {
+                               sr.callback(msgID, sr.msg, nil)
+                       }
+
+                       p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, 
msgID)
                }
        }
 
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 8d389cb..d06ddbb 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -817,3 +817,105 @@ func TestMaxMessageSize(t *testing.T) {
                }
        }
 }
+
+type noopProduceInterceptor struct{}
+
+func (noopProduceInterceptor) BeforeSend(producer Producer, message 
*ProducerMessage) {}
+
+func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message 
*ProducerMessage, msgID MessageID) {
+}
+
+// copyPropertyIntercepotr copy all keys in message properties map and add a 
suffix
+type metricProduceInterceptor struct {
+       sendn int
+       ackn  int
+}
+
+func (x *metricProduceInterceptor) BeforeSend(producer Producer, message 
*ProducerMessage) {
+       x.sendn++
+}
+
+func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, 
message *ProducerMessage, msgID MessageID) {
+       x.ackn++
+}
+
+func TestProducerWithInterceptors(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := "persistent://public/default/test-topic-interceptors"
+       ctx := context.Background()
+
+       // create consumer
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "my-sub",
+               Type:             Exclusive,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       metric := &metricProduceInterceptor{}
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+               Interceptors: ProducerInterceptors{
+                       noopProduceInterceptor{},
+                       metric,
+               },
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               if i%2 == 0 {
+                       _, err := producer.Send(ctx, &ProducerMessage{
+                               Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                               Key:     "pulsar",
+                               Properties: map[string]string{
+                                       "key-1": "pulsar-1",
+                               },
+                       })
+                       assert.Nil(t, err)
+               } else {
+                       producer.SendAsync(ctx, &ProducerMessage{
+                               Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                               Key:     "pulsar",
+                               Properties: map[string]string{
+                                       "key-1": "pulsar-1",
+                               },
+                       }, func(_ MessageID, _ *ProducerMessage, err error) {
+                               assert.Nil(t, err)
+                       })
+                       assert.Nil(t, err)
+               }
+       }
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               expectProperties := map[string]string{
+                       "key-1": "pulsar-1",
+               }
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+               assert.Equal(t, "pulsar", msg.Key())
+               assert.Equal(t, expectProperties, msg.Properties())
+
+               // ack message
+               consumer.Ack(msg)
+       }
+
+       assert.Equal(t, 10, metric.sendn)
+       assert.Equal(t, 10, metric.ackn)
+}

Reply via email to