This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch feature-triple
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/feature-triple by this push:
new af424f909 implement triple protocol server-side adaptation (#2378)
af424f909 is described below
commit af424f909acc20f89317cb4a117c6a3b6b2ac76b
Author: Scout Wang <[email protected]>
AuthorDate: Mon Aug 14 11:18:39 2023 +0800
implement triple protocol server-side adaptation (#2378)
---
client/reference_config.go | 9 +-
common/config/utils.go | 6 -
config/consumer_config.go | 1 +
.../internal/proto/greettriple/greet.triple.go | 10 +-
protocol/triple/internal/server/cmd/main.go | 6 +-
protocol/triple/server.go | 156 ++++++++++++---------
protocol/triple/triple.go | 20 ++-
protocol/triple/triple_protocol/handler.go | 6 +-
registry/registry_config.go | 9 +-
{provider => server}/options.go | 2 +-
provider/provider.go => server/server.go | 10 +-
11 files changed, 126 insertions(+), 109 deletions(-)
diff --git a/client/reference_config.go b/client/reference_config.go
index f4257063a..ca75dd725 100644
--- a/client/reference_config.go
+++ b/client/reference_config.go
@@ -40,7 +40,6 @@ import (
commonCfg "dubbo.apache.org/dubbo-go/v3/common/config"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
"dubbo.apache.org/dubbo-go/v3/proxy"
@@ -195,9 +194,6 @@ func (rc *ReferenceConfig) refer(info *ClientInfo, srv
interface{}) {
common.WithParamsValue(constant.MetadataTypeKey,
rc.metaDataType),
)
- if info == nil {
- config.SetConsumerServiceByInterfaceName(rc.InterfaceName, srv)
- }
if rc.ForceTag {
cfgURL.AddParam(constant.ForceUseTag, "true")
}
@@ -310,7 +306,10 @@ func (rc *ReferenceConfig) refer(info *ClientInfo, srv
interface{}) {
// create proxy
if info == nil {
if rc.Async {
- callback := config.GetCallback(rc.id)
+ var callback common.CallbackResponse
+ if asyncSrv, ok := srv.(common.AsyncCallbackService);
ok {
+ callback = asyncSrv.CallBack
+ }
rc.pxy =
extension.GetProxyFactory(rc.proxyFactory).GetAsyncProxy(rc.invoker, callback,
cfgURL)
} else {
rc.pxy =
extension.GetProxyFactory(rc.proxyFactory).GetProxy(rc.invoker, cfgURL)
diff --git a/common/config/utils.go b/common/config/utils.go
index ffe900013..76872fb2e 100644
--- a/common/config/utils.go
+++ b/common/config/utils.go
@@ -14,7 +14,6 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
- "dubbo.apache.org/dubbo-go/v3/common/extension"
)
var validate *validator.Validate
@@ -95,11 +94,6 @@ func removeMinus(strArr []string) string {
return normalStr
}
-// ClientNameID unique identifier id for client
-func ClientNameID(config extension.Config, protocol, address string) string {
- return strings.Join([]string{config.Prefix(), protocol, address}, "-")
-}
-
func IsValid(addr string) bool {
return addr != "" && addr != constant.NotAvailable
}
diff --git a/config/consumer_config.go b/config/consumer_config.go
index 89170a3be..fbcf5f3a6 100644
--- a/config/consumer_config.go
+++ b/config/consumer_config.go
@@ -162,6 +162,7 @@ func (cc *ConsumerConfig) Load() {
}
refConfig.Refer(refRPCService)
refConfig.Implement(refRPCService)
+ SetConsumerServiceByInterfaceName(refConfig.InterfaceName,
refRPCService)
}
for info, refRPCService := range GetClientInfoServicesMap() {
diff --git a/protocol/triple/internal/proto/greettriple/greet.triple.go
b/protocol/triple/internal/proto/greettriple/greet.triple.go
index 841223a49..eca9e9a1a 100644
--- a/protocol/triple/internal/proto/greettriple/greet.triple.go
+++ b/protocol/triple/internal/proto/greettriple/greet.triple.go
@@ -17,7 +17,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/config"
proto "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto"
triple_protocol
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
- "dubbo.apache.org/dubbo-go/v3/provider"
+ "dubbo.apache.org/dubbo-go/v3/server"
)
// This is a compile-time assertion to ensure that this generated file and the
connect package are
@@ -226,8 +226,8 @@ type GreetServiceHandler interface {
GreetServerStream(context.Context, *proto.GreetServerStreamRequest,
GreetService_GreetServerStreamServer) error
}
-func ProvideGreetServiceHandler(pro *provider.Provider, hdlr
GreetServiceHandler) error {
- return pro.Provide(hdlr, &GreetService_ServiceInfo)
+func RegisterGreetServiceHandler(srv *server.Server, hdlr GreetServiceHandler)
error {
+ return srv.Register(hdlr, &GreetService_ServiceInfo)
}
type GreetService_GreetStreamServer interface {
@@ -299,10 +299,10 @@ func (g greetServiceGreetServerStreamServer) Send(msg
*proto.GreetServerStreamRe
return g.ServerStream.Send(msg)
}
-var GreetService_ServiceInfo = provider.ServiceInfo{
+var GreetService_ServiceInfo = server.ServiceInfo{
InterfaceName: "greet.GreetService",
ServiceType: (*GreetServiceHandler)(nil),
- Methods: []provider.MethodInfo{
+ Methods: []server.MethodInfo{
{
Name: "Greet",
Type: constant.CallUnary,
diff --git a/protocol/triple/internal/server/cmd/main.go
b/protocol/triple/internal/server/cmd/main.go
index b38a89636..f95118f39 100644
--- a/protocol/triple/internal/server/cmd/main.go
+++ b/protocol/triple/internal/server/cmd/main.go
@@ -16,7 +16,7 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/imports"
greet "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/greettriple"
- "dubbo.apache.org/dubbo-go/v3/provider"
+ "dubbo.apache.org/dubbo-go/v3/server"
)
type GreetConnectServer struct {
@@ -64,11 +64,11 @@ func (srv *GreetConnectServer) GreetServerStream(ctx
context.Context, req *greet
}
func main() {
- pro, err := provider.NewProvider()
+ srv, err := server.NewServer()
if err != nil {
panic(err)
}
- if err := greettriple.ProvideGreetServiceHandler(pro,
&GreetConnectServer{}); err != nil {
+ if err := greettriple.RegisterGreetServiceHandler(srv,
&GreetConnectServer{}); err != nil {
panic(err)
}
select {}
diff --git a/protocol/triple/server.go b/protocol/triple/server.go
index 3ac421ce8..9b229610d 100644
--- a/protocol/triple/server.go
+++ b/protocol/triple/server.go
@@ -20,7 +20,6 @@ package triple
import (
"context"
"crypto/tls"
- "fmt"
"net/http"
"path"
"sync"
@@ -42,20 +41,10 @@ import (
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
- proto "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto"
tri "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
+ "dubbo.apache.org/dubbo-go/v3/server"
)
-//// TripleService is implemented by user logic struct wrapping ProviderBase
generated by protoc-gen-triple.
-//type TripleService interface {
-// // SetProxyImpl sets proxy. All invocation logics are embedded in impl.
-// SetProxyImpl(impl protocol.Invoker)
-// // GetProxyImpl gets proxy.
-// GetProxyImpl() protocol.Invoker
-// // BuildHandler receives user logic struct and handlerOption defined by
triple to create handler.
-// // For now, impl is just used to finish type checking.
-// BuildHandler(impl interface{}, opts ...tri.HandlerOption) (string,
http.Handler)
-//}
// Server is TRIPLE server
type Server struct {
httpServer *http.Server
@@ -67,12 +56,14 @@ func NewServer() *Server {
}
// Start TRIPLE server
-func (s *Server) Start(url *common.URL) {
+func (s *Server) Start(invoker protocol.Invoker, info *server.ServiceInfo) {
var (
addr string
err error
+ url *common.URL
hanOpts []tri.HandlerOption
)
+ url = invoker.GetURL()
addr = url.Location
srv := &http.Server{
Addr: addr,
@@ -90,6 +81,7 @@ func (s *Server) Start(url *common.URL) {
}
hanOpts = append(hanOpts, tri.WithSendMaxBytes(maxServerSendMsgSize))
+ // todo: implement interceptor
// If global trace instance was set, then server tracer instance
// can be get. If not, will return NoopTracer.
//tracer := opentracing.GlobalTracer()
@@ -111,7 +103,7 @@ func (s *Server) Start(url *common.URL) {
if err != nil {
return
}
- logger.Infof("Grpc Server initialized the TLSConfig
configuration")
+ logger.Infof("Triple Server initialized the TLSConfig
configuration")
}
srv.TLSConfig = cfg
@@ -121,16 +113,18 @@ func (s *Server) Start(url *common.URL) {
s.httpServer = srv
go func() {
- providerServices := config.GetProviderConfig().Services
-
- if len(providerServices) == 0 {
- panic("provider service map is null")
- }
+ //providerServices := config.GetProviderConfig().Services
+ //
+ //if len(providerServices) == 0 {
+ // panic("provider service map is null")
+ //}
// todo: remove this logic?
// wait all exporter ready , then set proxy impl and grpc
registerService
- waitTripleExporter(providerServices)
+ //waitTripleExporter(providerServices)
mux := http.NewServeMux()
- handleService(providerServices, mux)
+ if info != nil {
+ handleServiceWithInfo(invoker, info, mux)
+ }
// todo: figure it out this process
//reflection.Register(server)
// todo: without tls
@@ -173,68 +167,94 @@ func waitTripleExporter(providerServices
map[string]*config.ServiceConfig) {
return
}
case <-ta.C:
- panic("wait GRPC_NEW exporter timeout when start
GRPC_NEW server")
+ panic("wait Triple exporter timeout when start GRPC_NEW
server")
}
}
}
-// handleService injects invoker and creates handler based on ServiceConfig
and provider service.
-func handleService(providerServices map[string]*config.ServiceConfig, mux
*http.ServeMux, opts ...tri.HandlerOption) {
- for key, providerService := range providerServices {
- service := config.GetProviderService(key)
- ds, ok := service.(TripleService)
- if !ok {
- panic("illegal service type registered")
- }
-
- serviceKey := common.ServiceKey(providerService.Interface,
providerService.Group, providerService.Version)
- exporter, _ := tripleProtocol.ExporterMap().Load(serviceKey)
- if exporter == nil {
- panic(fmt.Sprintf("no exporter found for servicekey:
%v", serviceKey))
- }
- invoker := exporter.(protocol.Exporter).GetInvoker()
- if invoker == nil {
- panic(fmt.Sprintf("no invoker found for servicekey:
%v", serviceKey))
- }
-
- // inject invoker, it has all invocation logics
- ds.SetProxyImpl(invoker)
- path, handler := ds.BuildHandler(service, opts...)
- mux.Handle(path, tri.New)
- mux.Handle(path, handler)
- }
-}
+//// handleService injects invoker and creates handler based on ServiceConfig
and provider service.
+//func handleService(providerServices map[string]*config.ServiceConfig, mux
*http.ServeMux, opts ...tri.HandlerOption) {
+// for key, providerService := range providerServices {
+// service := config.GetProviderService(key)
+// ds, ok := service.(TripleService)
+// if !ok {
+// panic("illegal service type registered")
+// }
+//
+// serviceKey := common.ServiceKey(providerService.Interface,
providerService.Group, providerService.Version)
+// exporter, _ := tripleProtocol.ExporterMap().Load(serviceKey)
+// if exporter == nil {
+// panic(fmt.Sprintf("no exporter found for servicekey:
%v", serviceKey))
+// }
+// invoker := exporter.(protocol.Exporter).GetInvoker()
+// if invoker == nil {
+// panic(fmt.Sprintf("no invoker found for servicekey:
%v", serviceKey))
+// }
+//
+// // inject invoker, it has all invocation logics
+// ds.SetProxyImpl(invoker)
+// path, handler := ds.BuildHandler(service, opts...)
+// mux.Handle(path, tri.New)
+// mux.Handle(path, handler)
+// }
+//}
-func handleService2(url *common.URL, mux *http.ServeMux, opts
...tri.HandlerOption) {
- interfaceName := ""
- methodNames := []string{}
- types := []string{}
- serviceKey := url.ServiceKey()
- exporter, ok := tripleProtocol.ExporterMap().Load(serviceKey)
- if !ok {
- panic(fmt.Sprintf("no exporter found for servicekey: %v",
serviceKey))
- }
- for i, method := range methodNames {
+// handleServiceWithInfo injects invoker and create handler based on
ServiceInfo
+func handleServiceWithInfo(invoker protocol.Invoker, info *server.ServiceInfo,
mux *http.ServeMux, opts ...tri.HandlerOption) {
+ for _, method := range info.Methods {
var handler http.Handler
- procedure := path.Join(interfaceName, method)
- typ := types[i]
- switch typ {
+ procedure := path.Join(info.InterfaceName, method.Name)
+ switch method.Type {
case constant.CallUnary:
handler = tri.NewUnaryHandler(
procedure,
+ method.ReqInitFunc,
func(ctx context.Context, req *tri.Request)
(*tri.Response, error) {
var args []interface{}
- args = append(args, req.)
- // provider.Serve()
- invo :=
invocation.NewRPCInvocation("Greet", args, nil)
- res := s.proxyImpl.Invoke(ctx, invo)
- return
res.Result().(*triple_protocol.Response[proto.GreetResponse]), res.Error()
+ args = append(args, req.Msg)
+ // todo: inject method.Meta to
attachments
+ invo :=
invocation.NewRPCInvocation(method.Name, args, nil)
+ res := invoker.Invoke(ctx, invo)
+ return res.Result().(*tri.Response),
res.Error()
},
opts...,
)
-
+ case constant.CallClientStream:
+ handler = tri.NewClientStreamHandler(
+ procedure,
+ func(ctx context.Context, stream
*tri.ClientStream) (*tri.Response, error) {
+ var args []interface{}
+ args = append(args,
method.StreamInitFunc(stream))
+ invo :=
invocation.NewRPCInvocation(method.Name, args, nil)
+ res := invoker.Invoke(ctx, invo)
+ return res.Result().(*tri.Response),
res.Error()
+ },
+ )
+ case constant.CallServerStream:
+ handler = tri.NewServerStreamHandler(
+ procedure,
+ method.ReqInitFunc,
+ func(ctx context.Context, request *tri.Request,
stream *tri.ServerStream) error {
+ var args []interface{}
+ args = append(args, request.Msg,
method.StreamInitFunc(stream))
+ invo :=
invocation.NewRPCInvocation(method.Name, args, nil)
+ res := invoker.Invoke(ctx, invo)
+ return res.Error()
+ },
+ )
+ case constant.CallBidiStream:
+ handler = tri.NewBidiStreamHandler(
+ procedure,
+ func(ctx context.Context, stream
*tri.BidiStream) error {
+ var args []interface{}
+ args = append(args,
method.StreamInitFunc(stream))
+ invo :=
invocation.NewRPCInvocation(method.Name, args, nil)
+ res := invoker.Invoke(ctx, invo)
+ return res.Error()
+ },
+ )
}
-
+ mux.Handle(procedure, handler)
}
}
diff --git a/protocol/triple/triple.go b/protocol/triple/triple.go
index b429b915f..8f7c1a7e0 100644
--- a/protocol/triple/triple.go
+++ b/protocol/triple/triple.go
@@ -29,6 +29,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/server"
)
const (
@@ -54,14 +55,17 @@ type TripleProtocol struct {
func (tp *TripleProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
url := invoker.GetURL()
serviceKey := url.ServiceKey()
+ // todo: retrieve this info from url
+ info := &server.ServiceInfo{}
exporter := NewTripleExporter(serviceKey, invoker, tp.ExporterMap())
tp.SetExporterMap(serviceKey, exporter)
logger.Infof("[TRIPLE Protocol] Export service: %s", url.String())
- tp.openServer(url)
+ tp.openServer(invoker, info)
return exporter
}
-func (tp *TripleProtocol) openServer(url *common.URL) {
+func (tp *TripleProtocol) openServer(invoker protocol.Invoker, info
*server.ServiceInfo) {
+ url := invoker.GetURL()
tp.serverLock.Lock()
defer tp.serverLock.Unlock()
@@ -69,23 +73,17 @@ func (tp *TripleProtocol) openServer(url *common.URL) {
return
}
- // todo: remove this logic?
if _, ok := tp.ExporterMap().Load(url.ServiceKey()); !ok {
- panic("[GRPC_NEW Protocol]" + url.Key() + "is not existing")
+ panic("[TRIPLE Protocol]" + url.Key() + "is not existing")
}
srv := NewServer()
tp.serverMap[url.Location] = srv
- srv.Start(url)
+ srv.Start(invoker, info)
}
// Refer a remote triple service
func (tp *TripleProtocol) Refer(url *common.URL) protocol.Invoker {
- //client, err := NewClient(url)
- //if err != nil {
- // logger.Warnf("can't dial the server: %s", url.Key())
- // return nil
- //}
invoker, err := NewTripleInvoker(url)
if err != nil {
logger.Warnf("can't dial the server: %s", url.Key())
@@ -97,7 +95,7 @@ func (tp *TripleProtocol) Refer(url *common.URL)
protocol.Invoker {
}
func (tp *TripleProtocol) Destroy() {
- logger.Infof("GrpcProtocol destroy.")
+ logger.Infof("TripleProtocol destroy.")
tp.serverLock.Lock()
defer tp.serverLock.Unlock()
diff --git a/protocol/triple/triple_protocol/handler.go
b/protocol/triple/triple_protocol/handler.go
index 25f49504d..2593389e2 100644
--- a/protocol/triple/triple_protocol/handler.go
+++ b/protocol/triple/triple_protocol/handler.go
@@ -137,14 +137,14 @@ func NewServerStreamHandler(
procedure,
StreamTypeServer,
func(ctx context.Context, conn StreamingHandlerConn) error {
- msg := reqInitFunc()
- if err := conn.Receive(&msg); err != nil {
+ req := reqInitFunc()
+ if err := conn.Receive(&req); err != nil {
return err
}
return implementation(
ctx,
&Request{
- Msg: &msg,
+ Msg: req,
spec: conn.Spec(),
peer: conn.Peer(),
header: conn.RequestHeader(),
diff --git a/registry/registry_config.go b/registry/registry_config.go
index 774b8167f..7ecac6325 100644
--- a/registry/registry_config.go
+++ b/registry/registry_config.go
@@ -83,7 +83,7 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType)
url.Values {
urlMap.Set(constant.RegistryKey+"."+constant.RegistryZoneKey, c.Zone)
urlMap.Set(constant.RegistryKey+"."+constant.WeightKey,
strconv.FormatInt(c.Weight, 10))
urlMap.Set(constant.RegistryTTLKey, c.TTL)
- urlMap.Set(constant.ClientNameKey, commonCfg.ClientNameID(c,
c.Protocol, c.Address))
+ urlMap.Set(constant.ClientNameKey, ClientNameID(c, c.Protocol,
c.Address))
for k, v := range c.Params {
urlMap.Set(k, v)
@@ -111,7 +111,7 @@ func (c *RegistryConfig) toMetadataReportUrl()
(*common.URL, error) {
common.WithUsername(c.Username),
common.WithPassword(c.Password),
common.WithParamsValue(constant.TimeoutKey, c.Timeout),
- common.WithParamsValue(constant.ClientNameKey,
commonCfg.ClientNameID(c, c.Protocol, c.Address)),
+ common.WithParamsValue(constant.ClientNameKey, ClientNameID(c,
c.Protocol, c.Address)),
common.WithParamsValue(constant.MetadataReportGroupKey,
c.Group),
common.WithParamsValue(constant.MetadataReportNamespaceKey,
c.Namespace),
)
@@ -245,6 +245,11 @@ func LoadRegistries(registryIds []string, registries
map[string]*RegistryConfig,
return registryURLs
}
+// ClientNameID unique identifier id for client
+func ClientNameID(config *RegistryConfig, protocol, address string) string {
+ return strings.Join([]string{config.Prefix(), protocol, address}, "-")
+}
+
func (c *RegistryConfig) createNewURL(protocol string, address string,
roleType common.RoleType) (*common.URL, error) {
return common.NewURL(protocol+"://"+address,
common.WithParams(c.getUrlMap(roleType)),
diff --git a/provider/options.go b/server/options.go
similarity index 75%
rename from provider/options.go
rename to server/options.go
index f762c8094..01c35d57f 100644
--- a/provider/options.go
+++ b/server/options.go
@@ -1,4 +1,4 @@
-package provider
+package server
type Options struct {
}
diff --git a/provider/provider.go b/server/server.go
similarity index 69%
rename from provider/provider.go
rename to server/server.go
index 72524179e..b860be890 100644
--- a/provider/provider.go
+++ b/server/server.go
@@ -1,14 +1,14 @@
-package provider
+package server
import (
"context"
)
-type Provider struct {
+type Server struct {
}
-// Provide assemble invoker chains like ProviderConfig.Load, init a service
per call
-func (pro *Provider) Provide(handler interface{}, info *ServiceInfo, opts
...Option) error {
+// Register assemble invoker chains like ProviderConfig.Load, init a service
per call
+func (pro *Server) Register(handler interface{}, info *ServiceInfo, opts
...Option) error {
// put information from info to url
// ProviderConfig.Load
@@ -33,6 +33,6 @@ type MethodInfo struct {
Meta map[string]interface{}
}
-func NewProvider() (*Provider, error) {
+func NewServer() (*Server, error) {
return nil, nil
}