This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6edc8f4  Add flag to disable forced topic creation. (#226)
6edc8f4 is described below

commit 6edc8f4ef954477eb36eed92e88464f56b4c48ee
Author: cckellogg <[email protected]>
AuthorDate: Sat Apr 25 06:39:51 2020 -0700

    Add flag to disable forced topic creation. (#226)
    
    This flag is needed for the regex consumer.
---
 pulsar/consumer_impl.go      | 32 +++++++++++++++++---------------
 pulsar/consumer_partition.go |  7 +++++++
 pulsar/consumer_regex.go     |  2 +-
 3 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index ba7d24d..3a27def 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -42,11 +42,12 @@ type acker interface {
 
 type consumer struct {
        sync.Mutex
-       topic        string
-       client       *client
-       options      ConsumerOptions
-       consumers    []*partitionConsumer
-       consumerName string
+       topic                     string
+       client                    *client
+       options                   ConsumerOptions
+       consumers                 []*partitionConsumer
+       consumerName              string
+       disableForceTopicCreation bool
 
        // channel used to deliver message to clients
        messageCh chan ConsumerMessage
@@ -123,17 +124,18 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
 }
 
 func newInternalConsumer(client *client, options ConsumerOptions, topic string,
-       messageCh chan ConsumerMessage, dlq *dlqRouter) (*consumer, error) {
+       messageCh chan ConsumerMessage, dlq *dlqRouter, 
disableForceTopicCreation bool) (*consumer, error) {
 
        consumer := &consumer{
-               topic:     topic,
-               client:    client,
-               options:   options,
-               messageCh: messageCh,
-               closeCh:   make(chan struct{}),
-               errorCh:   make(chan error),
-               dlq:       dlq,
-               log:       log.WithField("topic", topic),
+               topic:                     topic,
+               client:                    client,
+               options:                   options,
+               disableForceTopicCreation: disableForceTopicCreation,
+               messageCh:                 messageCh,
+               closeCh:                   make(chan struct{}),
+               errorCh:                   make(chan error),
+               dlq:                       dlq,
+               log:                       log.WithField("topic", topic),
        }
 
        if options.Name != "" {
@@ -275,7 +277,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
 
 func topicSubscribe(client *client, options ConsumerOptions, topic string,
        messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) 
{
-       return newInternalConsumer(client, options, topic, messageCh, dlqRouter)
+       return newInternalConsumer(client, options, topic, messageCh, 
dlqRouter, false)
 }
 
 func (c *consumer) Subscription() string {
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 33fc605..498f2ae 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -76,6 +76,7 @@ type partitionConsumerOpts struct {
        startMessageIDInclusive    bool
        subscriptionMode           subscriptionMode
        readCompacted              bool
+       disableForceTopicCreation  bool
 }
 
 type partitionConsumer struct {
@@ -748,6 +749,12 @@ func (pc *partitionConsumer) grabConn() error {
                cmdSubscribe.Metadata = toKeyValues(pc.options.metadata)
        }
 
+       // force topic creation is enabled by default so
+       // we only need to set the flag when disabling it
+       if pc.options.disableForceTopicCreation {
+               cmdSubscribe.ForceTopicCreation = proto.Bool(false)
+       }
+
        res, err := pc.client.rpcClient.Request(lr.LogicalAddr, 
lr.PhysicalAddr, requestID,
                pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
 
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 00cbb88..3d0aebe 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -361,7 +361,7 @@ func subscriber(c *client, topics []string, opts 
ConsumerOptions, ch chan Consum
        for _, t := range topics {
                go func(topic string) {
                        defer wg.Done()
-                       c, err := newInternalConsumer(c, opts, topic, ch, dlq)
+                       c, err := newInternalConsumer(c, opts, topic, ch, dlq, 
true)
                        consumerErrorCh <- consumerError{
                                err:      err,
                                topic:    topic,

Reply via email to