This is an automated email from the ASF dual-hosted git repository.

finalt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/main by this push:
     new fcf39872b Fix:Resolve service disconnection and configuration 
invalidations (#2717)
fcf39872b is described below

commit fcf39872b14818af6b530ad96653a426923ac898
Author: finalt <[email protected]>
AuthorDate: Fri Jul 26 17:30:35 2024 +0800

    Fix:Resolve service disconnection and configuration invalidations (#2717)
    
    * Fix:Resolve service disconnection and configuration invalidations
    
    * Fix:Resolve config_center part of the configuration does not work
    
    * fix golang-lint
    
    * fix golang-lint
    
    * remove script router
---
 client/client.go                                   | 15 +++++
 cluster/cluster/failover/cluster_invoker.go        | 13 ++++-
 cluster/router/script/factory.go                   |  5 +-
 compat.go                                          | 54 +++++++++++++++++
 dubbo.go                                           |  2 +-
 global/reference_config.go                         | 68 ++++++++++++++++++++++
 protocol/triple/internal/server/cmd_server/main.go |  1 -
 protocol/triple/triple_invoker.go                  |  4 ++
 protocol/triple/triple_protocol/client.go          | 19 +++++-
 protocol/triple/triple_protocol/handler.go         |  7 ++-
 10 files changed, 177 insertions(+), 11 deletions(-)

diff --git a/client/client.go b/client/client.go
index a2bcaef1e..478fed175 100644
--- a/client/client.go
+++ b/client/client.go
@@ -23,6 +23,7 @@ import (
 )
 
 import (
+       "dubbo.apache.org/dubbo-go/v3/common"
        "dubbo.apache.org/dubbo-go/v3/common/constant"
        "dubbo.apache.org/dubbo-go/v3/protocol"
        invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
@@ -102,6 +103,20 @@ func (cli *Client) DialWithInfo(interfaceName string, info 
*ClientInfo, opts ...
        return cli.dial(interfaceName, info, opts...)
 }
 
+func (cli *Client) DialWithDefinition(interfaceName string, definition 
*ClientDefinition, opts ...ReferenceOption) (*Connection, error) {
+       // TODO(finalt) Temporarily solve the config_center configuration does 
not work
+       refName := common.GetReference(definition.Svc)
+       if refConfig, ok := cli.cliOpts.Consumer.References[refName]; ok {
+               ref := cli.cliOpts.overallReference.Clone()
+               for _, opt := range refConfig.GetOptions() {
+                       opt(ref)
+               }
+               opts = append(opts, setReference(ref))
+       }
+
+       return cli.dial(interfaceName, definition.Info, opts...)
+}
+
 func (cli *Client) dial(interfaceName string, info *ClientInfo, opts 
...ReferenceOption) (*Connection, error) {
        newRefOpts := defaultReferenceOptions()
        finalOpts := []ReferenceOption{
diff --git a/cluster/cluster/failover/cluster_invoker.go 
b/cluster/cluster/failover/cluster_invoker.go
index d70a3994d..e8a4db88f 100644
--- a/cluster/cluster/failover/cluster_invoker.go
+++ b/cluster/cluster/failover/cluster_invoker.go
@@ -19,8 +19,8 @@ package failover
 
 import (
        "context"
-       "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
        "fmt"
+       "strconv"
 )
 
 import (
@@ -35,6 +35,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/common"
        "dubbo.apache.org/dubbo-go/v3/common/constant"
        "dubbo.apache.org/dubbo-go/v3/protocol"
+       "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
 )
 
 type failoverClusterInvoker struct {
@@ -61,7 +62,7 @@ func (invoker *failoverClusterInvoker) Invoke(ctx 
context.Context, invocation pr
        }
 
        methodName := invocation.ActualMethodName()
-       retries := getRetries(invokers, methodName)
+       retries := getRetries(invokers, methodName, invocation)
        loadBalance := base.GetLoadBalance(invokers[0], methodName)
 
        for i := 0; i <= retries; i++ {
@@ -113,7 +114,13 @@ func isBizError(err error) bool {
        return triple_protocol.IsWireError(err) && triple_protocol.CodeOf(err) 
== triple_protocol.CodeBizError
 }
 
-func getRetries(invokers []protocol.Invoker, methodName string) int {
+func getRetries(invokers []protocol.Invoker, methodName string, invocation 
protocol.Invocation) int {
+       // Todo(finalt) Temporarily solve the problem that the retries is not 
valid
+       if retries, ok := invocation.GetAttachment(constant.RetriesKey); ok {
+               if rInt, err := strconv.Atoi(retries); err == nil {
+                       return rInt
+               }
+       }
        if len(invokers) <= 0 {
                return constant.DefaultRetriesInt
        }
diff --git a/cluster/router/script/factory.go b/cluster/router/script/factory.go
index a6869a36e..4d3ae5dd7 100644
--- a/cluster/router/script/factory.go
+++ b/cluster/router/script/factory.go
@@ -19,8 +19,6 @@ package script
 
 import (
        "dubbo.apache.org/dubbo-go/v3/cluster/router"
-       "dubbo.apache.org/dubbo-go/v3/common/constant"
-       "dubbo.apache.org/dubbo-go/v3/common/extension"
 )
 
 func init() {
@@ -29,7 +27,8 @@ func init() {
                and cause warning if config center is empty.
                User can import this package and config config center to use 
Script router.
        */
-       extension.SetRouterFactory(constant.ScriptRouterFactoryKey, 
NewScriptRouterFactory)
+       // TODO(finalt) Temporarily removed until fixed 
(https://github.com/apache/dubbo-go/pull/2716)
+       //extension.SetRouterFactory(constant.ScriptRouterFactoryKey, 
NewScriptRouterFactory)
 }
 
 // ScriptRouteFactory router factory
diff --git a/compat.go b/compat.go
index 7e1ec6408..3bede0b35 100644
--- a/compat.go
+++ b/compat.go
@@ -644,6 +644,7 @@ func compatGlobalConsumerConfig(c *config.ConsumerConfig) 
*global.ConsumerConfig
                ProxyFactory:                   c.ProxyFactory,
                Check:                          c.Check,
                AdaptiveService:                c.AdaptiveService,
+               References:                     
compatGlobalReferences(c.References),
                TracingKey:                     c.TracingKey,
                FilterConf:                     c.FilterConf,
                MaxWaitTimeForServiceDiscovery: 
c.MaxWaitTimeForServiceDiscovery,
@@ -651,6 +652,59 @@ func compatGlobalConsumerConfig(c *config.ConsumerConfig) 
*global.ConsumerConfig
        }
 }
 
+func compatGlobalReferences(c map[string]*config.ReferenceConfig) 
map[string]*global.ReferenceConfig {
+       refs := make(map[string]*global.ReferenceConfig, len(c))
+       for name, ref := range c {
+               refs[name] = &global.ReferenceConfig{
+                       InterfaceName:    ref.InterfaceName,
+                       Check:            ref.Check,
+                       URL:              ref.URL,
+                       Filter:           ref.Filter,
+                       Protocol:         ref.Protocol,
+                       RegistryIDs:      ref.RegistryIDs,
+                       Cluster:          ref.Cluster,
+                       Loadbalance:      ref.Loadbalance,
+                       Retries:          ref.Retries,
+                       Group:            ref.Group,
+                       Version:          ref.Version,
+                       Serialization:    ref.Serialization,
+                       ProvidedBy:       ref.ProvidedBy,
+                       Methods:          compatGlobalMethod(ref.Methods),
+                       Async:            ref.Async,
+                       Params:           ref.Params,
+                       Generic:          ref.Generic,
+                       Sticky:           ref.Sticky,
+                       RequestTimeout:   ref.RequestTimeout,
+                       ForceTag:         ref.ForceTag,
+                       TracingKey:       ref.TracingKey,
+                       MeshProviderPort: ref.MeshProviderPort,
+               }
+       }
+       return refs
+}
+
+func compatGlobalMethod(m []*config.MethodConfig) []*global.MethodConfig {
+       methods := make([]*global.MethodConfig, 0, len(m))
+       for _, method := range m {
+               methods = append(methods, &global.MethodConfig{
+                       InterfaceId:                 method.InterfaceId,
+                       InterfaceName:               method.InterfaceName,
+                       Name:                        method.Name,
+                       Retries:                     method.Retries,
+                       LoadBalance:                 method.LoadBalance,
+                       Weight:                      method.Weight,
+                       TpsLimitInterval:            method.TpsLimitInterval,
+                       TpsLimitRate:                method.TpsLimitRate,
+                       TpsLimitStrategy:            method.TpsLimitStrategy,
+                       ExecuteLimit:                method.ExecuteLimit,
+                       ExecuteLimitRejectedHandler: 
method.ExecuteLimitRejectedHandler,
+                       Sticky:                      method.Sticky,
+                       RequestTimeout:              method.RequestTimeout,
+               })
+       }
+       return methods
+}
+
 func compatGlobalMetricConfig(c *config.MetricsConfig) *global.MetricsConfig {
        if c == nil {
                return nil
diff --git a/dubbo.go b/dubbo.go
index 3c9bc211e..ac9b77fdf 100644
--- a/dubbo.go
+++ b/dubbo.go
@@ -220,7 +220,7 @@ func (ins *Instance) loadConsumer() error {
        conLock.RLock()
        defer conLock.RUnlock()
        for intfName, definition := range consumerServices {
-               conn, dialErr := cli.DialWithInfo(intfName, definition.Info)
+               conn, dialErr := cli.DialWithDefinition(intfName, definition)
                if dialErr != nil {
                        return dialErr
                }
diff --git a/global/reference_config.go b/global/reference_config.go
index 1c24b61dc..94b188cb9 100644
--- a/global/reference_config.go
+++ b/global/reference_config.go
@@ -56,6 +56,74 @@ func DefaultReferenceConfig() *ReferenceConfig {
        }
 }
 
+func (c *ReferenceConfig) GetOptions() []ReferenceOption {
+       var refOpts []ReferenceOption
+       if c.InterfaceName != "" {
+               refOpts = append(refOpts, 
WithReference_InterfaceName(c.InterfaceName))
+       }
+       if c.Check != nil {
+               refOpts = append(refOpts, WithReference_Check(*c.Check))
+       }
+       if c.URL != "" {
+               refOpts = append(refOpts, WithReference_URL(c.URL))
+       }
+       if c.Filter != "" {
+               refOpts = append(refOpts, WithReference_Filter(c.Filter))
+       }
+       if c.Protocol != "" {
+               refOpts = append(refOpts, WithReference_Protocol(c.Protocol))
+       }
+       if c.RegistryIDs != nil && len(c.RegistryIDs) > 0 {
+               refOpts = append(refOpts, 
WithReference_RegistryIDs(c.RegistryIDs))
+       }
+       if c.Cluster != "" {
+               refOpts = append(refOpts, WithReference_Cluster(c.Cluster))
+       }
+       if c.Loadbalance != "" {
+               refOpts = append(refOpts, 
WithReference_LoadBalance(c.Loadbalance))
+       }
+       if c.Retries != "" {
+               if rInt, err := strconv.Atoi(c.Retries); err == nil {
+                       refOpts = append(refOpts, WithReference_Retries(rInt))
+               }
+       }
+       if c.Group != "" {
+               refOpts = append(refOpts, WithReference_Group(c.Group))
+       }
+       if c.Version != "" {
+               refOpts = append(refOpts, WithReference_Version(c.Version))
+       }
+       if c.Serialization != "" {
+               refOpts = append(refOpts, 
WithReference_Serialization(c.Serialization))
+       }
+       if c.ProvidedBy != "" {
+               refOpts = append(refOpts, 
WithReference_ProviderBy(c.ProvidedBy))
+       }
+       if c.Params != nil && len(c.Params) > 0 {
+               newParams := make(map[string]string, len(c.Params))
+               for k, v := range c.Params {
+                       newParams[k] = v
+               }
+               refOpts = append(refOpts, WithReference_Params(newParams))
+       }
+       if c.Generic != "" {
+               refOpts = append(refOpts, WithReference_Generic(c.Generic))
+       }
+       if c.Sticky {
+               refOpts = append(refOpts, WithReference_Sticky(c.Sticky))
+       }
+       if c.RequestTimeout != "" {
+               refOpts = append(refOpts, 
WithReference_RequestTimeout(c.RequestTimeout))
+       }
+       if c.TracingKey != "" {
+               refOpts = append(refOpts, 
WithReference_TracingKey(c.TracingKey))
+       }
+       if c.MeshProviderPort != 0 {
+               refOpts = append(refOpts, 
WithReference_MeshProviderPort(c.MeshProviderPort))
+       }
+       return refOpts
+}
+
 // Clone a new ReferenceConfig
 func (c *ReferenceConfig) Clone() *ReferenceConfig {
        if c == nil {
diff --git a/protocol/triple/internal/server/cmd_server/main.go 
b/protocol/triple/internal/server/cmd_server/main.go
index dc5fc0de8..a0d7c4dc2 100644
--- a/protocol/triple/internal/server/cmd_server/main.go
+++ b/protocol/triple/internal/server/cmd_server/main.go
@@ -31,7 +31,6 @@ func main() {
                        protocol.WithTriple(),
                        protocol.WithPort(20000),
                ),
-               server.WithServerVersion("1.0.0"),
        )
 
        if err != nil {
diff --git a/protocol/triple/triple_invoker.go 
b/protocol/triple/triple_invoker.go
index 3afe7a654..6ce195bd3 100644
--- a/protocol/triple/triple_invoker.go
+++ b/protocol/triple/triple_invoker.go
@@ -143,6 +143,10 @@ func (ti *TripleInvoker) Invoke(ctx context.Context, 
invocation protocol.Invocat
 }
 
 func mergeAttachmentToOutgoing(ctx context.Context, inv protocol.Invocation) 
(context.Context, error) {
+       // Todo(finalt) Temporarily solve the problem that the timeout time is 
not valid
+       if timeout, ok := inv.GetAttachment(constant.TimeoutKey); ok {
+               ctx = context.WithValue(ctx, "dubbo.timeout.key", timeout)
+       }
        for key, valRaw := range inv.Attachments() {
                if str, ok := valRaw.(string); ok {
                        ctx = tri.AppendToOutgoingContext(ctx, key, str)
diff --git a/protocol/triple/triple_protocol/client.go 
b/protocol/triple/triple_protocol/client.go
index 8ba67eec2..c0c1cb784 100644
--- a/protocol/triple/triple_protocol/client.go
+++ b/protocol/triple/triple_protocol/client.go
@@ -277,12 +277,27 @@ func parseRequestURL(rawURL string) (*url.URL, *Error) {
 func applyDefaultTimeout(ctx context.Context, timeout time.Duration) 
(context.Context, bool, context.CancelFunc) {
        var cancel context.CancelFunc
        var applyFlag bool
+
        _, ok := ctx.Deadline()
+
+       // Todo(finalt) Temporarily solve the problem that the timeout time is 
not valid
+       if !ok {
+               timeoutVal := ctx.Value("dubbo.timeout.key")
+               if timeoutVal != nil {
+                       if s, exist := timeoutVal.(string); exist && s != "" {
+                               if newTimeout, err := time.ParseDuration(s); 
err == nil {
+                                       ctx, cancel = context.WithDeadline(ctx, 
time.Now().Add(newTimeout))
+                                       applyFlag = true
+                                       return ctx, applyFlag, cancel
+                               }
+                       }
+               }
+       }
+
        if !ok && timeout != 0 {
-               ctx, cancel = context.WithTimeout(ctx, timeout)
+               ctx, cancel = context.WithDeadline(ctx, time.Now().Add(timeout))
                applyFlag = true
        }
-
        return ctx, applyFlag, cancel
 }
 
diff --git a/protocol/triple/triple_protocol/handler.go 
b/protocol/triple/triple_protocol/handler.go
index 80234d1dc..320d503aa 100644
--- a/protocol/triple/triple_protocol/handler.go
+++ b/protocol/triple/triple_protocol/handler.go
@@ -16,6 +16,7 @@ package triple_protocol
 
 import (
        "context"
+       "errors"
        "fmt"
        "net/http"
 )
@@ -363,7 +364,11 @@ func (h *Handler) ServeHTTP(responseWriter 
http.ResponseWriter, request *http.Re
        svcGroup := request.Header.Get(tripleServiceGroup)
        svcVersion := request.Header.Get(tripleServiceVersion)
        // todo(DMwangnima): inspect ok
-       implementation := h.implementations[getIdentifier(svcGroup, svcVersion)]
+       implementation, ok := h.implementations[getIdentifier(svcGroup, 
svcVersion)]
+       if !ok {
+               _ = connCloser.Close(errors.New("no implementation for " + 
svcVersion))
+               return
+       }
        _ = connCloser.Close(implementation(ctx, connCloser))
 }
 

Reply via email to