This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch feature-triple
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/feature-triple by this push:
     new af424f909 implement triple protocol server-side adaptation (#2378)
af424f909 is described below

commit af424f909acc20f89317cb4a117c6a3b6b2ac76b
Author: Scout Wang <[email protected]>
AuthorDate: Mon Aug 14 11:18:39 2023 +0800

    implement triple protocol server-side adaptation (#2378)
---
 client/reference_config.go                         |   9 +-
 common/config/utils.go                             |   6 -
 config/consumer_config.go                          |   1 +
 .../internal/proto/greettriple/greet.triple.go     |  10 +-
 protocol/triple/internal/server/cmd/main.go        |   6 +-
 protocol/triple/server.go                          | 156 ++++++++++++---------
 protocol/triple/triple.go                          |  20 ++-
 protocol/triple/triple_protocol/handler.go         |   6 +-
 registry/registry_config.go                        |   9 +-
 {provider => server}/options.go                    |   2 +-
 provider/provider.go => server/server.go           |  10 +-
 11 files changed, 126 insertions(+), 109 deletions(-)

diff --git a/client/reference_config.go b/client/reference_config.go
index f4257063a..ca75dd725 100644
--- a/client/reference_config.go
+++ b/client/reference_config.go
@@ -40,7 +40,6 @@ import (
        commonCfg "dubbo.apache.org/dubbo-go/v3/common/config"
        "dubbo.apache.org/dubbo-go/v3/common/constant"
        "dubbo.apache.org/dubbo-go/v3/common/extension"
-       "dubbo.apache.org/dubbo-go/v3/config"
        "dubbo.apache.org/dubbo-go/v3/protocol"
        "dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
        "dubbo.apache.org/dubbo-go/v3/proxy"
@@ -195,9 +194,6 @@ func (rc *ReferenceConfig) refer(info *ClientInfo, srv 
interface{}) {
                common.WithParamsValue(constant.MetadataTypeKey, 
rc.metaDataType),
        )
 
-       if info == nil {
-               config.SetConsumerServiceByInterfaceName(rc.InterfaceName, srv)
-       }
        if rc.ForceTag {
                cfgURL.AddParam(constant.ForceUseTag, "true")
        }
@@ -310,7 +306,10 @@ func (rc *ReferenceConfig) refer(info *ClientInfo, srv 
interface{}) {
        // create proxy
        if info == nil {
                if rc.Async {
-                       callback := config.GetCallback(rc.id)
+                       var callback common.CallbackResponse
+                       if asyncSrv, ok := srv.(common.AsyncCallbackService); 
ok {
+                               callback = asyncSrv.CallBack
+                       }
                        rc.pxy = 
extension.GetProxyFactory(rc.proxyFactory).GetAsyncProxy(rc.invoker, callback, 
cfgURL)
                } else {
                        rc.pxy = 
extension.GetProxyFactory(rc.proxyFactory).GetProxy(rc.invoker, cfgURL)
diff --git a/common/config/utils.go b/common/config/utils.go
index ffe900013..76872fb2e 100644
--- a/common/config/utils.go
+++ b/common/config/utils.go
@@ -14,7 +14,6 @@ import (
 
 import (
        "dubbo.apache.org/dubbo-go/v3/common/constant"
-       "dubbo.apache.org/dubbo-go/v3/common/extension"
 )
 
 var validate *validator.Validate
@@ -95,11 +94,6 @@ func removeMinus(strArr []string) string {
        return normalStr
 }
 
-// ClientNameID unique identifier id for client
-func ClientNameID(config extension.Config, protocol, address string) string {
-       return strings.Join([]string{config.Prefix(), protocol, address}, "-")
-}
-
 func IsValid(addr string) bool {
        return addr != "" && addr != constant.NotAvailable
 }
diff --git a/config/consumer_config.go b/config/consumer_config.go
index 89170a3be..fbcf5f3a6 100644
--- a/config/consumer_config.go
+++ b/config/consumer_config.go
@@ -162,6 +162,7 @@ func (cc *ConsumerConfig) Load() {
                }
                refConfig.Refer(refRPCService)
                refConfig.Implement(refRPCService)
+               SetConsumerServiceByInterfaceName(refConfig.InterfaceName, 
refRPCService)
        }
 
        for info, refRPCService := range GetClientInfoServicesMap() {
diff --git a/protocol/triple/internal/proto/greettriple/greet.triple.go 
b/protocol/triple/internal/proto/greettriple/greet.triple.go
index 841223a49..eca9e9a1a 100644
--- a/protocol/triple/internal/proto/greettriple/greet.triple.go
+++ b/protocol/triple/internal/proto/greettriple/greet.triple.go
@@ -17,7 +17,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/config"
        proto "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto"
        triple_protocol 
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
-       "dubbo.apache.org/dubbo-go/v3/provider"
+       "dubbo.apache.org/dubbo-go/v3/server"
 )
 
 // This is a compile-time assertion to ensure that this generated file and the 
connect package are
@@ -226,8 +226,8 @@ type GreetServiceHandler interface {
        GreetServerStream(context.Context, *proto.GreetServerStreamRequest, 
GreetService_GreetServerStreamServer) error
 }
 
-func ProvideGreetServiceHandler(pro *provider.Provider, hdlr 
GreetServiceHandler) error {
-       return pro.Provide(hdlr, &GreetService_ServiceInfo)
+func RegisterGreetServiceHandler(srv *server.Server, hdlr GreetServiceHandler) 
error {
+       return srv.Register(hdlr, &GreetService_ServiceInfo)
 }
 
 type GreetService_GreetStreamServer interface {
@@ -299,10 +299,10 @@ func (g greetServiceGreetServerStreamServer) Send(msg 
*proto.GreetServerStreamRe
        return g.ServerStream.Send(msg)
 }
 
-var GreetService_ServiceInfo = provider.ServiceInfo{
+var GreetService_ServiceInfo = server.ServiceInfo{
        InterfaceName: "greet.GreetService",
        ServiceType:   (*GreetServiceHandler)(nil),
-       Methods: []provider.MethodInfo{
+       Methods: []server.MethodInfo{
                {
                        Name: "Greet",
                        Type: constant.CallUnary,
diff --git a/protocol/triple/internal/server/cmd/main.go 
b/protocol/triple/internal/server/cmd/main.go
index b38a89636..f95118f39 100644
--- a/protocol/triple/internal/server/cmd/main.go
+++ b/protocol/triple/internal/server/cmd/main.go
@@ -16,7 +16,7 @@ import (
        _ "dubbo.apache.org/dubbo-go/v3/imports"
        greet "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto"
        
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/greettriple"
-       "dubbo.apache.org/dubbo-go/v3/provider"
+       "dubbo.apache.org/dubbo-go/v3/server"
 )
 
 type GreetConnectServer struct {
@@ -64,11 +64,11 @@ func (srv *GreetConnectServer) GreetServerStream(ctx 
context.Context, req *greet
 }
 
 func main() {
-       pro, err := provider.NewProvider()
+       srv, err := server.NewServer()
        if err != nil {
                panic(err)
        }
-       if err := greettriple.ProvideGreetServiceHandler(pro, 
&GreetConnectServer{}); err != nil {
+       if err := greettriple.RegisterGreetServiceHandler(srv, 
&GreetConnectServer{}); err != nil {
                panic(err)
        }
        select {}
diff --git a/protocol/triple/server.go b/protocol/triple/server.go
index 3ac421ce8..9b229610d 100644
--- a/protocol/triple/server.go
+++ b/protocol/triple/server.go
@@ -20,7 +20,6 @@ package triple
 import (
        "context"
        "crypto/tls"
-       "fmt"
        "net/http"
        "path"
        "sync"
@@ -42,20 +41,10 @@ import (
        "dubbo.apache.org/dubbo-go/v3/config"
        "dubbo.apache.org/dubbo-go/v3/protocol"
        "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
-       proto "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto"
        tri "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
+       "dubbo.apache.org/dubbo-go/v3/server"
 )
 
-//// TripleService is implemented by user logic struct wrapping ProviderBase 
generated by protoc-gen-triple.
-//type TripleService interface {
-//     // SetProxyImpl sets proxy. All invocation logics are embedded in impl.
-//     SetProxyImpl(impl protocol.Invoker)
-//     // GetProxyImpl gets proxy.
-//     GetProxyImpl() protocol.Invoker
-//     // BuildHandler receives user logic struct and handlerOption defined by 
triple to create handler.
-//     // For now, impl is just used to finish type checking.
-//     BuildHandler(impl interface{}, opts ...tri.HandlerOption) (string, 
http.Handler)
-//}
 // Server is TRIPLE server
 type Server struct {
        httpServer *http.Server
@@ -67,12 +56,14 @@ func NewServer() *Server {
 }
 
 // Start TRIPLE server
-func (s *Server) Start(url *common.URL) {
+func (s *Server) Start(invoker protocol.Invoker, info *server.ServiceInfo) {
        var (
                addr    string
                err     error
+               url     *common.URL
                hanOpts []tri.HandlerOption
        )
+       url = invoker.GetURL()
        addr = url.Location
        srv := &http.Server{
                Addr: addr,
@@ -90,6 +81,7 @@ func (s *Server) Start(url *common.URL) {
        }
        hanOpts = append(hanOpts, tri.WithSendMaxBytes(maxServerSendMsgSize))
 
+       // todo: implement interceptor
        // If global trace instance was set, then server tracer instance
        // can be get. If not, will return NoopTracer.
        //tracer := opentracing.GlobalTracer()
@@ -111,7 +103,7 @@ func (s *Server) Start(url *common.URL) {
                if err != nil {
                        return
                }
-               logger.Infof("Grpc Server initialized the TLSConfig 
configuration")
+               logger.Infof("Triple Server initialized the TLSConfig 
configuration")
        }
        srv.TLSConfig = cfg
 
@@ -121,16 +113,18 @@ func (s *Server) Start(url *common.URL) {
        s.httpServer = srv
 
        go func() {
-               providerServices := config.GetProviderConfig().Services
-
-               if len(providerServices) == 0 {
-                       panic("provider service map is null")
-               }
+               //providerServices := config.GetProviderConfig().Services
+               //
+               //if len(providerServices) == 0 {
+               //      panic("provider service map is null")
+               //}
                // todo: remove this logic?
                // wait all exporter ready , then set proxy impl and grpc 
registerService
-               waitTripleExporter(providerServices)
+               //waitTripleExporter(providerServices)
                mux := http.NewServeMux()
-               handleService(providerServices, mux)
+               if info != nil {
+                       handleServiceWithInfo(invoker, info, mux)
+               }
                // todo: figure it out this process
                //reflection.Register(server)
                // todo: without tls
@@ -173,68 +167,94 @@ func waitTripleExporter(providerServices 
map[string]*config.ServiceConfig) {
                                return
                        }
                case <-ta.C:
-                       panic("wait GRPC_NEW exporter timeout when start 
GRPC_NEW server")
+                       panic("wait Triple exporter timeout when start GRPC_NEW 
server")
                }
        }
 }
 
-// handleService injects invoker and creates handler based on ServiceConfig 
and provider service.
-func handleService(providerServices map[string]*config.ServiceConfig, mux 
*http.ServeMux, opts ...tri.HandlerOption) {
-       for key, providerService := range providerServices {
-               service := config.GetProviderService(key)
-               ds, ok := service.(TripleService)
-               if !ok {
-                       panic("illegal service type registered")
-               }
-
-               serviceKey := common.ServiceKey(providerService.Interface, 
providerService.Group, providerService.Version)
-               exporter, _ := tripleProtocol.ExporterMap().Load(serviceKey)
-               if exporter == nil {
-                       panic(fmt.Sprintf("no exporter found for servicekey: 
%v", serviceKey))
-               }
-               invoker := exporter.(protocol.Exporter).GetInvoker()
-               if invoker == nil {
-                       panic(fmt.Sprintf("no invoker found for servicekey: 
%v", serviceKey))
-               }
-
-               // inject invoker, it has all invocation logics
-               ds.SetProxyImpl(invoker)
-               path, handler := ds.BuildHandler(service, opts...)
-               mux.Handle(path, tri.New)
-               mux.Handle(path, handler)
-       }
-}
+//// handleService injects invoker and creates handler based on ServiceConfig 
and provider service.
+//func handleService(providerServices map[string]*config.ServiceConfig, mux 
*http.ServeMux, opts ...tri.HandlerOption) {
+//     for key, providerService := range providerServices {
+//             service := config.GetProviderService(key)
+//             ds, ok := service.(TripleService)
+//             if !ok {
+//                     panic("illegal service type registered")
+//             }
+//
+//             serviceKey := common.ServiceKey(providerService.Interface, 
providerService.Group, providerService.Version)
+//             exporter, _ := tripleProtocol.ExporterMap().Load(serviceKey)
+//             if exporter == nil {
+//                     panic(fmt.Sprintf("no exporter found for servicekey: 
%v", serviceKey))
+//             }
+//             invoker := exporter.(protocol.Exporter).GetInvoker()
+//             if invoker == nil {
+//                     panic(fmt.Sprintf("no invoker found for servicekey: 
%v", serviceKey))
+//             }
+//
+//             // inject invoker, it has all invocation logics
+//             ds.SetProxyImpl(invoker)
+//             path, handler := ds.BuildHandler(service, opts...)
+//             mux.Handle(path, tri.New)
+//             mux.Handle(path, handler)
+//     }
+//}
 
-func handleService2(url *common.URL, mux *http.ServeMux, opts 
...tri.HandlerOption) {
-       interfaceName := ""
-       methodNames := []string{}
-       types := []string{}
-       serviceKey := url.ServiceKey()
-       exporter, ok := tripleProtocol.ExporterMap().Load(serviceKey)
-       if !ok {
-               panic(fmt.Sprintf("no exporter found for servicekey: %v", 
serviceKey))
-       }
-       for i, method := range methodNames {
+// handleServiceWithInfo injects invoker and creates handler based on 
ServiceInfo
+func handleServiceWithInfo(invoker protocol.Invoker, info *server.ServiceInfo, 
mux *http.ServeMux, opts ...tri.HandlerOption) {
+       for _, method := range info.Methods {
                var handler http.Handler
-               procedure := path.Join(interfaceName, method)
-               typ := types[i]
-               switch typ {
+               procedure := path.Join(info.InterfaceName, method.Name)
+               switch method.Type {
                case constant.CallUnary:
                        handler = tri.NewUnaryHandler(
                                procedure,
+                               method.ReqInitFunc,
                                func(ctx context.Context, req *tri.Request) 
(*tri.Response, error) {
                                        var args []interface{}
-                                       args = append(args, req.)
-                                       // provider.Serve()
-                                       invo := 
invocation.NewRPCInvocation("Greet", args, nil)
-                                       res := s.proxyImpl.Invoke(ctx, invo)
-                                       return 
res.Result().(*triple_protocol.Response[proto.GreetResponse]), res.Error()
+                                       args = append(args, req.Msg)
+                                       // todo: inject method.Meta to 
attachments
+                                       invo := 
invocation.NewRPCInvocation(method.Name, args, nil)
+                                       res := invoker.Invoke(ctx, invo)
+                                       return res.Result().(*tri.Response), 
res.Error()
                                },
                                opts...,
                        )
-
+               case constant.CallClientStream:
+                       handler = tri.NewClientStreamHandler(
+                               procedure,
+                               func(ctx context.Context, stream 
*tri.ClientStream) (*tri.Response, error) {
+                                       var args []interface{}
+                                       args = append(args, 
method.StreamInitFunc(stream))
+                                       invo := 
invocation.NewRPCInvocation(method.Name, args, nil)
+                                       res := invoker.Invoke(ctx, invo)
+                                       return res.Result().(*tri.Response), 
res.Error()
+                               },
+                       )
+               case constant.CallServerStream:
+                       handler = tri.NewServerStreamHandler(
+                               procedure,
+                               method.ReqInitFunc,
+                               func(ctx context.Context, request *tri.Request, 
stream *tri.ServerStream) error {
+                                       var args []interface{}
+                                       args = append(args, request.Msg, 
method.StreamInitFunc(stream))
+                                       invo := 
invocation.NewRPCInvocation(method.Name, args, nil)
+                                       res := invoker.Invoke(ctx, invo)
+                                       return res.Error()
+                               },
+                       )
+               case constant.CallBidiStream:
+                       handler = tri.NewBidiStreamHandler(
+                               procedure,
+                               func(ctx context.Context, stream 
*tri.BidiStream) error {
+                                       var args []interface{}
+                                       args = append(args, 
method.StreamInitFunc(stream))
+                                       invo := 
invocation.NewRPCInvocation(method.Name, args, nil)
+                                       res := invoker.Invoke(ctx, invo)
+                                       return res.Error()
+                               },
+                       )
                }
-
+               mux.Handle(procedure, handler)
        }
 }
 
diff --git a/protocol/triple/triple.go b/protocol/triple/triple.go
index b429b915f..8f7c1a7e0 100644
--- a/protocol/triple/triple.go
+++ b/protocol/triple/triple.go
@@ -29,6 +29,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/common"
        "dubbo.apache.org/dubbo-go/v3/common/extension"
        "dubbo.apache.org/dubbo-go/v3/protocol"
+       "dubbo.apache.org/dubbo-go/v3/server"
 )
 
 const (
@@ -54,14 +55,17 @@ type TripleProtocol struct {
 func (tp *TripleProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
        url := invoker.GetURL()
        serviceKey := url.ServiceKey()
+       // todo: retrieve this info from url
+       info := &server.ServiceInfo{}
        exporter := NewTripleExporter(serviceKey, invoker, tp.ExporterMap())
        tp.SetExporterMap(serviceKey, exporter)
        logger.Infof("[TRIPLE Protocol] Export service: %s", url.String())
-       tp.openServer(url)
+       tp.openServer(invoker, info)
        return exporter
 }
 
-func (tp *TripleProtocol) openServer(url *common.URL) {
+func (tp *TripleProtocol) openServer(invoker protocol.Invoker, info 
*server.ServiceInfo) {
+       url := invoker.GetURL()
        tp.serverLock.Lock()
        defer tp.serverLock.Unlock()
 
@@ -69,23 +73,17 @@ func (tp *TripleProtocol) openServer(url *common.URL) {
                return
        }
 
-       // todo: remove this logic?
        if _, ok := tp.ExporterMap().Load(url.ServiceKey()); !ok {
-               panic("[GRPC_NEW Protocol]" + url.Key() + "is not existing")
+               panic("[TRIPLE Protocol]" + url.Key() + "is not existing")
        }
 
        srv := NewServer()
        tp.serverMap[url.Location] = srv
-       srv.Start(url)
+       srv.Start(invoker, info)
 }
 
 // Refer a remote triple service
 func (tp *TripleProtocol) Refer(url *common.URL) protocol.Invoker {
-       //client, err := NewClient(url)
-       //if err != nil {
-       //      logger.Warnf("can't dial the server: %s", url.Key())
-       //      return nil
-       //}
        invoker, err := NewTripleInvoker(url)
        if err != nil {
                logger.Warnf("can't dial the server: %s", url.Key())
@@ -97,7 +95,7 @@ func (tp *TripleProtocol) Refer(url *common.URL) 
protocol.Invoker {
 }
 
 func (tp *TripleProtocol) Destroy() {
-       logger.Infof("GrpcProtocol destroy.")
+       logger.Infof("TripleProtocol destroy.")
 
        tp.serverLock.Lock()
        defer tp.serverLock.Unlock()
diff --git a/protocol/triple/triple_protocol/handler.go 
b/protocol/triple/triple_protocol/handler.go
index 25f49504d..2593389e2 100644
--- a/protocol/triple/triple_protocol/handler.go
+++ b/protocol/triple/triple_protocol/handler.go
@@ -137,14 +137,14 @@ func NewServerStreamHandler(
                procedure,
                StreamTypeServer,
                func(ctx context.Context, conn StreamingHandlerConn) error {
-                       msg := reqInitFunc()
-                       if err := conn.Receive(&msg); err != nil {
+                       req := reqInitFunc()
+                       if err := conn.Receive(&req); err != nil {
                                return err
                        }
                        return implementation(
                                ctx,
                                &Request{
-                                       Msg:    &msg,
+                                       Msg:    req,
                                        spec:   conn.Spec(),
                                        peer:   conn.Peer(),
                                        header: conn.RequestHeader(),
diff --git a/registry/registry_config.go b/registry/registry_config.go
index 774b8167f..7ecac6325 100644
--- a/registry/registry_config.go
+++ b/registry/registry_config.go
@@ -83,7 +83,7 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) 
url.Values {
        urlMap.Set(constant.RegistryKey+"."+constant.RegistryZoneKey, c.Zone)
        urlMap.Set(constant.RegistryKey+"."+constant.WeightKey, 
strconv.FormatInt(c.Weight, 10))
        urlMap.Set(constant.RegistryTTLKey, c.TTL)
-       urlMap.Set(constant.ClientNameKey, commonCfg.ClientNameID(c, 
c.Protocol, c.Address))
+       urlMap.Set(constant.ClientNameKey, ClientNameID(c, c.Protocol, 
c.Address))
 
        for k, v := range c.Params {
                urlMap.Set(k, v)
@@ -111,7 +111,7 @@ func (c *RegistryConfig) toMetadataReportUrl() 
(*common.URL, error) {
                common.WithUsername(c.Username),
                common.WithPassword(c.Password),
                common.WithParamsValue(constant.TimeoutKey, c.Timeout),
-               common.WithParamsValue(constant.ClientNameKey, 
commonCfg.ClientNameID(c, c.Protocol, c.Address)),
+               common.WithParamsValue(constant.ClientNameKey, ClientNameID(c, 
c.Protocol, c.Address)),
                common.WithParamsValue(constant.MetadataReportGroupKey, 
c.Group),
                common.WithParamsValue(constant.MetadataReportNamespaceKey, 
c.Namespace),
        )
@@ -245,6 +245,11 @@ func LoadRegistries(registryIds []string, registries 
map[string]*RegistryConfig,
        return registryURLs
 }
 
+// ClientNameID returns the unique identifier for a client
+func ClientNameID(config *RegistryConfig, protocol, address string) string {
+       return strings.Join([]string{config.Prefix(), protocol, address}, "-")
+}
+
 func (c *RegistryConfig) createNewURL(protocol string, address string, 
roleType common.RoleType) (*common.URL, error) {
        return common.NewURL(protocol+"://"+address,
                common.WithParams(c.getUrlMap(roleType)),
diff --git a/provider/options.go b/server/options.go
similarity index 75%
rename from provider/options.go
rename to server/options.go
index f762c8094..01c35d57f 100644
--- a/provider/options.go
+++ b/server/options.go
@@ -1,4 +1,4 @@
-package provider
+package server
 
 type Options struct {
 }
diff --git a/provider/provider.go b/server/server.go
similarity index 69%
rename from provider/provider.go
rename to server/server.go
index 72524179e..b860be890 100644
--- a/provider/provider.go
+++ b/server/server.go
@@ -1,14 +1,14 @@
-package provider
+package server
 
 import (
        "context"
 )
 
-type Provider struct {
+type Server struct {
 }
 
-// Provide assemble invoker chains like ProviderConfig.Load, init a service 
per call
-func (pro *Provider) Provide(handler interface{}, info *ServiceInfo, opts 
...Option) error {
+// Register assembles invoker chains like ProviderConfig.Load, initializing a 
service per call
+func (pro *Server) Register(handler interface{}, info *ServiceInfo, opts 
...Option) error {
        // put information from info to url
        // ProviderConfig.Load
 
@@ -33,6 +33,6 @@ type MethodInfo struct {
        Meta           map[string]interface{}
 }
 
-func NewProvider() (*Provider, error) {
+func NewServer() (*Server, error) {
        return nil, nil
 }

Reply via email to