This is an automated email from the ASF dual-hosted git repository.
finalt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/main by this push:
new fcf39872b Fix:Resolve service disconnection and configuration
invalidations (#2717)
fcf39872b is described below
commit fcf39872b14818af6b530ad96653a426923ac898
Author: finalt <[email protected]>
AuthorDate: Fri Jul 26 17:30:35 2024 +0800
Fix:Resolve service disconnection and configuration invalidations (#2717)
* Fix:Resolve service disconnection and configuration invalidations
* Fix:Resolve config_center part of the configuration does not work
* fix golang-lint
* fix golang-lint
* remove script router
---
client/client.go | 15 +++++
cluster/cluster/failover/cluster_invoker.go | 13 ++++-
cluster/router/script/factory.go | 5 +-
compat.go | 54 +++++++++++++++++
dubbo.go | 2 +-
global/reference_config.go | 68 ++++++++++++++++++++++
protocol/triple/internal/server/cmd_server/main.go | 1 -
protocol/triple/triple_invoker.go | 4 ++
protocol/triple/triple_protocol/client.go | 19 +++++-
protocol/triple/triple_protocol/handler.go | 7 ++-
10 files changed, 177 insertions(+), 11 deletions(-)
diff --git a/client/client.go b/client/client.go
index a2bcaef1e..478fed175 100644
--- a/client/client.go
+++ b/client/client.go
@@ -23,6 +23,7 @@ import (
)
import (
+ "dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
@@ -102,6 +103,20 @@ func (cli *Client) DialWithInfo(interfaceName string, info
*ClientInfo, opts ...
return cli.dial(interfaceName, info, opts...)
}
+func (cli *Client) DialWithDefinition(interfaceName string, definition
*ClientDefinition, opts ...ReferenceOption) (*Connection, error) {
+ // TODO(finalt) Temporarily solve the config_center configuration does
not work
+ refName := common.GetReference(definition.Svc)
+ if refConfig, ok := cli.cliOpts.Consumer.References[refName]; ok {
+ ref := cli.cliOpts.overallReference.Clone()
+ for _, opt := range refConfig.GetOptions() {
+ opt(ref)
+ }
+ opts = append(opts, setReference(ref))
+ }
+
+ return cli.dial(interfaceName, definition.Info, opts...)
+}
+
func (cli *Client) dial(interfaceName string, info *ClientInfo, opts
...ReferenceOption) (*Connection, error) {
newRefOpts := defaultReferenceOptions()
finalOpts := []ReferenceOption{
diff --git a/cluster/cluster/failover/cluster_invoker.go
b/cluster/cluster/failover/cluster_invoker.go
index d70a3994d..e8a4db88f 100644
--- a/cluster/cluster/failover/cluster_invoker.go
+++ b/cluster/cluster/failover/cluster_invoker.go
@@ -19,8 +19,8 @@ package failover
import (
"context"
- "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
"fmt"
+ "strconv"
)
import (
@@ -35,6 +35,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
)
type failoverClusterInvoker struct {
@@ -61,7 +62,7 @@ func (invoker *failoverClusterInvoker) Invoke(ctx
context.Context, invocation pr
}
methodName := invocation.ActualMethodName()
- retries := getRetries(invokers, methodName)
+ retries := getRetries(invokers, methodName, invocation)
loadBalance := base.GetLoadBalance(invokers[0], methodName)
for i := 0; i <= retries; i++ {
@@ -113,7 +114,13 @@ func isBizError(err error) bool {
return triple_protocol.IsWireError(err) && triple_protocol.CodeOf(err)
== triple_protocol.CodeBizError
}
-func getRetries(invokers []protocol.Invoker, methodName string) int {
+func getRetries(invokers []protocol.Invoker, methodName string, invocation
protocol.Invocation) int {
+ // Todo(finalt) Temporarily solve the problem that the retries is not
valid
+ if retries, ok := invocation.GetAttachment(constant.RetriesKey); ok {
+ if rInt, err := strconv.Atoi(retries); err == nil {
+ return rInt
+ }
+ }
if len(invokers) <= 0 {
return constant.DefaultRetriesInt
}
diff --git a/cluster/router/script/factory.go b/cluster/router/script/factory.go
index a6869a36e..4d3ae5dd7 100644
--- a/cluster/router/script/factory.go
+++ b/cluster/router/script/factory.go
@@ -19,8 +19,6 @@ package script
import (
"dubbo.apache.org/dubbo-go/v3/cluster/router"
- "dubbo.apache.org/dubbo-go/v3/common/constant"
- "dubbo.apache.org/dubbo-go/v3/common/extension"
)
func init() {
@@ -29,7 +27,8 @@ func init() {
and cause warning if config center is empty.
User can import this package and config config center to use
Script router.
*/
- extension.SetRouterFactory(constant.ScriptRouterFactoryKey,
NewScriptRouterFactory)
+ // TODO(finalt) Temporarily removed until fixed
(https://github.com/apache/dubbo-go/pull/2716)
+ //extension.SetRouterFactory(constant.ScriptRouterFactoryKey,
NewScriptRouterFactory)
}
// ScriptRouteFactory router factory
diff --git a/compat.go b/compat.go
index 7e1ec6408..3bede0b35 100644
--- a/compat.go
+++ b/compat.go
@@ -644,6 +644,7 @@ func compatGlobalConsumerConfig(c *config.ConsumerConfig)
*global.ConsumerConfig
ProxyFactory: c.ProxyFactory,
Check: c.Check,
AdaptiveService: c.AdaptiveService,
+ References:
compatGlobalReferences(c.References),
TracingKey: c.TracingKey,
FilterConf: c.FilterConf,
MaxWaitTimeForServiceDiscovery:
c.MaxWaitTimeForServiceDiscovery,
@@ -651,6 +652,59 @@ func compatGlobalConsumerConfig(c *config.ConsumerConfig)
*global.ConsumerConfig
}
}
+func compatGlobalReferences(c map[string]*config.ReferenceConfig)
map[string]*global.ReferenceConfig {
+ refs := make(map[string]*global.ReferenceConfig, len(c))
+ for name, ref := range c {
+ refs[name] = &global.ReferenceConfig{
+ InterfaceName: ref.InterfaceName,
+ Check: ref.Check,
+ URL: ref.URL,
+ Filter: ref.Filter,
+ Protocol: ref.Protocol,
+ RegistryIDs: ref.RegistryIDs,
+ Cluster: ref.Cluster,
+ Loadbalance: ref.Loadbalance,
+ Retries: ref.Retries,
+ Group: ref.Group,
+ Version: ref.Version,
+ Serialization: ref.Serialization,
+ ProvidedBy: ref.ProvidedBy,
+ Methods: compatGlobalMethod(ref.Methods),
+ Async: ref.Async,
+ Params: ref.Params,
+ Generic: ref.Generic,
+ Sticky: ref.Sticky,
+ RequestTimeout: ref.RequestTimeout,
+ ForceTag: ref.ForceTag,
+ TracingKey: ref.TracingKey,
+ MeshProviderPort: ref.MeshProviderPort,
+ }
+ }
+ return refs
+}
+
+func compatGlobalMethod(m []*config.MethodConfig) []*global.MethodConfig {
+ methods := make([]*global.MethodConfig, 0, len(m))
+ for _, method := range m {
+ methods = append(methods, &global.MethodConfig{
+ InterfaceId: method.InterfaceId,
+ InterfaceName: method.InterfaceName,
+ Name: method.Name,
+ Retries: method.Retries,
+ LoadBalance: method.LoadBalance,
+ Weight: method.Weight,
+ TpsLimitInterval: method.TpsLimitInterval,
+ TpsLimitRate: method.TpsLimitRate,
+ TpsLimitStrategy: method.TpsLimitStrategy,
+ ExecuteLimit: method.ExecuteLimit,
+ ExecuteLimitRejectedHandler:
method.ExecuteLimitRejectedHandler,
+ Sticky: method.Sticky,
+ RequestTimeout: method.RequestTimeout,
+ })
+ }
+ return methods
+}
+
func compatGlobalMetricConfig(c *config.MetricsConfig) *global.MetricsConfig {
if c == nil {
return nil
diff --git a/dubbo.go b/dubbo.go
index 3c9bc211e..ac9b77fdf 100644
--- a/dubbo.go
+++ b/dubbo.go
@@ -220,7 +220,7 @@ func (ins *Instance) loadConsumer() error {
conLock.RLock()
defer conLock.RUnlock()
for intfName, definition := range consumerServices {
- conn, dialErr := cli.DialWithInfo(intfName, definition.Info)
+ conn, dialErr := cli.DialWithDefinition(intfName, definition)
if dialErr != nil {
return dialErr
}
diff --git a/global/reference_config.go b/global/reference_config.go
index 1c24b61dc..94b188cb9 100644
--- a/global/reference_config.go
+++ b/global/reference_config.go
@@ -56,6 +56,74 @@ func DefaultReferenceConfig() *ReferenceConfig {
}
}
+func (c *ReferenceConfig) GetOptions() []ReferenceOption {
+ var refOpts []ReferenceOption
+ if c.InterfaceName != "" {
+ refOpts = append(refOpts,
WithReference_InterfaceName(c.InterfaceName))
+ }
+ if c.Check != nil {
+ refOpts = append(refOpts, WithReference_Check(*c.Check))
+ }
+ if c.URL != "" {
+ refOpts = append(refOpts, WithReference_URL(c.URL))
+ }
+ if c.Filter != "" {
+ refOpts = append(refOpts, WithReference_Filter(c.Filter))
+ }
+ if c.Protocol != "" {
+ refOpts = append(refOpts, WithReference_Protocol(c.Protocol))
+ }
+ if c.RegistryIDs != nil && len(c.RegistryIDs) > 0 {
+ refOpts = append(refOpts,
WithReference_RegistryIDs(c.RegistryIDs))
+ }
+ if c.Cluster != "" {
+ refOpts = append(refOpts, WithReference_Cluster(c.Cluster))
+ }
+ if c.Loadbalance != "" {
+ refOpts = append(refOpts,
WithReference_LoadBalance(c.Loadbalance))
+ }
+ if c.Retries != "" {
+ if rInt, err := strconv.Atoi(c.Retries); err == nil {
+ refOpts = append(refOpts, WithReference_Retries(rInt))
+ }
+ }
+ if c.Group != "" {
+ refOpts = append(refOpts, WithReference_Group(c.Group))
+ }
+ if c.Version != "" {
+ refOpts = append(refOpts, WithReference_Version(c.Version))
+ }
+ if c.Serialization != "" {
+ refOpts = append(refOpts,
WithReference_Serialization(c.Serialization))
+ }
+ if c.ProvidedBy != "" {
+ refOpts = append(refOpts,
WithReference_ProviderBy(c.ProvidedBy))
+ }
+ if c.Params != nil && len(c.Params) > 0 {
+ newParams := make(map[string]string, len(c.Params))
+ for k, v := range c.Params {
+ newParams[k] = v
+ }
+ refOpts = append(refOpts, WithReference_Params(newParams))
+ }
+ if c.Generic != "" {
+ refOpts = append(refOpts, WithReference_Generic(c.Generic))
+ }
+ if c.Sticky {
+ refOpts = append(refOpts, WithReference_Sticky(c.Sticky))
+ }
+ if c.RequestTimeout != "" {
+ refOpts = append(refOpts,
WithReference_RequestTimeout(c.RequestTimeout))
+ }
+ if c.TracingKey != "" {
+ refOpts = append(refOpts,
WithReference_TracingKey(c.TracingKey))
+ }
+ if c.MeshProviderPort != 0 {
+ refOpts = append(refOpts,
WithReference_MeshProviderPort(c.MeshProviderPort))
+ }
+ return refOpts
+}
+
// Clone a new ReferenceConfig
func (c *ReferenceConfig) Clone() *ReferenceConfig {
if c == nil {
diff --git a/protocol/triple/internal/server/cmd_server/main.go
b/protocol/triple/internal/server/cmd_server/main.go
index dc5fc0de8..a0d7c4dc2 100644
--- a/protocol/triple/internal/server/cmd_server/main.go
+++ b/protocol/triple/internal/server/cmd_server/main.go
@@ -31,7 +31,6 @@ func main() {
protocol.WithTriple(),
protocol.WithPort(20000),
),
- server.WithServerVersion("1.0.0"),
)
if err != nil {
diff --git a/protocol/triple/triple_invoker.go
b/protocol/triple/triple_invoker.go
index 3afe7a654..6ce195bd3 100644
--- a/protocol/triple/triple_invoker.go
+++ b/protocol/triple/triple_invoker.go
@@ -143,6 +143,10 @@ func (ti *TripleInvoker) Invoke(ctx context.Context,
invocation protocol.Invocat
}
func mergeAttachmentToOutgoing(ctx context.Context, inv protocol.Invocation)
(context.Context, error) {
+ // Todo(finalt) Temporarily solve the problem that the timeout time is
not valid
+ if timeout, ok := inv.GetAttachment(constant.TimeoutKey); ok {
+ ctx = context.WithValue(ctx, "dubbo.timeout.key", timeout)
+ }
for key, valRaw := range inv.Attachments() {
if str, ok := valRaw.(string); ok {
ctx = tri.AppendToOutgoingContext(ctx, key, str)
diff --git a/protocol/triple/triple_protocol/client.go
b/protocol/triple/triple_protocol/client.go
index 8ba67eec2..c0c1cb784 100644
--- a/protocol/triple/triple_protocol/client.go
+++ b/protocol/triple/triple_protocol/client.go
@@ -277,12 +277,27 @@ func parseRequestURL(rawURL string) (*url.URL, *Error) {
func applyDefaultTimeout(ctx context.Context, timeout time.Duration)
(context.Context, bool, context.CancelFunc) {
var cancel context.CancelFunc
var applyFlag bool
+
_, ok := ctx.Deadline()
+
+ // Todo(finalt) Temporarily solve the problem that the timeout time is
not valid
+ if !ok {
+ timeoutVal := ctx.Value("dubbo.timeout.key")
+ if timeoutVal != nil {
+ if s, exist := timeoutVal.(string); exist && s != "" {
+ if newTimeout, err := time.ParseDuration(s);
err == nil {
+ ctx, cancel = context.WithDeadline(ctx,
time.Now().Add(newTimeout))
+ applyFlag = true
+ return ctx, applyFlag, cancel
+ }
+ }
+ }
+ }
+
if !ok && timeout != 0 {
- ctx, cancel = context.WithTimeout(ctx, timeout)
+ ctx, cancel = context.WithDeadline(ctx, time.Now().Add(timeout))
applyFlag = true
}
-
return ctx, applyFlag, cancel
}
diff --git a/protocol/triple/triple_protocol/handler.go
b/protocol/triple/triple_protocol/handler.go
index 80234d1dc..320d503aa 100644
--- a/protocol/triple/triple_protocol/handler.go
+++ b/protocol/triple/triple_protocol/handler.go
@@ -16,6 +16,7 @@ package triple_protocol
import (
"context"
+ "errors"
"fmt"
"net/http"
)
@@ -363,7 +364,11 @@ func (h *Handler) ServeHTTP(responseWriter
http.ResponseWriter, request *http.Re
svcGroup := request.Header.Get(tripleServiceGroup)
svcVersion := request.Header.Get(tripleServiceVersion)
// todo(DMwangnima): inspect ok
- implementation := h.implementations[getIdentifier(svcGroup, svcVersion)]
+ implementation, ok := h.implementations[getIdentifier(svcGroup,
svcVersion)]
+ if !ok {
+ _ = connCloser.Close(errors.New("no implementation for " +
svcVersion))
+ return
+ }
_ = connCloser.Close(implementation(ctx, connCloser))
}