This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 395072c337a438339bf1209228bd23e5892d394f Author: Zijie Lu <[email protected]> AuthorDate: Thu May 27 15:22:15 2021 +0800 [INLONG-620]Selector for Go SDK Signed-off-by: Zijie Lu <[email protected]> --- .../tubemq-client-go/selector/ip_selector.go | 71 ++++++++++++++ .../tubemq-client-go/selector/ip_selector_test.go | 102 +++++++++++++++++++++ .../tubemq-client-go/selector/selector.go | 51 +++++++++++ 3 files changed, 224 insertions(+) diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go new file mode 100644 index 0000000..a3547db --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package selector + +import ( + "errors" + "strings" +) + +func init() { + s := &ipSelector{} + s.indexes = make(map[string]int) + Register("ip", s) + Register("dns", s) +} + +type ipSelector struct { + indexes map[string]int +} + +// Select implements Selector interface. +// Select will return the address in the serviceName sequentially. +// The first address will be returned after reaching the end of the addresses. +func (s *ipSelector) Select(serviceName string) (*Node, error) { + if len(serviceName) == 0 { + return nil, errors.New("serviceName empty") + } + + num := strings.Count(serviceName, ",") + 1 + if num == 1 { + return &Node{ + ServiceName: serviceName, + Address: serviceName, + HasNext: false, + }, nil + } + + nextIndex := 0 + if index, ok := s.indexes[serviceName]; ok { + nextIndex = index + } + + addresses := strings.Split(serviceName, ",") + address := addresses[nextIndex] + nextIndex = (nextIndex + 1) % num + s.indexes[serviceName] = nextIndex + + node := &Node{ + ServiceName: serviceName, + Address: address, + } + if nextIndex > 0 { + node.HasNext = true + } + return node, nil +} diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go new file mode 100644 index 0000000..6b5ba35 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package selector + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSingleIP(t *testing.T) { + serviceName := "192.168.0.1:9092" + selector := Get("ip") + node, err := selector.Select(serviceName) + assert.Nil(t, err) + assert.Equal(t, node.HasNext, false) + assert.Equal(t, node.Address, "192.168.0.1:9092") + assert.Equal(t, node.ServiceName, "192.168.0.1:9092") +} + +func TestSingleDNS(t *testing.T) { + serviceName := "tubemq:8081" + selector := Get("dns") + node, err := selector.Select(serviceName) + assert.Nil(t, err) + assert.Equal(t, node.HasNext, false) + assert.Equal(t, node.Address, "tubemq:8081") + assert.Equal(t, node.ServiceName, "tubemq:8081") +} + +func TestMultipleIP(t *testing.T) { + serviceName := "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094" + selector := Get("dns") + node, err := selector.Select(serviceName) + assert.Nil(t, err) + assert.Equal(t, true, node.HasNext) + assert.Equal(t, "192.168.0.1:9091", node.Address) + assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName) + + node, err = selector.Select(serviceName) + assert.Equal(t, true, node.HasNext) + assert.Equal(t, "192.168.0.1:9092", node.Address) + assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName) + + node, err = selector.Select(serviceName) + assert.Equal(t, true, node.HasNext) + assert.Equal(t, "192.168.0.1:9093", node.Address) + assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName) + + node, err = selector.Select(serviceName) + assert.Equal(t, false, node.HasNext) + assert.Equal(t, "192.168.0.1:9094", node.Address) + assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName) +} + +func TestMultipleDNS(t *testing.T) { + serviceName := "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084" + selector := Get("dns") + node, err := selector.Select(serviceName) + assert.Nil(t, err) + assert.Equal(t, true, node.HasNext) + assert.Equal(t, "tubemq:8081", node.Address) + assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName) + + node, err = selector.Select(serviceName) + assert.Equal(t, true, node.HasNext) + assert.Equal(t, "tubemq:8082", node.Address) + assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName) + + node, err = selector.Select(serviceName) + assert.Equal(t, true, node.HasNext) + assert.Equal(t, "tubemq:8083", node.Address) + assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName) + + node, err = selector.Select(serviceName) + assert.Equal(t, false, node.HasNext) + assert.Equal(t, "tubemq:8084", node.Address) + assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName) +} + + +func TestEmptyService(t *testing.T) { + serviceName := "" + selector := Get("ip") + _, err := selector.Select(serviceName) + assert.Error(t, err) +} \ No newline at end of file diff --git a/tubemq-client-twins/tubemq-client-go/selector/selector.go b/tubemq-client-twins/tubemq-client-go/selector/selector.go new file mode 100644 index 0000000..5b5e0c8 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/selector/selector.go @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// package selector defines the route selector which is responsible for service discovery. +package selector + +// Selector is abstraction of route selector which can return an available address +// from the service name. +type Selector interface { + // Select will return a service node which contains an available address. + Select(serviceName string) (*Node, error) +} + +var ( + selectors = make(map[string]Selector) +) + +// Register registers a selector. +func Register(name string, s Selector) { + selectors[name] = s +} + +// Get returns the corresponding selector. +func Get(name string) Selector { + s := selectors[name] + return s +} + +// Node represents the service node. +type Node struct { + // ServiceName of the node. + ServiceName string + // Address of the node. + Address string + // HasNext indicates whether or not the service has next node. + HasNext bool +}
