This is an automated email from the ASF dual-hosted git repository. laurence pushed a commit to branch hsf-go-dependency in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
commit 347fadf50abc6b57e12051b5774190428ab909b5 Author: LaurenceLiZhixin <[email protected]> AuthorDate: Mon Aug 30 11:58:03 2021 +0800 hsf-go-dependecy branch init --- common/constant/default.go | 2 +- common/proxy/proxy.go | 30 ++++--- common/proxy/proxy_factory/default.go | 8 +- common/rpc_service.go | 41 +++++++--- common/rpc_service_test.go | 33 ++++++++ config/config_loader.go | 7 +- config/metadata_report_config.go | 2 +- config/service.go | 6 +- metadata/definition/definition.go | 147 ++++++++++++++++++++++++++++++---- metadata/service/inmemory/service.go | 1 + metadata/service/remote/service.go | 16 ---- metadata/service/service.go | 4 +- registry/protocol/protocol.go | 40 ++++----- 13 files changed, 249 insertions(+), 88 deletions(-) diff --git a/common/constant/default.go b/common/constant/default.go index da8cd20..1596f9f 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -51,7 +51,7 @@ const ( const ( DEFAULT_KEY = "default" PREFIX_DEFAULT_KEY = "default." - DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps,generic_service,execute,pshutdown" + DEFAULT_SERVICE_FILTERS = "token,accesslog,tps,generic_service,execute,pshutdown" DEFAULT_REFERENCE_FILTERS = "cshutdown" GENERIC_REFERENCE_FILTERS = "generic" GENERIC = "$invoke" diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index fd34810..f35e7ab 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -129,24 +129,25 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) { makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value { return func(in []reflect.Value) []reflect.Value { var ( - err error - inv *invocation_impl.RPCInvocation - inIArr []interface{} - inVArr []reflect.Value - reply reflect.Value + err error + inv *invocation_impl.RPCInvocation + inIArr []interface{} + inVArr []reflect.Value + reply reflect.Value + replyEmptyFlag bool ) if methodName == "Echo" { methodName = "$echo" } - if len(outs) == 2 { + if len(outs) == 2 { // return (reply, error) if outs[0].Kind() == reflect.Ptr { reply = reflect.New(outs[0].Elem()) } else { reply = reflect.New(outs[0]) } - } else { - reply = valueOf + } else { // only return error + replyEmptyFlag = true } start := 0 @@ -160,10 +161,6 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) { } start += 1 } - if len(outs) == 1 && in[end-1].Type().Kind() == reflect.Ptr { - end -= 1 - reply = in[len(in)-1] - } } if end-start <= 0 { @@ -184,8 +181,11 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) { } inv = invocation_impl.NewRPCInvocationWithOptions(invocation_impl.WithMethodName(methodName), - invocation_impl.WithArguments(inIArr), invocation_impl.WithReply(reply.Interface()), + invocation_impl.WithArguments(inIArr), invocation_impl.WithCallBack(p.callback), invocation_impl.WithParameterValues(inVArr)) + if !replyEmptyFlag { + inv.SetReply(reply.Interface()) + } for k, value := range p.attachments { inv.SetAttachments(k, value) @@ -215,8 +215,6 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) { } else { logger.Warnf("result err: %v", err) } - } else { - logger.Debugf("[makeDubboCallProxy] result: %v, err: %v", result.Result(), err) } if len(outs) == 1 { return []reflect.Value{reflect.ValueOf(&err).Elem()} @@ -251,7 +249,7 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) { continue } - var funcOuts = make([]reflect.Type, outNum) + funcOuts := make([]reflect.Type, outNum) for i := 0; i < outNum; i++ { funcOuts[i] = t.Type.Out(i) } diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index dd8ce02..6a070bc 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -137,10 +137,10 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati // prepare replyv var replyv reflect.Value - if method.ReplyType() == nil && len(method.ArgsType()) > 0 { - replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem()) - in = append(in, replyv) - } + //if method.ReplyType() == nil && len(method.ArgsType()) > 0 { + // replyv = reflect.New(method.ArgsType()[len(method.ArgsType())-1].Elem()) + // in = append(in, replyv) + //} returnValues := method.Method().Func.Call(in) diff --git a/common/rpc_service.go b/common/rpc_service.go index f739e31..dc10e2a 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -34,14 +34,43 @@ import ( "github.com/apache/dubbo-go/common/logger" ) -// RPCService +// RPCService the type alias of interface{} +type RPCService = interface{} + +// ReferencedRPCService // rpc service interface -type RPCService interface { +type ReferencedRPCService interface { // Reference: // rpc service id or reference id Reference() string } +// GetReference return the reference id of the service. +// If the service implemented the ReferencedRPCService interface, +// it will call the Reference method. If not, it will +// return the struct name as the reference id. +func GetReference(service RPCService) string { + if s, ok := service.(ReferencedRPCService); ok { + return s.Reference() + } + + ref := "" + sType := reflect.TypeOf(service) + kind := sType.Kind() + switch kind { + case reflect.Struct: + ref = sType.Name() + case reflect.Ptr: + sName := sType.Elem().Name() + if sName != "" { + ref = sName + } else { + ref = sType.Elem().Field(0).Name + } + } + return ref +} + // AsyncCallbackService callback interface for async type AsyncCallbackService interface { // Callback: callback @@ -358,12 +387,6 @@ func suiteMethod(method reflect.Method) *MethodType { return nil } - if outNum != 1 && outNum != 2 { - logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2", - mname, mtype.String(), outNum) - return nil - } - // The latest return type of the method must be error. if returnType := mtype.Out(outNum - 1); returnType != typeOfError { if mname != METHOD_MAPPER { @@ -372,7 +395,7 @@ func suiteMethod(method reflect.Method) *MethodType { return nil } - // replyType + // todo, for multi reply condition, replyType is empty if outNum == 2 { replyType = mtype.Out(0) if !isExportedOrBuiltinType(replyType) { diff --git a/common/rpc_service_test.go b/common/rpc_service_test.go index e8bd393..5c04c9e 100644 --- a/common/rpc_service_test.go +++ b/common/rpc_service_test.go @@ -213,3 +213,36 @@ func TestSuiteMethod(t *testing.T) { methodType = suiteMethod(method) assert.Nil(t, methodType) } + +type ServiceWithoutRef struct{} + +func TestGetReference(t *testing.T) { + s0 := &TestService{} + ref0 := GetReference(s0) + assert.Equal(t, referenceTestPath, ref0) + + //s1 := TestService{} + //ref1 := GetReference(s1) + //assert.Equal(t, referenceTestPath, ref1) + + s2 := &struct { + TestService + }{} + ref2 := GetReference(s2) + assert.Equal(t, referenceTestPath, ref2) + + expectedReference := "ServiceWithoutRef" + s3 := &ServiceWithoutRef{} + ref3 := GetReference(s3) + assert.Equal(t, expectedReference, ref3) + + s4 := ServiceWithoutRef{} + ref4 := GetReference(s4) + assert.Equal(t, expectedReference, ref4) + + s5 := &struct { + ServiceWithoutRef + }{} + ref5 := GetReference(s5) + assert.Equal(t, expectedReference, ref5) +} diff --git a/config/config_loader.go b/config/config_loader.go index 14caf2c..6f7d6fa 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -132,7 +132,7 @@ func loadConsumerConfig() { } // start the metadata report if config set - if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil { + if err := StartMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil { logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err) return } @@ -220,7 +220,7 @@ func loadProviderConfig() { } // start the metadata report if config set - if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil { + if err := StartMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil { logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err) return } @@ -375,7 +375,8 @@ func GetRPCService(name string) common.RPCService { // RPCService create rpc service for consumer func RPCService(service common.RPCService) { - consumerConfig.References[service.Reference()].Implement(service) + ref := common.GetReference(service) + consumerConfig.References[ref].Implement(service) } // GetMetricConfig find the MetricConfig diff --git a/config/metadata_report_config.go b/config/metadata_report_config.go index 6fb3fd2..602ba56 100644 --- a/config/metadata_report_config.go +++ b/config/metadata_report_config.go @@ -92,7 +92,7 @@ func (c *MetadataReportConfig) IsValid() bool { } // StartMetadataReport: The entry of metadata report start -func startMetadataReport(metadataType string, metadataReportConfig *MetadataReportConfig) error { +func StartMetadataReport(metadataType string, metadataReportConfig *MetadataReportConfig) error { if metadataReportConfig == nil || !metadataReportConfig.IsValid() { return nil } diff --git a/config/service.go b/config/service.go index 6deff3b..3eac7d3 100644 --- a/config/service.go +++ b/config/service.go @@ -28,12 +28,14 @@ var ( // SetConsumerService is called by init() of implement of RPCService func SetConsumerService(service common.RPCService) { - conServices[service.Reference()] = service + ref := common.GetReference(service) + conServices[ref] = service } // SetProviderService is called by init() of implement of RPCService func SetProviderService(service common.RPCService) { - proServices[service.Reference()] = service + ref := common.GetReference(service) + proServices[ref] = service } // GetConsumerService gets ConsumerService by @name diff --git a/metadata/definition/definition.go b/metadata/definition/definition.go index a032313..015bdc0 100644 --- a/metadata/definition/definition.go +++ b/metadata/definition/definition.go @@ -21,7 +21,13 @@ import ( "bytes" "encoding/json" "fmt" + "reflect" "strings" + "time" +) + +import ( + hessian "github.com/apache/dubbo-go-hessian2" ) import ( @@ -36,10 +42,10 @@ type ServiceDefiner interface { // ServiceDefinition is the describer of service definition type ServiceDefinition struct { - CanonicalName string - CodeSource string - Methods []MethodDefinition - Types []TypeDefinition + CanonicalName string `json:"canonicalName"` + CodeSource string `json:"codeSource"` + Methods []MethodDefinition `json:"methods"` + Types []TypeDefinition `json:"types"` } // ToBytes convert ServiceDefinition to json string @@ -76,20 +82,20 @@ type FullServiceDefinition struct { // MethodDefinition is the describer of method definition type MethodDefinition struct { - Name string - ParameterTypes []string - ReturnType string - Parameters []TypeDefinition + Name string `json:"name"` + ParameterTypes []string `json:"parameterTypes"` + ReturnType string `json:"returnType"` + Parameters []TypeDefinition `json:"parameters"` } // TypeDefinition is the describer of type definition type TypeDefinition struct { - Id string - Type string - Items []TypeDefinition - Enums []string - Properties map[string]TypeDefinition - TypeBuilderName string + Id string `json:"id"` + Type string `json:"type"` + Items []TypeDefinition `json:"items"` + Enums []string `json:"enums"` + Properties map[string]TypeDefinition `json:"properties"` + TypeBuilderName string `json:"typeBuilderName"` } // BuildServiceDefinition can build service definition which will be used to describe a service @@ -99,15 +105,26 @@ func BuildServiceDefinition(service common.Service, url *common.URL) *ServiceDef for k, m := range service.Method() { var paramTypes []string + var param string if len(m.ArgsType()) > 0 { for _, t := range m.ArgsType() { - paramTypes = append(paramTypes, t.Kind().String()) + if t.Kind() == reflect.Ptr { + param = getArgType(reflect.New(t).Interface()) + } else { + param = t.Kind().String() + } + paramTypes = append(paramTypes, param) } } var returnType string + if m.ReplyType() != nil { - returnType = m.ReplyType().Kind().String() + if m.ReplyType().Kind() == reflect.Ptr { + returnType = getArgType(reflect.New(m.ReplyType()).Interface()) + } else { + returnType = m.ReplyType().Kind().String() + } } methodD := MethodDefinition{ @@ -135,3 +152,101 @@ func ServiceDescriperBuild(serviceName string, group string, version string) str } return buf.String() } + +func getArgType(v interface{}) string { + if v == nil { + return "V" + } + + v = reflect.ValueOf(v).Elem().Interface() + + switch v.(type) { + // Serialized tags for base types + case nil: + return "V" + case bool: + return "Z" + case []bool: + return "[Z" + case byte: + return "B" + case []byte: + return "[B" + case int8: + return "B" + case []int8: + return "[B" + case int16: + return "S" + case []int16: + return "[S" + case uint16: // Equivalent to Char of Java + return "C" + case []uint16: + return "[C" + // case rune: + // return "C" + case int: + return "J" + case []int: + return "[J" + case int32: + return "I" + case []int32: + return "[I" + case int64: + return "J" + case []int64: + return "[J" + case time.Time: + return "java.util.Date" + case []time.Time: + return "[Ljava.util.Date" + case float32: + return "F" + case []float32: + return "[F" + case float64: + return "D" + case []float64: + return "[D" + case string: + return "java.lang.String" + case []string: + return "[Ljava.lang.String;" + case []hessian.Object: + return "[Ljava.lang.Object;" + case map[interface{}]interface{}: + // return "java.util.HashMap" + return "java.util.Map" + case hessian.POJO: + return v.(hessian.POJO).JavaClassName() + // Serialized tags for complex types + default: + t := reflect.TypeOf(v) + if reflect.Ptr == t.Kind() { + t = reflect.TypeOf(reflect.ValueOf(v).Elem()) + } + switch t.Kind() { + case reflect.Struct: + v, ok := v.(hessian.POJO) + if ok { + return v.JavaClassName() + } + return "java.lang.Object" + case reflect.Slice, reflect.Array: + if t.Elem().Kind() == reflect.Struct { + return "[Ljava.lang.Object;" + } + // return "java.util.ArrayList" + return "java.util.List" + case reflect.Map: // Enter here, map may be map[string]int + return "java.util.Map" + default: + return "" + } + } + + // unreachable + // return "java.lang.RuntimeException" +} diff --git a/metadata/service/inmemory/service.go b/metadata/service/inmemory/service.go index 8da78c3..36eefe0 100644 --- a/metadata/service/inmemory/service.go +++ b/metadata/service/inmemory/service.go @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package inmemory import ( diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go index e2a7a64..302ae32 100644 --- a/metadata/service/remote/service.go +++ b/metadata/service/remote/service.go @@ -132,22 +132,6 @@ func (mts *MetadataService) PublishServiceDefinition(url *common.URL) error { return nil } logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url) - } else { - params := make(map[string]string, len(url.GetParams())) - url.RangeParams(func(key, value string) bool { - params[key] = value - return true - }) - id := &identifier.MetadataIdentifier{ - BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ - ServiceInterface: interfaceName, - Version: url.GetParam(constant.VERSION_KEY, ""), - Group: url.GetParam(constant.GROUP_KEY, constant.DUBBO), - Side: url.GetParam(constant.SIDE_KEY, "consumer"), - }, - } - mts.delegateReport.StoreConsumerMetadata(id, params) - return nil } return nil diff --git a/metadata/service/service.go b/metadata/service/service.go index 1d90f8a..f7a39bd 100644 --- a/metadata/service/service.go +++ b/metadata/service/service.go @@ -30,7 +30,7 @@ import ( // MetadataService is used to define meta data related behaviors // usually the implementation should be singleton type MetadataService interface { - common.RPCService + common.ReferencedRPCService // ServiceName will get the service's name in meta service , which is application name ServiceName() (string, error) // ExportURL will store the exported url in metadata @@ -86,7 +86,7 @@ func (mts *BaseMetadataService) ServiceName() (string, error) { return mts.serviceName, nil } -// Version will return the version of metadata service +// Reference will return the reference id of metadata service func (mts *BaseMetadataService) Reference() string { return constant.SIMPLE_METADATA_SERVICE_NAME } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index e50e2b9..0f37ac7 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -186,20 +186,22 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte serviceConfigurationListener.OverrideUrl(providerUrl) var reg registry.Registry - if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded { - reg = getRegistry(registryUrl) - proto.registries.Store(registryUrl.Key(), reg) - logger.Infof("Export proto:%p registries address:%p", proto, proto.registries) - } else { - reg = regI.(registry.Registry) - } - registeredProviderUrl := getUrlToRegistry(providerUrl, registryUrl) - err := reg.Register(registeredProviderUrl) - if err != nil { - logger.Errorf("provider service %v register registry %v error, error message is %s", - providerUrl.Key(), registryUrl.Key(), err.Error()) - return nil + if registryUrl.Protocol != "" { + if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded { + reg = getRegistry(registryUrl) + proto.registries.Store(registryUrl.Key(), reg) + logger.Infof("Export proto:%p registries address:%p", proto, proto.registries) + } else { + reg = regI.(registry.Registry) + } + registeredProviderUrl := getUrlToRegistry(providerUrl, registryUrl) + err := reg.Register(registeredProviderUrl) + if err != nil { + logger.Errorf("provider service %v register registry %v error, error message is %s", + providerUrl.Key(), registryUrl.Key(), err.Error()) + return nil + } } key := getCacheKey(invoker) @@ -214,11 +216,13 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte logger.Infof("The exporter has not been cached, and will return a new exporter!") } - go func() { - if err = reg.Subscribe(overriderUrl, overrideSubscribeListener); err != nil { - logger.Warnf("reg.subscribe(overriderUrl:%v) = error:%v", overriderUrl, err) - } - }() + if registryUrl.Protocol != "" { + go func() { + if err := reg.Subscribe(overriderUrl, overrideSubscribeListener); err != nil { + logger.Warnf("reg.subscribe(overriderUrl:%v) = error:%v", overriderUrl, err) + } + }() + } return cachedExporter.(protocol.Exporter) }
