This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ffced1  fix requestID using consumerID && loopclosure issue && add old style topic support (#128)
4ffced1 is described below

commit 4ffced1ae42a96553fd15f6cc295a22b2eb41674
Author: hailang.wei <[email protected]>
AuthorDate: Thu Dec 19 11:36:37 2019 +0800

    fix requestID using consumerID && loopclosure issue && add old style topic support (#128)
    
    * fix requestID using consumerID
    
    * fix loopclosure issue
    
    * add old style topic support
---
 pulsar/consumer_partition.go       |  2 +-
 pulsar/consumer_regex.go           |  2 +-
 pulsar/internal/topic_name.go      |  5 +++--
 pulsar/internal/topic_name_test.go | 14 ++++++++++----
 4 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index a217d55..68ee7af 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -465,7 +465,7 @@ func (pc *partitionConsumer) internalClose(req 
*closeRequest) {
        pc.state = consumerClosing
        pc.log.Infof("Closing consumer=%d", pc.consumerID)
 
-       requestID := pc.client.rpcClient.NewConsumerID()
+       requestID := pc.client.rpcClient.NewRequestID()
        cmdClose := &pb.CommandCloseConsumer{
                ConsumerId: proto.Uint64(pc.consumerID),
                RequestId:  proto.Uint64(requestID),
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index aa57356..e68c495 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -351,7 +351,7 @@ func subscriber(c *client, topics []string, opts 
ConsumerOptions, ch chan Consum
                        c, err := internalTopicSubscribe(c, opts, topic, ch)
                        consumerErrorCh <- consumerError{
                                err:      err,
-                               topic:    t,
+                               topic:    topic,
                                consumer: c,
                        }
                }(t)
diff --git a/pulsar/internal/topic_name.go b/pulsar/internal/topic_name.go
index f5b9890..78d2abb 100644
--- a/pulsar/internal/topic_name.go
+++ b/pulsar/internal/topic_name.go
@@ -44,9 +44,10 @@ func ParseTopicName(topic string) (*TopicName, error) {
        if !strings.Contains(topic, "://") {
                // The short topic name can be:
                // - <topic>
-               // - <property>/<namespace>/<topic>
+               // - <tenant>/<namespace>/<topic>
+               // - <tenant>/<cluster>/<namespace>/<topic>
                parts := strings.Split(topic, "/")
-               if len(parts) == 3 {
+               if len(parts) == 3 || len(parts) == 4 {
                        topic = "persistent://" + topic
                } else if len(parts) == 1 {
                        topic = "persistent://" + publicTenant + "/" + 
defaultNamespace + "/" + parts[0]
diff --git a/pulsar/internal/topic_name_test.go 
b/pulsar/internal/topic_name_test.go
index 738ab22..e57bf4d 100644
--- a/pulsar/internal/topic_name_test.go
+++ b/pulsar/internal/topic_name_test.go
@@ -61,6 +61,12 @@ func TestParseTopicName(t *testing.T) {
        assert.Equal(t, "persistent://my-tenant/my-cluster/my-ns/my-topic", 
topic.Name)
        assert.Equal(t, "my-tenant/my-cluster/my-ns", topic.Namespace)
        assert.Equal(t, -1, topic.Partition)
+
+       topic, err = ParseTopicName("my-tenant/my-cluster/my-ns/my-topic")
+       assert.Nil(t, err)
+       assert.Equal(t, "persistent://my-tenant/my-cluster/my-ns/my-topic", 
topic.Name)
+       assert.Equal(t, "my-tenant/my-cluster/my-ns", topic.Namespace)
+       assert.Equal(t, -1, topic.Partition)
 }
 
 func TestParseTopicNameErrors(t *testing.T) {
@@ -84,16 +90,16 @@ func TestParseTopicNameErrors(t *testing.T) {
 }
 
 func TestTopicNameWithoutPartitionPart(t *testing.T) {
-       tests := []struct{
-               tn TopicName
+       tests := []struct {
+               tn       TopicName
                expected string
        }{
                {
-                       tn: 
TopicName{Name:"persistent://public/default/my-topic", Partition:-1},
+                       tn:       TopicName{Name: 
"persistent://public/default/my-topic", Partition: -1},
                        expected: "persistent://public/default/my-topic",
                },
                {
-                       tn: 
TopicName{Name:"persistent://public/default/my-topic-partition-0", Partition:0},
+                       tn:       TopicName{Name: 
"persistent://public/default/my-topic-partition-0", Partition: 0},
                        expected: "persistent://public/default/my-topic",
                },
        }

Reply via email to