This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a1824b  [Issue 468] Add Support for NonDurable subscriptions (#992)
5a1824b is described below

commit 5a1824b4e39d6802551c8e1a6e7956d972af62a6
Author: Don Inghram <[email protected]>
AuthorDate: Thu Jun 15 07:17:52 2023 -0600

    [Issue 468] Add Support for NonDurable subscriptions (#992)
---
 perf/perf-consumer.go        |  8 +++++
 perf/pulsar-perf-go.go       |  6 ++++
 pulsar/consumer.go           |  4 +++
 pulsar/consumer_impl.go      |  2 +-
 pulsar/consumer_partition.go | 12 ++++----
 pulsar/consumer_test.go      | 72 ++++++++++++++++++++++++++++++++++++++++++++
 pulsar/reader_impl.go        |  2 +-
 7 files changed, 98 insertions(+), 8 deletions(-)

diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index 825de62..2172af5 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -36,6 +36,8 @@ type ConsumeArgs struct {
        ReceiverQueueSize                 int
        EnableBatchIndexAck               bool
        EnableAutoScaledReceiverQueueSize bool
+       SubscriptionMode                  pulsar.SubscriptionMode
+       SubscriptionType                  pulsar.SubscriptionType
 }
 
 func newConsumerCommand() *cobra.Command {
@@ -60,6 +62,10 @@ func newConsumerCommand() *cobra.Command {
        flags.BoolVar(&consumeArgs.EnableBatchIndexAck, 
"enable-batch-index-ack", false, "Whether to enable batch index ACK")
        flags.BoolVar(&consumeArgs.EnableAutoScaledReceiverQueueSize, 
"enable-auto-scaled-queue-size", false,
                "Whether to enable auto scaled receiver queue size")
+       flags.IntVarP((*int)(&consumeArgs.SubscriptionMode), 
"subscription-mode", "m", int(pulsar.Durable),
+               "Subscription mode")
+       flags.IntVarP((*int)(&consumeArgs.SubscriptionType), 
"subscription-type", "t", int(pulsar.Exclusive),
+               "Subscription type")
 
        return cmd
 }
@@ -83,6 +89,8 @@ func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) {
                SubscriptionName:                  consumeArgs.SubscriptionName,
                EnableBatchIndexAcknowledgment:    
consumeArgs.EnableBatchIndexAck,
                EnableAutoScaledReceiverQueueSize: 
consumeArgs.EnableAutoScaledReceiverQueueSize,
+               Type:                              consumeArgs.SubscriptionType,
+               SubscriptionMode:                  consumeArgs.SubscriptionMode,
        })
 
        if err != nil {
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
index a672a30..40257e8 100644
--- a/perf/pulsar-perf-go.go
+++ b/perf/pulsar-perf-go.go
@@ -43,6 +43,8 @@ type ClientArgs struct {
        ServiceURL              string
        TokenFile               string
        TLSTrustCertFile        string
+       TLSServerCertFile       string
+       TLSServerKeyFile        string
        MaxConnectionsPerBroker int
 }
 
@@ -62,6 +64,8 @@ func NewClient() (pulsar.Client, error) {
                        os.Exit(1)
                }
                clientOpts.Authentication = 
pulsar.NewAuthenticationToken(string(tokenBytes))
+       } else if clientArgs.TLSServerCertFile != "" && 
clientArgs.TLSServerKeyFile != "" {
+               clientOpts.Authentication = 
pulsar.NewAuthenticationTLS(clientArgs.TLSServerCertFile, 
clientArgs.TLSServerKeyFile)
        }
 
        if clientArgs.TLSTrustCertFile != "" {
@@ -97,6 +101,8 @@ func main() {
        flags.StringVarP(&clientArgs.ServiceURL, "service-url", "u",
                "pulsar://localhost:6650", "The Pulsar service URL")
        flags.StringVar(&clientArgs.TokenFile, "token-file", "", "file path to 
the Pulsar JWT file")
+       flags.StringVar(&clientArgs.TLSServerCertFile, "cert-file", "", "file 
path to the TLS authentication cert")
+       flags.StringVar(&clientArgs.TLSServerKeyFile, "key-file", "", "file 
path to the TLS authentication key")
        flags.StringVar(&clientArgs.TLSTrustCertFile, "trust-cert-file", "", 
"file path to the trusted certificate file")
        flags.IntVarP(&clientArgs.MaxConnectionsPerBroker, "max-connections", 
"c", 1,
                "Max connections to open to broker. Defaults to 1.")
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 3ef72c7..a62eabe 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -242,6 +242,10 @@ type ConsumerOptions struct {
        // NOTE: This option does not work if AckWithResponse is true
        //      because there are only synchronous APIs for acknowledgment
        AckGroupingOptions *AckGroupingOptions
+
+       // SubscriptionMode specifies the subscription mode to be used when 
subscribing to a topic.
+       // Default is `Durable`
+       SubscriptionMode SubscriptionMode
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 07f38c3..d782bea 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -385,7 +385,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
                                subProperties:               subProperties,
                                replicateSubscriptionState:  
c.options.ReplicateSubscriptionState,
                                startMessageID:              nil,
-                               subscriptionMode:            durable,
+                               subscriptionMode:            
c.options.SubscriptionMode,
                                readCompacted:               
c.options.ReadCompacted,
                                interceptors:                
c.options.Interceptors,
                                maxReconnectToBroker:        
c.options.MaxReconnectToBroker,
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 6e241d8..2d2a194 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -65,15 +65,15 @@ func (s consumerState) String() string {
        }
 }
 
-type subscriptionMode int
+type SubscriptionMode int
 
 const (
        // Make the subscription to be backed by a durable cursor that will 
retain messages and persist the current
        // position
-       durable subscriptionMode = iota
+       Durable SubscriptionMode = iota
 
        // Lightweight subscription mode that doesn't have a durable cursor 
associated
-       nonDurable
+       NonDurable
 )
 
 const (
@@ -101,7 +101,7 @@ type partitionConsumerOpts struct {
        replicateSubscriptionState  bool
        startMessageID              *trackingMessageID
        startMessageIDInclusive     bool
-       subscriptionMode            subscriptionMode
+       subscriptionMode            SubscriptionMode
        readCompacted               bool
        disableForceTopicCreation   bool
        interceptors                ConsumerInterceptors
@@ -1698,7 +1698,7 @@ func (pc *partitionConsumer) grabConn() error {
                RequestId:                  proto.Uint64(requestID),
                ConsumerName:               proto.String(pc.name),
                PriorityLevel:              nil,
-               Durable:                    
proto.Bool(pc.options.subscriptionMode == durable),
+               Durable:                    
proto.Bool(pc.options.subscriptionMode == Durable),
                Metadata:                   
internal.ConvertFromStringMap(pc.options.metadata),
                SubscriptionProperties:     
internal.ConvertFromStringMap(pc.options.subProperties),
                ReadCompacted:              
proto.Bool(pc.options.readCompacted),
@@ -1709,7 +1709,7 @@ func (pc *partitionConsumer) grabConn() error {
        }
 
        pc.startMessageID.set(pc.clearReceiverQueue())
-       if pc.options.subscriptionMode != durable {
+       if pc.options.subscriptionMode != Durable {
                // For regular subscriptions the broker will determine the 
restarting point
                cmdSubscribe.StartMessageId = 
convertToMessageIDData(pc.startMessageID.get())
        }
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 521e576..2d785b4 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -4074,6 +4074,78 @@ func TestConsumerWithAutoScaledQueueReceive(t 
*testing.T) {
        })
 }
 
+func TestConsumerNonDurable(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+       ctx := context.Background()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "sub-1",
+               Type:             Shared,
+               SubscriptionMode: NonDurable,
+       })
+       assert.Nil(t, err)
+
+       i := 1
+       if _, err := producer.Send(ctx, &ProducerMessage{
+               Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+       }); err != nil {
+               t.Fatal(err)
+       }
+
+       msg, err := consumer.Receive(ctx)
+       assert.Nil(t, err)
+       assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
+       consumer.Ack(msg)
+
+       consumer.Close()
+
+       i++
+
+       // send a message. Pulsar should delete it as there is no active 
subscription
+       if _, err := producer.Send(ctx, &ProducerMessage{
+               Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+       }); err != nil {
+               t.Fatal(err)
+       }
+
+       i++
+
+       // Subscribe again
+       consumer, err = client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "sub-1",
+               Type:             Shared,
+               SubscriptionMode: NonDurable,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       if _, err := producer.Send(ctx, &ProducerMessage{
+               Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+       }); err != nil {
+               t.Fatal(err)
+       }
+
+       msg, err = consumer.Receive(ctx)
+       assert.Nil(t, err)
+       assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
+       consumer.Ack(msg)
+}
+
 func TestConsumerBatchIndexAckDisabled(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 36b492a..5a2128a 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -106,7 +106,7 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
                receiverQueueSize:           receiverQueueSize,
                startMessageID:              startMessageID,
                startMessageIDInclusive:     options.StartMessageIDInclusive,
-               subscriptionMode:            nonDurable,
+               subscriptionMode:            NonDurable,
                readCompacted:               options.ReadCompacted,
                metadata:                    options.Properties,
                nackRedeliveryDelay:         defaultNackRedeliveryDelay,

Reply via email to