This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 2431dec90 mesh router (#1917)
2431dec90 is described below
commit 2431dec904175eb2e3317a653eae48a9a4c33101
Author: binbin.zhang <[email protected]>
AuthorDate: Fri Jun 3 22:57:46 2022 +0800
mesh router (#1917)
---
cluster/router/meshrouter/factory.go | 41 +++++++++
cluster/router/meshrouter/meshrouter.go | 146 ++++++++++++++++++++++++++++++++
common/constant/key.go | 1 +
imports/imports.go | 1 +
4 files changed, 189 insertions(+)
diff --git a/cluster/router/meshrouter/factory.go
b/cluster/router/meshrouter/factory.go
new file mode 100644
index 000000000..773aa4b7b
--- /dev/null
+++ b/cluster/router/meshrouter/factory.go
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package meshrouter
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/cluster/router"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
+)
+
+func init() {
+ extension.SetRouterFactory(constant.MeshRouterFactoryKey,
NewMeshRouterFactory)
+}
+
+// MeshRouterFactory is mesh router's factory
+type MeshRouterFactory struct{}
+
+// NewMeshRouterFactory constructs a new PriorityRouterFactory
+func NewMeshRouterFactory() router.PriorityRouterFactory {
+ return &MeshRouterFactory{}
+}
+
+// NewPriorityRouter construct a new UniformRouteFactory as PriorityRouter
+func (f *MeshRouterFactory) NewPriorityRouter() (router.PriorityRouter, error)
{
+ return NewMeshRouter()
+}
diff --git a/cluster/router/meshrouter/meshrouter.go
b/cluster/router/meshrouter/meshrouter.go
new file mode 100644
index 000000000..567f58a14
--- /dev/null
+++ b/cluster/router/meshrouter/meshrouter.go
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package meshrouter
+
+import (
+ "math/rand"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/cluster/router"
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/logger"
+ "dubbo.apache.org/dubbo-go/v3/config_center"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/remoting/xds"
+)
+
+const (
+ name = "mesh-router"
+)
+
+// MeshRouter have
+type MeshRouter struct {
+ client *xds.WrappedClientImpl
+}
+
+// NewMeshRouter construct an NewConnCheckRouter via url
+func NewMeshRouter() (router.PriorityRouter, error) {
+ xdsWrappedClient := xds.GetXDSWrappedClient()
+ if xdsWrappedClient == nil {
+ logger.Debugf("[Mesh Router] xds wrapped client is not
created.")
+ }
+ return &MeshRouter{
+ client: xdsWrappedClient,
+ }, nil
+}
+
+// Route gets a list of routed invoker
+func (r *MeshRouter) Route(invokers []protocol.Invoker, url *common.URL,
invocation protocol.Invocation) []protocol.Invoker {
+ if r.client == nil {
+ return invokers
+ }
+ hostAddr, err :=
r.client.GetHostAddrByServiceUniqueKey(common.GetSubscribeName(url))
+ if err != nil {
+ // todo deal with error
+ return nil
+ }
+ rconf := r.client.GetRouterConfig(hostAddr)
+
+ clusterInvokerMap := make(map[string][]protocol.Invoker)
+ for _, v := range invokers {
+ meshClusterID := v.GetURL().GetParam(constant.MeshClusterIDKey,
"")
+ if _, ok := clusterInvokerMap[meshClusterID]; !ok {
+ clusterInvokerMap[meshClusterID] =
make([]protocol.Invoker, 0)
+ }
+ clusterInvokerMap[meshClusterID] =
append(clusterInvokerMap[meshClusterID], v)
+ }
+ route, err := r.client.MatchRoute(rconf, invocation)
+ if err != nil {
+ logger.Errorf("[Mesh Router] not found route,method=%s",
invocation.MethodName())
+ return nil
+ }
+
+ // Loop through routes in order and select first match.
+ if route == nil || route.WeightedClusters == nil {
+ logger.Errorf("[Mesh Router] route's WeightedClusters is empty,
route: %+v", r)
+ return invokers
+ }
+ invokersWeightPairs := make(invokerWeightPairs, 0)
+
+ for clusterID, weight := range route.WeightedClusters {
+ // cluster -> invokers
+ targetInvokers := clusterInvokerMap[clusterID]
+ invokersWeightPairs = append(invokersWeightPairs,
invokerWeightPair{
+ invokers: targetInvokers,
+ weight: weight.Weight,
+ })
+ }
+ return invokersWeightPairs.GetInvokers()
+}
+
+// Process there is no process needs for uniform Router, as it upper struct
RouterChain has done it
+func (r *MeshRouter) Process(event *config_center.ConfigChangeEvent) {
+}
+
+// Name get name of ConnCheckerRouter
+func (r *MeshRouter) Name() string {
+ return name
+}
+
+// Priority get Router priority level
+func (r *MeshRouter) Priority() int64 {
+ return 0
+}
+
+// URL Return URL in router
+func (r *MeshRouter) URL() *common.URL {
+ return nil
+}
+
+// Notify the router the invoker list
+func (r *MeshRouter) Notify(invokers []protocol.Invoker) {
+}
+
+type invokerWeightPair struct {
+ invokers []protocol.Invoker
+ weight uint32
+}
+
+type invokerWeightPairs []invokerWeightPair
+
+func (i *invokerWeightPairs) GetInvokers() []protocol.Invoker {
+ if len(*i) == 0 {
+ return nil
+ }
+ totalWeight := uint32(0)
+ tempWeight := uint32(0)
+ for _, v := range *i {
+ totalWeight += v.weight
+ }
+ randFloat := rand.Float64()
+ for _, v := range *i {
+ tempWeight += v.weight
+ tempPercent := float64(tempWeight) / float64(totalWeight)
+ if tempPercent >= randFloat {
+ return v.invokers
+ }
+ }
+ return (*i)[0].invokers
+}
diff --git a/common/constant/key.go b/common/constant/key.go
index 7cea915e7..51ca45375 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -305,6 +305,7 @@ const (
Tagkey = "dubbo.tag" // key of tag
AttachmentKey = DubboCtxKey("attachment") // key in context
in invoker
TagRouterFactoryKey = "tag"
+ MeshRouterFactoryKey = "mesh"
)
// Auth filter
diff --git a/imports/imports.go b/imports/imports.go
index 3ac1d74ab..fdb845878 100644
--- a/imports/imports.go
+++ b/imports/imports.go
@@ -32,6 +32,7 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/p2c"
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random"
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/roundrobin"
+ _ "dubbo.apache.org/dubbo-go/v3/cluster/router/meshrouter"
_ "dubbo.apache.org/dubbo-go/v3/common/proxy/proxy_factory"
_ "dubbo.apache.org/dubbo-go/v3/config_center/apollo"
_ "dubbo.apache.org/dubbo-go/v3/config_center/nacos"