This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 2c6f4bbca5f8fc4fa6c95f5208b87fabe1787034 Author: Zijie Lu <[email protected]> AuthorDate: Thu May 27 19:25:00 2021 +0800 Address review comments Signed-off-by: Zijie Lu <[email protected]> --- .../tubemq-client-go/selector/ip_selector.go | 24 +++++++++++++++------ .../tubemq-client-go/selector/ip_selector_test.go | 25 +++++++++++++++------- .../tubemq-client-go/selector/selector.go | 13 ++++++++--- 3 files changed, 45 insertions(+), 17 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go index a3547db..177f3ee 100644 --- a/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go +++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go @@ -24,13 +24,18 @@ import ( func init() { s := &ipSelector{} - s.indexes = make(map[string]int) + s.services = make(map[string]*ipServices) Register("ip", s) Register("dns", s) } type ipSelector struct { - indexes map[string]int + services map[string]*ipServices +} + +type ipServices struct { + nextIndex int + addresses []string } // Select implements Selector interface. @@ -50,15 +55,22 @@ func (s *ipSelector) Select(serviceName string) (*Node, error) { }, nil } + var addresses []string nextIndex := 0 - if index, ok := s.indexes[serviceName]; ok { - nextIndex = index + if _, ok := s.services[serviceName]; !ok { + addresses = strings.Split(serviceName, ",") + } else { + services := s.services[serviceName] + addresses = services.addresses + nextIndex = services.nextIndex } - addresses := strings.Split(serviceName, ",") address := addresses[nextIndex] nextIndex = (nextIndex + 1) % num - s.indexes[serviceName] = nextIndex + s.services[serviceName] = &ipServices{ + addresses: addresses, + nextIndex: nextIndex, + } node := &Node{ ServiceName: serviceName, diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go index 6b5ba35..8233006 100644 --- a/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go +++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go @@ -25,7 +25,8 @@ import ( func TestSingleIP(t *testing.T) { serviceName := "192.168.0.1:9092" - selector := Get("ip") + selector, err := Get("ip") + assert.Nil(t, err) node, err := selector.Select(serviceName) assert.Nil(t, err) assert.Equal(t, node.HasNext, false) @@ -35,7 +36,8 @@ func TestSingleIP(t *testing.T) { func TestSingleDNS(t *testing.T) { serviceName := "tubemq:8081" - selector := Get("dns") + selector, err := Get("dns") + assert.Nil(t, err) node, err := selector.Select(serviceName) assert.Nil(t, err) assert.Equal(t, node.HasNext, false) @@ -45,7 +47,8 @@ func TestSingleDNS(t *testing.T) { func TestMultipleIP(t *testing.T) { serviceName := "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094" - selector := Get("dns") + selector, err := Get("dns") + assert.Nil(t, err) node, err := selector.Select(serviceName) assert.Nil(t, err) assert.Equal(t, true, node.HasNext) @@ -70,7 +73,8 @@ func TestMultipleIP(t *testing.T) { func TestMultipleDNS(t *testing.T) { serviceName := "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084" - selector := Get("dns") + selector, err := Get("dns") + assert.Nil(t, err) node, err := selector.Select(serviceName) assert.Nil(t, err) assert.Equal(t, true, node.HasNext) @@ -93,10 +97,15 @@ func TestMultipleDNS(t *testing.T) { assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName) } - func TestEmptyService(t *testing.T) { serviceName := "" - selector := Get("ip") - _, err := selector.Select(serviceName) + selector, err := Get("ip") + assert.Nil(t, err) + _, err = selector.Select(serviceName) assert.Error(t, err) -} \ No newline at end of file +} + +func TestInvalidSelector(t *testing.T) { + _, err := Get("selector") + assert.Error(t, err) +} diff --git a/tubemq-client-twins/tubemq-client-go/selector/selector.go b/tubemq-client-twins/tubemq-client-go/selector/selector.go index 5b5e0c8..77352fd 100644 --- a/tubemq-client-twins/tubemq-client-go/selector/selector.go +++ b/tubemq-client-twins/tubemq-client-go/selector/selector.go @@ -18,6 +18,11 @@ // package selector defines the route selector which is responsible for service discovery. package selector +import ( + "errors" + "fmt" +) + // Selector is abstraction of route selector which can return an available address // from the service name. type Selector interface { @@ -35,9 +40,11 @@ func Register(name string, s Selector) { } // Get returns the corresponding selector. -func Get(name string) Selector { - s := selectors[name] - return s +func Get(name string) (Selector, error) { + if _, ok := selectors[name]; !ok { + return nil, errors.New(fmt.Sprintf("selector %s is invalid", name)) + } + return selectors[name], nil } // Node represents the service node.
