This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 5a1824b [Issue 468] Add Support for NonDurable subscriptions (#992)
5a1824b is described below
commit 5a1824b4e39d6802551c8e1a6e7956d972af62a6
Author: Don Inghram <[email protected]>
AuthorDate: Thu Jun 15 07:17:52 2023 -0600
[Issue 468] Add Support for NonDurable subscriptions (#992)
---
perf/perf-consumer.go | 8 +++++
perf/pulsar-perf-go.go | 6 ++++
pulsar/consumer.go | 4 +++
pulsar/consumer_impl.go | 2 +-
pulsar/consumer_partition.go | 12 ++++----
pulsar/consumer_test.go | 72 ++++++++++++++++++++++++++++++++++++++++++++
pulsar/reader_impl.go | 2 +-
7 files changed, 98 insertions(+), 8 deletions(-)
diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index 825de62..2172af5 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -36,6 +36,8 @@ type ConsumeArgs struct {
ReceiverQueueSize int
EnableBatchIndexAck bool
EnableAutoScaledReceiverQueueSize bool
+ SubscriptionMode pulsar.SubscriptionMode
+ SubscriptionType pulsar.SubscriptionType
}
func newConsumerCommand() *cobra.Command {
@@ -60,6 +62,10 @@ func newConsumerCommand() *cobra.Command {
flags.BoolVar(&consumeArgs.EnableBatchIndexAck,
"enable-batch-index-ack", false, "Whether to enable batch index ACK")
flags.BoolVar(&consumeArgs.EnableAutoScaledReceiverQueueSize,
"enable-auto-scaled-queue-size", false,
"Whether to enable auto scaled receiver queue size")
+ flags.IntVarP((*int)(&consumeArgs.SubscriptionMode),
"subscription-mode", "m", int(pulsar.Durable),
+ "Subscription mode")
+ flags.IntVarP((*int)(&consumeArgs.SubscriptionType),
"subscription-type", "t", int(pulsar.Exclusive),
+ "Subscription type")
return cmd
}
@@ -83,6 +89,8 @@ func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) {
SubscriptionName: consumeArgs.SubscriptionName,
EnableBatchIndexAcknowledgment:
consumeArgs.EnableBatchIndexAck,
EnableAutoScaledReceiverQueueSize:
consumeArgs.EnableAutoScaledReceiverQueueSize,
+ Type: consumeArgs.SubscriptionType,
+ SubscriptionMode: consumeArgs.SubscriptionMode,
})
if err != nil {
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
index a672a30..40257e8 100644
--- a/perf/pulsar-perf-go.go
+++ b/perf/pulsar-perf-go.go
@@ -43,6 +43,8 @@ type ClientArgs struct {
ServiceURL string
TokenFile string
TLSTrustCertFile string
+ TLSServerCertFile string
+ TLSServerKeyFile string
MaxConnectionsPerBroker int
}
@@ -62,6 +64,8 @@ func NewClient() (pulsar.Client, error) {
os.Exit(1)
}
clientOpts.Authentication =
pulsar.NewAuthenticationToken(string(tokenBytes))
+ } else if clientArgs.TLSServerCertFile != "" &&
clientArgs.TLSServerKeyFile != "" {
+ clientOpts.Authentication =
pulsar.NewAuthenticationTLS(clientArgs.TLSServerCertFile,
clientArgs.TLSServerKeyFile)
}
if clientArgs.TLSTrustCertFile != "" {
@@ -97,6 +101,8 @@ func main() {
flags.StringVarP(&clientArgs.ServiceURL, "service-url", "u",
"pulsar://localhost:6650", "The Pulsar service URL")
flags.StringVar(&clientArgs.TokenFile, "token-file", "", "file path to
the Pulsar JWT file")
+ flags.StringVar(&clientArgs.TLSServerCertFile, "cert-file", "", "file
path to the TLS authentication cert")
+ flags.StringVar(&clientArgs.TLSServerKeyFile, "key-file", "", "file
path to the TLS authentication key")
flags.StringVar(&clientArgs.TLSTrustCertFile, "trust-cert-file", "",
"file path to the trusted certificate file")
flags.IntVarP(&clientArgs.MaxConnectionsPerBroker, "max-connections",
"c", 1,
"Max connections to open to broker. Defaults to 1.")
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 3ef72c7..a62eabe 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -242,6 +242,10 @@ type ConsumerOptions struct {
// NOTE: This option does not work if AckWithResponse is true
// because there are only synchronous APIs for acknowledgment
AckGroupingOptions *AckGroupingOptions
+
+ // SubscriptionMode specifies the subscription mode to be used when
subscribing to a topic.
+ // Default is `Durable`
+ SubscriptionMode SubscriptionMode
}
// Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 07f38c3..d782bea 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -385,7 +385,7 @@ func (c *consumer) internalTopicSubscribeToPartitions()
error {
subProperties: subProperties,
replicateSubscriptionState:
c.options.ReplicateSubscriptionState,
startMessageID: nil,
- subscriptionMode: durable,
+ subscriptionMode:
c.options.SubscriptionMode,
readCompacted:
c.options.ReadCompacted,
interceptors:
c.options.Interceptors,
maxReconnectToBroker:
c.options.MaxReconnectToBroker,
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 6e241d8..2d2a194 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -65,15 +65,15 @@ func (s consumerState) String() string {
}
}
-type subscriptionMode int
+type SubscriptionMode int
const (
// Make the subscription to be backed by a durable cursor that will
retain messages and persist the current
// position
- durable subscriptionMode = iota
+ Durable SubscriptionMode = iota
// Lightweight subscription mode that doesn't have a durable cursor
associated
- nonDurable
+ NonDurable
)
const (
@@ -101,7 +101,7 @@ type partitionConsumerOpts struct {
replicateSubscriptionState bool
startMessageID *trackingMessageID
startMessageIDInclusive bool
- subscriptionMode subscriptionMode
+ subscriptionMode SubscriptionMode
readCompacted bool
disableForceTopicCreation bool
interceptors ConsumerInterceptors
@@ -1698,7 +1698,7 @@ func (pc *partitionConsumer) grabConn() error {
RequestId: proto.Uint64(requestID),
ConsumerName: proto.String(pc.name),
PriorityLevel: nil,
- Durable:
proto.Bool(pc.options.subscriptionMode == durable),
+ Durable:
proto.Bool(pc.options.subscriptionMode == Durable),
Metadata:
internal.ConvertFromStringMap(pc.options.metadata),
SubscriptionProperties:
internal.ConvertFromStringMap(pc.options.subProperties),
ReadCompacted:
proto.Bool(pc.options.readCompacted),
@@ -1709,7 +1709,7 @@ func (pc *partitionConsumer) grabConn() error {
}
pc.startMessageID.set(pc.clearReceiverQueue())
- if pc.options.subscriptionMode != durable {
+ if pc.options.subscriptionMode != Durable {
// For regular subscriptions the broker will determine the
restarting point
cmdSubscribe.StartMessageId =
convertToMessageIDData(pc.startMessageID.get())
}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 521e576..2d785b4 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -4074,6 +4074,78 @@ func TestConsumerWithAutoScaledQueueReceive(t
*testing.T) {
})
}
+func TestConsumerNonDurable(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+ ctx := context.Background()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-1",
+ Type: Shared,
+ SubscriptionMode: NonDurable,
+ })
+ assert.Nil(t, err)
+
+ i := 1
+ if _, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+ }); err != nil {
+ t.Fatal(err)
+ }
+
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
+ consumer.Ack(msg)
+
+ consumer.Close()
+
+ i++
+
+ // send a message. Pulsar should delete it as there is no active
subscription
+ if _, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+ }); err != nil {
+ t.Fatal(err)
+ }
+
+ i++
+
+ // Subscribe again
+ consumer, err = client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-1",
+ Type: Shared,
+ SubscriptionMode: NonDurable,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ if _, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+ }); err != nil {
+ t.Fatal(err)
+ }
+
+ msg, err = consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
+ consumer.Ack(msg)
+}
+
func TestConsumerBatchIndexAckDisabled(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 36b492a..5a2128a 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -106,7 +106,7 @@ func newReader(client *client, options ReaderOptions)
(Reader, error) {
receiverQueueSize: receiverQueueSize,
startMessageID: startMessageID,
startMessageIDInclusive: options.StartMessageIDInclusive,
- subscriptionMode: nonDurable,
+ subscriptionMode: NonDurable,
readCompacted: options.ReadCompacted,
metadata: options.Properties,
nackRedeliveryDelay: defaultNackRedeliveryDelay,