This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f3ffe25c [ISSUE #989] consumer and producer client add unit support 
(WithUnitName)
7f3ffe25c is described below

commit 7f3ffe25c4c17b21afd08c7b5a88ef61654c2084
Author: WeizhongTu <[email protected]>
AuthorDate: Mon Apr 10 11:23:05 2023 +0800

    [ISSUE #989] consumer and producer client add unit support (WithUnitName)
    
    [ISSUE #989] consumer and producer client add unit support (WithUnitName)
---
 consumer/option.go           | 19 +++++++++++--
 consumer/option_test.go      | 63 ++++++++++++++++++++++++++++++++++++++++++++
 primitive/nsresolver.go      | 18 ++++++++++---
 primitive/nsresolver_test.go | 42 +++++++++++++++++++++++++++--
 producer/option.go           | 17 +++++++++++-
 producer/option_test.go      | 63 ++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 213 insertions(+), 9 deletions(-)

diff --git a/consumer/option.go b/consumer/option.go
index f2cb9e8..3f7e6ca 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -18,9 +18,10 @@ limitations under the License.
 package consumer
 
 import (
-       "github.com/apache/rocketmq-client-go/v2/hooks"
+       "strings"
        "time"
 
+       "github.com/apache/rocketmq-client-go/v2/hooks"
        "github.com/apache/rocketmq-client-go/v2/internal"
        "github.com/apache/rocketmq-client-go/v2/primitive"
 )
@@ -327,7 +328,21 @@ func WithNameServer(nameServers primitive.NamesrvAddr) 
Option {
 // WithNameServerDomain set NameServer domain
 func WithNameServerDomain(nameServerUrl string) Option {
        return func(opts *consumerOptions) {
-               opts.Resolver = primitive.NewHttpResolver("DEFAULT", 
nameServerUrl)
+               h := primitive.NewHttpResolver("DEFAULT", nameServerUrl)
+               if opts.UnitName != "" {
+                       h.DomainWithUnit(opts.UnitName)
+               }
+               opts.Resolver = h
+       }
+}
+
+// WithUnitName set the name of specified unit
+func WithUnitName(unitName string) Option {
+       return func(opts *consumerOptions) {
+               opts.UnitName = strings.TrimSpace(unitName)
+               if ns, ok := opts.Resolver.(*primitive.HttpResolver); ok {
+                       ns.DomainWithUnit(opts.UnitName)
+               }
        }
 }
 
diff --git a/consumer/option_test.go b/consumer/option_test.go
new file mode 100644
index 0000000..a1c016e
--- /dev/null
+++ b/consumer/option_test.go
@@ -0,0 +1,63 @@
+package consumer
+
+import (
+       "fmt"
+       "reflect"
+       "strings"
+       "testing"
+)
+
+func getFieldString(obj interface{}, field string) string {
+       v := reflect.Indirect(reflect.ValueOf(obj))
+       return v.FieldByNameFunc(func(n string) bool {
+               return n == field
+       }).String()
+}
+
+func TestWithUnitName(t *testing.T) {
+       opt := defaultPullConsumerOptions()
+       unitName := "unsh"
+       WithUnitName(unitName)(&opt)
+       if opt.UnitName != unitName {
+               t.Errorf("consumer option WithUnitName. want:%s, got=%s", 
unitName, opt.UnitName)
+       }
+}
+
+func TestWithNameServerDomain(t *testing.T) {
+       opt := defaultPullConsumerOptions()
+       nameServerAddr := "http://127.0.0.1:8080/nameserver/addr";
+       WithNameServerDomain(nameServerAddr)(&opt)
+       domainStr := getFieldString(opt.Resolver, "domain")
+       if domainStr != nameServerAddr {
+               t.Errorf("consumer option WithUnitName. want:%s, got=%s", 
nameServerAddr, domainStr)
+       }
+}
+
+func TestWithNameServerDomainAndUnitName(t *testing.T) {
+       nameServerAddr := "http://127.0.0.1:8080/nameserver/addr";
+       unitName := "unsh"
+       suffix := fmt.Sprintf("-%s?nofix=1", unitName)
+
+       // test with two different orders
+       t.Run("WithNameServerDomain & WithUnitName", func(t *testing.T) {
+               opt := defaultPullConsumerOptions()
+               WithNameServerDomain(nameServerAddr)(&opt)
+               WithUnitName(unitName)(&opt)
+
+               domainStr := getFieldString(opt.Resolver, "domain")
+               if !strings.Contains(domainStr, nameServerAddr) || 
!strings.Contains(domainStr, suffix) {
+                       t.Errorf("consumer option should contains %s and %s", 
nameServerAddr, suffix)
+               }
+       })
+
+       t.Run("WithUnitName & WithNameServerDomain", func(t *testing.T) {
+               opt := defaultPullConsumerOptions()
+               WithNameServerDomain(nameServerAddr)(&opt)
+               WithUnitName(unitName)(&opt)
+
+               domainStr := getFieldString(opt.Resolver, "domain")
+               if !strings.Contains(domainStr, nameServerAddr) || 
!strings.Contains(domainStr, suffix) {
+                       t.Errorf("consumer option should contains %s and %s", 
nameServerAddr, suffix)
+               }
+       })
+}
diff --git a/primitive/nsresolver.go b/primitive/nsresolver.go
index 4e5917c..970242f 100644
--- a/primitive/nsresolver.go
+++ b/primitive/nsresolver.go
@@ -6,7 +6,7 @@ The ASF licenses this file to You under the Apache License, 
Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at
 
-    http://www.apache.org/licenses/LICENSE-2.0
+       http://www.apache.org/licenses/LICENSE-2.0
 
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
@@ -111,6 +111,16 @@ func NewHttpResolver(instance string, domain ...string) 
*HttpResolver {
        return h
 }
 
+func (h *HttpResolver) DomainWithUnit(unitName string) {
+       if unitName == "" {
+               return
+       }
+       if strings.Contains(h.domain, "?nofix=1") {
+               return
+       }
+       h.domain = fmt.Sprintf("%s-%s?nofix=1", h.domain, unitName)
+}
+
 func (h *HttpResolver) Resolve() []string {
        addrs := h.get()
        if len(addrs) > 0 {
@@ -152,14 +162,14 @@ func (h *HttpResolver) get() []string {
                return nil
        }
 
-       bodyStr := string(body)
+       bodyStr := strings.TrimSpace(string(body))
        if bodyStr == "" {
                return nil
        }
 
-       h.saveSnapshot(body)
+       _ = h.saveSnapshot([]byte(bodyStr))
 
-       return strings.Split(string(body), ";")
+       return strings.Split(bodyStr, ";")
 }
 
 func (h *HttpResolver) saveSnapshot(body []byte) error {
diff --git a/primitive/nsresolver_test.go b/primitive/nsresolver_test.go
index d42d2c6..94d80db 100644
--- a/primitive/nsresolver_test.go
+++ b/primitive/nsresolver_test.go
@@ -6,7 +6,7 @@ The ASF licenses this file to You under the Apache License, 
Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at
 
-    http://www.apache.org/licenses/LICENSE-2.0
+       http://www.apache.org/licenses/LICENSE-2.0
 
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,7 +18,6 @@ package primitive
 
 import (
        "fmt"
-       "github.com/apache/rocketmq-client-go/v2/rlog"
        "io/ioutil"
        "net"
        "net/http"
@@ -26,6 +25,8 @@ import (
        "strings"
        "testing"
 
+       "github.com/apache/rocketmq-client-go/v2/rlog"
+
        . "github.com/smartystreets/goconvey/convey"
 )
 
@@ -81,6 +82,43 @@ func TestHttpResolverWithGet(t *testing.T) {
        })
 }
 
+func TestHttpResolverWithGetUnitName(t *testing.T) {
+       Convey("Test UpdateNameServerAddress Save Local Snapshot", t, func() {
+               srvs := []string{
+                       "192.168.100.1",
+                       "192.168.100.2",
+                       "192.168.100.3",
+                       "192.168.100.4",
+                       "192.168.100.5",
+               }
+               http.HandleFunc("/nameserver/addrs3-unsh", func(w 
http.ResponseWriter, r *http.Request) {
+                       if r.URL.Query().Get("nofix") == "1" {
+                               fmt.Fprintf(w, strings.Join(srvs, ";"))
+                       }
+                       fmt.Fprintf(w, "")
+               })
+               server := &http.Server{Addr: ":0", Handler: nil}
+               listener, _ := net.Listen("tcp", ":0")
+               go server.Serve(listener)
+
+               port := listener.Addr().(*net.TCPAddr).Port
+               nameServerDommain := 
fmt.Sprintf("http://127.0.0.1:%d/nameserver/addrs3";, port)
+               rlog.Info("Temporary Nameserver", map[string]interface{}{
+                       "domain": nameServerDommain,
+               })
+
+               resolver := NewHttpResolver("DEFAULT", nameServerDommain)
+               resolver.DomainWithUnit("unsh")
+               resolver.Resolve()
+
+               // check snapshot saved
+               filePath := resolver.getSnapshotFilePath("DEFAULT")
+               body := strings.Join(srvs, ";")
+               bs, _ := ioutil.ReadFile(filePath)
+               So(string(bs), ShouldEqual, body)
+       })
+}
+
 func TestHttpResolverWithSnapshotFile(t *testing.T) {
        Convey("Test UpdateNameServerAddress Use Local Snapshot", t, func() {
                srvs := []string{
diff --git a/producer/option.go b/producer/option.go
index 9fd8374..c3a0dc4 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -18,6 +18,7 @@ limitations under the License.
 package producer
 
 import (
+       "strings"
        "time"
 
        "github.com/apache/rocketmq-client-go/v2/internal"
@@ -144,7 +145,21 @@ func WithNameServer(nameServers primitive.NamesrvAddr) 
Option {
 // WithNameServerDomain set NameServer domain
 func WithNameServerDomain(nameServerUrl string) Option {
        return func(opts *producerOptions) {
-               opts.Resolver = primitive.NewHttpResolver("DEFAULT", 
nameServerUrl)
+               h := primitive.NewHttpResolver("DEFAULT", nameServerUrl)
+               if opts.UnitName != "" {
+                       h.DomainWithUnit(opts.UnitName)
+               }
+               opts.Resolver = h
+       }
+}
+
+// WithUnitName set the name of specified unit
+func WithUnitName(unitName string) Option {
+       return func(opts *producerOptions) {
+               opts.UnitName = strings.TrimSpace(unitName)
+               if ns, ok := opts.Resolver.(*primitive.HttpResolver); ok {
+                       ns.DomainWithUnit(opts.UnitName)
+               }
        }
 }
 
diff --git a/producer/option_test.go b/producer/option_test.go
new file mode 100644
index 0000000..c074510
--- /dev/null
+++ b/producer/option_test.go
@@ -0,0 +1,63 @@
+package producer
+
+import (
+       "fmt"
+       "reflect"
+       "strings"
+       "testing"
+)
+
+func getFieldString(obj interface{}, field string) string {
+       v := reflect.Indirect(reflect.ValueOf(obj))
+       return v.FieldByNameFunc(func(n string) bool {
+               return n == field
+       }).String()
+}
+
+func TestWithUnitName(t *testing.T) {
+       opt := defaultProducerOptions()
+       unitName := "unsh"
+       WithUnitName(unitName)(&opt)
+       if opt.UnitName != unitName {
+               t.Errorf("consumer option WithUnitName. want:%s, got=%s", 
unitName, opt.UnitName)
+       }
+}
+
+func TestWithNameServerDomain(t *testing.T) {
+       opt := defaultProducerOptions()
+       nameServerAddr := "http://127.0.0.1:8080/nameserver/addr";
+       WithNameServerDomain(nameServerAddr)(&opt)
+       domainStr := getFieldString(opt.Resolver, "domain")
+       if domainStr != nameServerAddr {
+               t.Errorf("consumer option WithUnitName. want:%s, got=%s", 
nameServerAddr, domainStr)
+       }
+}
+
+func TestWithNameServerDomainAndUnitName(t *testing.T) {
+       nameServerAddr := "http://127.0.0.1:8080/nameserver/addr";
+       unitName := "unsh"
+       suffix := fmt.Sprintf("-%s?nofix=1", unitName)
+
+       // test with two different orders
+       t.Run("WithNameServerDomain & WithUnitName", func(t *testing.T) {
+               opt := defaultProducerOptions()
+               WithNameServerDomain(nameServerAddr)(&opt)
+               WithUnitName(unitName)(&opt)
+
+               domainStr := getFieldString(opt.Resolver, "domain")
+               if !strings.Contains(domainStr, nameServerAddr) || 
!strings.Contains(domainStr, suffix) {
+                       t.Errorf("consumer option should contains %s and %s", 
nameServerAddr, suffix)
+               }
+       })
+
+       t.Run("WithUnitName & WithNameServerDomain", func(t *testing.T) {
+               opt := defaultProducerOptions()
+               WithNameServerDomain(nameServerAddr)(&opt)
+               WithUnitName(unitName)(&opt)
+
+               domainStr := getFieldString(opt.Resolver, "domain")
+               if !strings.Contains(domainStr, nameServerAddr) || 
!strings.Contains(domainStr, suffix) {
+                       t.Errorf("consumer option should contains %s and %s", 
nameServerAddr, suffix)
+               }
+       })
+}

Reply via email to