This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new 3de582d2c refactor(triple): upgrade triple client to service-level abstraction (#3086)
3de582d2c is described below
commit 3de582d2ca5b474eba25537800d714dd843a1429
Author: Aether <[email protected]>
AuthorDate: Tue Jan 20 11:48:55 2026 +0800
refactor(triple): upgrade triple client to service-level abstraction (#3086)
* refactor(triple): upgrade Client to service-level abstraction
---
protocol/triple/client.go | 95 +++------------
protocol/triple/client_test.go | 129 +++------------------
protocol/triple/triple_invoker_test.go | 36 +++---
protocol/triple/triple_protocol/client.go | 100 +++++++++++++---
.../triple/triple_protocol/compression_test.go | 6 +-
.../triple/triple_protocol/duplex_http_call.go | 5 +
.../connect/ping/v1/pingv1connect/ping.connect.go | 46 ++------
7 files changed, 152 insertions(+), 265 deletions(-)
diff --git a/protocol/triple/client.go b/protocol/triple/client.go
index 2dab272d2..a0e7c124a 100644
--- a/protocol/triple/client.go
+++ b/protocol/triple/client.go
@@ -24,7 +24,6 @@ import (
"fmt"
"net"
"net/http"
- "reflect"
"strings"
"time"
)
@@ -57,41 +56,24 @@ const (
// callUnary, callClientStream, callServerStream, callBidiStream.
// A Reference has a clientManager.
type clientManager struct {
- isIDL bool
- // triple_protocol clients, key is method name
- triClients map[string]*tri.Client
+ isIDL bool
+ triClient *tri.Client
}
// TODO: code a triple client between clientManager and triple_protocol client
// TODO: write a NewClient for triple client
-func (cm *clientManager) getClient(method string) (*tri.Client, error) {
- triClient, ok := cm.triClients[method]
- if !ok {
- return nil, fmt.Errorf("missing triple client for method: %s",
method)
- }
- return triClient, nil
-}
-
func (cm *clientManager) callUnary(ctx context.Context, method string, req,
resp any) error {
- triClient, err := cm.getClient(method)
- if err != nil {
- return err
- }
triReq := tri.NewRequest(req)
triResp := tri.NewResponse(resp)
- if err := triClient.CallUnary(ctx, triReq, triResp); err != nil {
+ if err := cm.triClient.CallUnary(ctx, triReq, method, triResp); err !=
nil {
return err
}
return nil
}
func (cm *clientManager) callClientStream(ctx context.Context, method string)
(any, error) {
- triClient, err := cm.getClient(method)
- if err != nil {
- return nil, err
- }
- stream, err := triClient.CallClientStream(ctx)
+ stream, err := cm.triClient.CallClientStream(ctx, method)
if err != nil {
return nil, err
}
@@ -99,12 +81,8 @@ func (cm *clientManager) callClientStream(ctx
context.Context, method string) (a
}
func (cm *clientManager) callServerStream(ctx context.Context, method string,
req any) (any, error) {
- triClient, err := cm.getClient(method)
- if err != nil {
- return nil, err
- }
triReq := tri.NewRequest(req)
- stream, err := triClient.CallServerStream(ctx, triReq)
+ stream, err := cm.triClient.CallServerStream(ctx, triReq, method)
if err != nil {
return nil, err
}
@@ -112,11 +90,7 @@ func (cm *clientManager) callServerStream(ctx
context.Context, method string, re
}
func (cm *clientManager) callBidiStream(ctx context.Context, method string)
(any, error) {
- triClient, err := cm.getClient(method)
- if err != nil {
- return nil, err
- }
- stream, err := triClient.CallBidiStream(ctx)
+ stream, err := cm.triClient.CallBidiStream(ctx, method)
if err != nil {
return nil, err
}
@@ -282,59 +256,16 @@ func newClientManager(url *common.URL) (*clientManager,
error) {
baseTriURL = httpPrefix + baseTriURL
}
- triClients := make(map[string]*tri.Client)
-
- // Check if this is a generic call - for generic call, we only need
$invoke method
- generic := url.GetParam(constant.GenericKey, "")
- isGeneric := isGenericCall(generic)
-
- if isGeneric {
- // For generic call, only register $invoke method
- invokeURL, err := joinPath(baseTriURL, url.Interface(),
constant.Generic)
- if err != nil {
- return nil, fmt.Errorf("JoinPath failed for base %s,
interface %s, method %s", baseTriURL, url.Interface(), constant.Generic)
- }
- triClients[constant.Generic] = tri.NewClient(httpClient,
invokeURL, cliOpts...)
- } else if len(url.Methods) != 0 {
- for _, method := range url.Methods {
- triURL, err := joinPath(baseTriURL, url.Interface(),
method)
- if err != nil {
- return nil, fmt.Errorf("JoinPath failed for
base %s, interface %s, method %s", baseTriURL, url.Interface(), method)
- }
- triClient := tri.NewClient(httpClient, triURL,
cliOpts...)
- triClients[method] = triClient
- }
- } else {
- // This branch is for the non-IDL mode, where we pass in the
service solely
- // for the purpose of using reflection to obtain all methods of
the service.
- // There might be potential for optimization in this area later
on.
- service, ok := url.GetAttribute(constant.RpcServiceKey)
- if !ok {
- return nil, fmt.Errorf("triple clientmanager can't get
methods")
- }
-
- serviceType := reflect.TypeOf(service)
- for i := range serviceType.NumMethod() {
- methodName := serviceType.Method(i).Name
- triURL, err := joinPath(baseTriURL, url.Interface(),
methodName)
- if err != nil {
- return nil, fmt.Errorf("JoinPath failed for
base %s, interface %s, method %s", baseTriURL, url.Interface(), methodName)
- }
- triClient := tri.NewClient(httpClient, triURL,
cliOpts...)
- triClients[methodName] = triClient
- }
-
- // Register $invoke method for generic call support in non-IDL
mode
- invokeURL, err := joinPath(baseTriURL, url.Interface(),
constant.Generic)
- if err != nil {
- return nil, fmt.Errorf("JoinPath failed for base %s,
interface %s, method %s", baseTriURL, url.Interface(), constant.Generic)
- }
- triClients[constant.Generic] = tri.NewClient(httpClient,
invokeURL, cliOpts...)
+ triURL, err := joinPath(baseTriURL, url.Interface())
+ if err != nil {
+ return nil, fmt.Errorf("JoinPath failed for base %s, interface
%s", baseTriURL, url.Interface())
}
+ triClient := tri.NewClient(httpClient, triURL, cliOpts...)
+
return &clientManager{
- isIDL: isIDL,
- triClients: triClients,
+ isIDL: isIDL,
+ triClient: triClient,
}, nil
}
diff --git a/protocol/triple/client_test.go b/protocol/triple/client_test.go
index 87fe23db0..522958b71 100644
--- a/protocol/triple/client_test.go
+++ b/protocol/triple/client_test.go
@@ -74,12 +74,7 @@ func TestClientManager_HTTP2AndHTTP3(t *testing.T) {
// If successfully created, verify the client manager
assert.NotNil(t, clientManager)
assert.True(t, clientManager.isIDL)
- assert.NotEmpty(t, clientManager.triClients)
-
- // Verify that the client for the specific method exists
- client, exists := clientManager.triClients["testMethod"]
- assert.True(t, exists)
- assert.NotNil(t, client)
+ assert.NotNil(t, clientManager.triClient)
}
func TestDualTransport(t *testing.T) {
@@ -98,104 +93,18 @@ func TestDualTransport(t *testing.T) {
assert.True(t, ok, "transport should implement http.RoundTripper")
}
-func TestClientManager_GetClient(t *testing.T) {
- tests := []struct {
- desc string
- cm *clientManager
- method string
- expectErr bool
- }{
- {
- desc: "method exists",
- cm: &clientManager{
- triClients: map[string]*tri.Client{
- "TestMethod":
tri.NewClient(&http.Client{}, "http://localhost:8080/test"),
- },
- },
- method: "TestMethod",
- expectErr: false,
- },
- {
- desc: "method not exists",
- cm: &clientManager{
- triClients: map[string]*tri.Client{
- "TestMethod":
tri.NewClient(&http.Client{}, "http://localhost:8080/test"),
- },
- },
- method: "NonExistMethod",
- expectErr: true,
- },
- {
- desc: "empty triClients",
- cm: &clientManager{
- triClients: map[string]*tri.Client{},
- },
- method: "AnyMethod",
- expectErr: true,
- },
- }
-
- for _, test := range tests {
- t.Run(test.desc, func(t *testing.T) {
- client, err := test.cm.getClient(test.method)
- if test.expectErr {
- require.Error(t, err)
- assert.Nil(t, client)
- assert.Contains(t, err.Error(), "missing triple
client")
- } else {
- require.NoError(t, err)
- assert.NotNil(t, client)
- }
- })
- }
-}
-
func TestClientManager_Close(t *testing.T) {
cm := &clientManager{
- isIDL: true,
- triClients: map[string]*tri.Client{
- "Method1": tri.NewClient(&http.Client{},
"http://localhost:8080/test1"),
- "Method2": tri.NewClient(&http.Client{},
"http://localhost:8080/test2"),
- },
+ isIDL: true,
+ triClient: tri.NewClient(&http.Client{},
"http://localhost:8080/test"),
}
err := cm.close()
require.NoError(t, err)
}
-func TestClientManager_CallMethods_MissingClient(t *testing.T) {
- cm := &clientManager{
- triClients: map[string]*tri.Client{},
- }
- ctx := context.Background()
-
- t.Run("callUnary missing client", func(t *testing.T) {
- err := cm.callUnary(ctx, "NonExist", nil, nil)
- require.Error(t, err)
- assert.Contains(t, err.Error(), "missing triple client")
- })
-
- t.Run("callClientStream missing client", func(t *testing.T) {
- stream, err := cm.callClientStream(ctx, "NonExist")
- require.Error(t, err)
- assert.Nil(t, stream)
- assert.Contains(t, err.Error(), "missing triple client")
- })
-
- t.Run("callServerStream missing client", func(t *testing.T) {
- stream, err := cm.callServerStream(ctx, "NonExist", nil)
- require.Error(t, err)
- assert.Nil(t, stream)
- assert.Contains(t, err.Error(), "missing triple client")
- })
-
- t.Run("callBidiStream missing client", func(t *testing.T) {
- stream, err := cm.callBidiStream(ctx, "NonExist")
- require.Error(t, err)
- assert.Nil(t, stream)
- assert.Contains(t, err.Error(), "missing triple client")
- })
-}
+// TestClientManager_CallMethods_MissingClient removed - no longer applicable
+// in the service-level client architecture where all methods share a single triClient.
func Test_genKeepAliveOptions(t *testing.T) {
defaultInterval, _ :=
time.ParseDuration(constant.DefaultKeepAliveInterval)
@@ -359,16 +268,17 @@ func Test_newClientManager_Serialization(t *testing.T) {
}
func Test_newClientManager_NoMethods(t *testing.T) {
- // Test when url has no methods and no RpcServiceKey attribute
+ // Test when url has no methods - in service-level client architecture,
+ // this is valid as the client is created at service level, not method level
url := common.NewURLWithOptions(
common.WithLocation("localhost:20000"),
common.WithPath("com.example.TestService"),
)
cm, err := newClientManager(url)
- require.Error(t, err)
- assert.Nil(t, cm)
- assert.Contains(t, err.Error(), "can't get methods")
+ require.NoError(t, err, "service-level client should be created even
without method list")
+ assert.NotNil(t, cm)
+ assert.NotNil(t, cm.triClient, "triClient should be created at service
level")
}
func Test_newClientManager_WithMethods(t *testing.T) {
@@ -381,10 +291,7 @@ func Test_newClientManager_WithMethods(t *testing.T) {
cm, err := newClientManager(url)
require.NoError(t, err)
assert.NotNil(t, cm)
- assert.Len(t, cm.triClients, 3)
- assert.Contains(t, cm.triClients, "Method1")
- assert.Contains(t, cm.triClients, "Method2")
- assert.Contains(t, cm.triClients, "Method3")
+ assert.NotNil(t, cm.triClient, "triClient should be created")
}
func Test_newClientManager_WithGroupAndVersion(t *testing.T) {
@@ -475,8 +382,8 @@ func Test_newClientManager_WithRpcService(t *testing.T) {
cm, err := newClientManager(url)
require.NoError(t, err)
assert.NotNil(t, cm)
- // Should have methods from mockService (Reference, TestMethod1,
TestMethod2)
- assert.GreaterOrEqual(t, len(cm.triClients), 2)
+ // In service-level client architecture, a single triClient is created
+ assert.NotNil(t, cm.triClient, "triClient should be created for non-IDL
mode")
}
func TestDualTransport_Structure(t *testing.T) {
@@ -552,7 +459,7 @@ func Test_newClientManager_URLPrefixHandling(t *testing.T) {
cm, err := newClientManager(url)
require.NoError(t, err)
assert.NotNil(t, cm)
- assert.Len(t, cm.triClients, 1)
+ assert.NotNil(t, cm.triClient, "triClient should be
created")
})
}
}
@@ -635,12 +542,8 @@ func Test_newClientManager_MultipleMethods(t *testing.T) {
cm, err := newClientManager(url)
require.NoError(t, err)
assert.NotNil(t, cm)
- assert.Len(t, cm.triClients, len(methods))
-
- for _, method := range methods {
- _, exists := cm.triClients[method]
- assert.True(t, exists, "method %s should exist", method)
- }
+ // In service-level client architecture, a single triClient handles all methods
+ assert.NotNil(t, cm.triClient, "triClient should be created to handle
all methods")
}
func Test_newClientManager_InterfaceName(t *testing.T) {
diff --git a/protocol/triple/triple_invoker_test.go
b/protocol/triple/triple_invoker_test.go
index 2f5a02004..7d7a2e30e 100644
--- a/protocol/triple/triple_invoker_test.go
+++ b/protocol/triple/triple_invoker_test.go
@@ -197,8 +197,8 @@ func TestTripleInvoker_SetGetClientManager(t *testing.T) {
// set clientManager
cm := &clientManager{
- isIDL: true,
- triClients: make(map[string]*tri.Client),
+ isIDL: true,
+ triClient: nil,
}
ti.setClientManager(cm)
assert.Equal(t, cm, ti.getClientManager())
@@ -222,8 +222,8 @@ func TestTripleInvoker_IsAvailable(t *testing.T) {
{
desc: "clientManager is not nil",
clientManager: &clientManager{
- isIDL: true,
- triClients: make(map[string]*tri.Client),
+ isIDL: true,
+ triClient: nil,
},
expect: true,
},
@@ -254,8 +254,8 @@ func TestTripleInvoker_IsDestroyed(t *testing.T) {
{
desc: "clientManager is not nil and not destroyed",
clientManager: &clientManager{
- isIDL: true,
- triClients: make(map[string]*tri.Client),
+ isIDL: true,
+ triClient: nil,
},
destroyed: false,
expect: false,
@@ -275,8 +275,8 @@ func TestTripleInvoker_Destroy(t *testing.T) {
t.Run("destroy with clientManager", func(t *testing.T) {
url := common.NewURLWithOptions()
cm := &clientManager{
- isIDL: true,
- triClients: make(map[string]*tri.Client),
+ isIDL: true,
+ triClient: nil,
}
ti := newTestTripleInvoker(url, cm)
@@ -302,8 +302,8 @@ func TestTripleInvoker_Destroy(t *testing.T) {
t.Run("destroy called multiple times", func(t *testing.T) {
url := common.NewURLWithOptions()
cm := &clientManager{
- isIDL: true,
- triClients: make(map[string]*tri.Client),
+ isIDL: true,
+ triClient: nil,
}
ti := newTestTripleInvoker(url, cm)
@@ -329,8 +329,8 @@ func TestTripleInvoker_Invoke(t *testing.T) {
setup: func() (*TripleInvoker, base.Invocation) {
url := common.NewURLWithOptions()
ti := newTestTripleInvoker(url, &clientManager{
- isIDL: true,
- triClients:
make(map[string]*tri.Client),
+ isIDL: true,
+ triClient: nil,
})
ti.Destroy()
inv := invocation.NewRPCInvocationWithOptions()
@@ -353,8 +353,8 @@ func TestTripleInvoker_Invoke(t *testing.T) {
setup: func() (*TripleInvoker, base.Invocation) {
url := common.NewURLWithOptions()
ti := newTestTripleInvoker(url, &clientManager{
- isIDL: true,
- triClients:
make(map[string]*tri.Client),
+ isIDL: true,
+ triClient: nil,
})
inv := invocation.NewRPCInvocationWithOptions()
return ti, inv
@@ -366,8 +366,8 @@ func TestTripleInvoker_Invoke(t *testing.T) {
setup: func() (*TripleInvoker, base.Invocation) {
url := common.NewURLWithOptions()
ti := newTestTripleInvoker(url, &clientManager{
- isIDL: true,
- triClients:
make(map[string]*tri.Client),
+ isIDL: true,
+ triClient: nil,
})
inv := invocation.NewRPCInvocationWithOptions(
invocation.WithMethodName("TestMethod"),
@@ -398,8 +398,8 @@ func TestTripleInvoker_Invoke(t *testing.T) {
func TestTripleInvoker_Invoke_Concurrent(t *testing.T) {
url := common.NewURLWithOptions()
ti := newTestTripleInvoker(url, &clientManager{
- isIDL: true,
- triClients: make(map[string]*tri.Client),
+ isIDL: true,
+ triClient: nil,
})
var wg sync.WaitGroup
diff --git a/protocol/triple/triple_protocol/client.go
b/protocol/triple/triple_protocol/client.go
index 90cf4f025..0d570a54b 100644
--- a/protocol/triple/triple_protocol/client.go
+++ b/protocol/triple/triple_protocol/client.go
@@ -27,7 +27,7 @@ import (
type TimeoutKey struct{}
-// Client is a reusable, concurrency-safe client for a single procedure.
+// Client is a reusable, concurrency-safe client for a service.
// Depending on the procedure's type, use the CallUnary, CallClientStream,
// CallServerStream, or CallBidiStream method.
//
@@ -36,7 +36,7 @@ type TimeoutKey struct{}
// use the [WithTriple] options.
type Client struct {
config *clientConfig
- callUnary func(context.Context, *Request, *Response) error
+ callUnary func(context.Context, *Request, string, *Response) error
protocolClient protocolClient
err error
}
@@ -75,9 +75,13 @@ func NewClient(httpClient HTTPClient, url string, options
...ClientOption) *Clie
client.protocolClient = protocolCli
// Rather than applying unary interceptors along the hot path, we can do it
// once at client creation.
+ //
+ // Note: unarySpec is captured by the closure below but is never modified.
+ // Each call to callUnary creates a new methodLevelSpec by copying unarySpec,
+ // ensuring thread-safety for concurrent calls.
unarySpec := config.newSpec(StreamTypeUnary)
unaryFunc := UnaryFunc(func(ctx context.Context, request AnyRequest,
response AnyResponse) error {
- conn := client.protocolClient.NewConn(ctx, unarySpec,
request.Header())
+ conn := client.protocolClient.NewConn(ctx, request.Spec(),
request.Header())
// Send always returns an io.EOF unless the error is from the client-side.
// We want the user to continue to call Receive in those cases to get the
// full error from the server-side.
@@ -101,11 +105,15 @@ func NewClient(httpClient HTTPClient, url string, options
...ClientOption) *Clie
if interceptor := config.Interceptor; interceptor != nil {
unaryFunc = interceptor.WrapUnary(unaryFunc)
}
- client.callUnary = func(ctx context.Context, request *Request, response
*Response) error {
+ client.callUnary = func(ctx context.Context, request *Request, method
string, response *Response) error {
// To make the specification, peer, and RPC headers visible to the full
// interceptor chain (as though they were supplied by the caller), we'll
// add them here.
- request.spec = unarySpec
+ methodLevelSpec, buildErr :=
buildMethodLevelReqSpec(&unarySpec, method)
+ if buildErr != nil {
+ return buildErr
+ }
+ request.spec = methodLevelSpec
request.peer = client.protocolClient.Peer()
protocolCli.WriteRequestHeader(StreamTypeUnary,
request.Header())
if err := unaryFunc(ctx, request, response); err != nil {
@@ -118,7 +126,7 @@ func NewClient(httpClient HTTPClient, url string, options
...ClientOption) *Clie
}
// CallUnary calls a request-response procedure.
-func (c *Client) CallUnary(ctx context.Context, request *Request, response
*Response) error {
+func (c *Client) CallUnary(ctx context.Context, request *Request, method
string, response *Response) error {
if c.err != nil {
return c.err
}
@@ -128,23 +136,30 @@ func (c *Client) CallUnary(ctx context.Context, request
*Request, response *Resp
}
mergeHeaders(request.Header(), ExtractFromOutgoingContext(ctx))
applyGroupVersionHeaders(request.Header(), c.config)
- return c.callUnary(ctx, request, response)
+ return c.callUnary(ctx, request, method, response)
}
// CallClientStream calls a client streaming procedure.
-func (c *Client) CallClientStream(ctx context.Context)
(*ClientStreamForClient, error) {
+func (c *Client) CallClientStream(ctx context.Context, method string)
(*ClientStreamForClient, error) {
if c.err != nil {
return &ClientStreamForClient{err: c.err}, c.err
}
- return &ClientStreamForClient{conn: c.newConn(ctx, StreamTypeClient)},
nil
+ conn, err := c.newConn(ctx, StreamTypeClient, method)
+ if err != nil {
+ return &ClientStreamForClient{err: err}, err
+ }
+ return &ClientStreamForClient{conn: conn}, nil
}
// CallServerStream calls a server streaming procedure.
-func (c *Client) CallServerStream(ctx context.Context, request *Request)
(*ServerStreamForClient, error) {
+func (c *Client) CallServerStream(ctx context.Context, request *Request,
method string) (*ServerStreamForClient, error) {
if c.err != nil {
return nil, c.err
}
- conn := c.newConn(ctx, StreamTypeServer)
+ conn, err := c.newConn(ctx, StreamTypeServer, method)
+ if err != nil {
+ return nil, err
+ }
request.spec = conn.Spec()
request.peer = conn.Peer()
mergeHeaders(conn.RequestHeader(), request.header)
@@ -163,14 +178,23 @@ func (c *Client) CallServerStream(ctx context.Context,
request *Request) (*Serve
}
// CallBidiStream calls a bidirectional streaming procedure.
-func (c *Client) CallBidiStream(ctx context.Context) (*BidiStreamForClient,
error) {
+func (c *Client) CallBidiStream(ctx context.Context, method string)
(*BidiStreamForClient, error) {
if c.err != nil {
return &BidiStreamForClient{err: c.err}, c.err
}
- return &BidiStreamForClient{conn: c.newConn(ctx, StreamTypeBidi)}, nil
+ conn, err := c.newConn(ctx, StreamTypeBidi, method)
+ if err != nil {
+ return &BidiStreamForClient{err: err}, err
+ }
+ return &BidiStreamForClient{conn: conn}, nil
}
-func (c *Client) newConn(ctx context.Context, streamType StreamType)
StreamingClientConn {
+func (c *Client) newConn(ctx context.Context, streamType StreamType, method
string) (StreamingClientConn, error) {
+ serviceLevelSpec := c.config.newSpec(streamType)
+ methodLevelSpec, buildErr := buildMethodLevelReqSpec(&serviceLevelSpec,
method)
+ if buildErr != nil {
+ return nil, buildErr
+ }
newConn := func(ctx context.Context, spec Spec) StreamingClientConn {
header := make(http.Header, 8) // arbitrary power of two,
prevent immediate resizing
mergeHeaders(header, ExtractFromOutgoingContext(ctx))
@@ -181,7 +205,7 @@ func (c *Client) newConn(ctx context.Context, streamType
StreamType) StreamingCl
if interceptor := c.config.Interceptor; interceptor != nil {
newConn = interceptor.WrapStreamingClient(newConn)
}
- return newConn(ctx, c.config.newSpec(streamType))
+ return newConn(ctx, methodLevelSpec), nil
}
type clientConfig struct {
@@ -210,7 +234,7 @@ func newClientConfig(rawURL string, options []ClientOption)
(*clientConfig, *Err
if err != nil {
return nil, err
}
- protoPath := extractProtoPath(url.Path)
+ protoPath := normalizeClientProcedure(url.Path)
config := clientConfig{
URL: url,
// use gRPC by default
@@ -311,3 +335,47 @@ func applyGroupVersionHeaders(header http.Header, cfg
*clientConfig) {
header.Set(tripleServiceVersion, cfg.Version)
}
}
+
+func buildMethodLevelReqSpec(serviceLevelReqSpec *Spec, method string) (Spec,
error) {
+ if serviceLevelReqSpec == nil {
+ return Spec{}, fmt.Errorf("cannot build method-level spec:
service-level spec is nil")
+ }
+ methodLevelSpec := *serviceLevelReqSpec
+
+ methodLevelURL, err := url.JoinPath(methodLevelSpec.Procedure, method)
+ if err != nil {
+ return Spec{}, fmt.Errorf("JoinPath failed for procedure %s,
method %s", methodLevelSpec.Procedure, method)
+ }
+
+ methodLevelSpec.Procedure = methodLevelURL
+ return methodLevelSpec, nil
+}
+
+// normalizeClientProcedure ensures the path starts with "/" and has no trailing slash.
+//
+// Unlike extractProtoPath (used on the handler side), this function preserves the full path
+// rather than extracting only the last two segments. This allows the client to support
+// multi-level URL prefixes (e.g., "/api/v1/com.example.Service") while the method name
+// will be appended later via buildMethodLevelReqSpec.
+func normalizeClientProcedure(path string) string {
+ // Handle empty string or a single slash early
+ if path == "" || path == "/" {
+ return "/"
+ }
+
+ // Pre-clean: strip all trailing slashes (handles cases like "path///")
+ path = strings.TrimRight(path, "/")
+
+ // Ensure the path has a leading slash
+ if !strings.HasPrefix(path, "/") {
+ path = "/" + path
+ }
+
+ // If the path becomes empty after trimming (meaning it was all slashes),
+ // reset it to a single slash.
+ if path == "" {
+ return "/"
+ }
+
+ return path
+}
diff --git a/protocol/triple/triple_protocol/compression_test.go
b/protocol/triple/triple_protocol/compression_test.go
index 0d5da5d40..cf2d16567 100644
--- a/protocol/triple/triple_protocol/compression_test.go
+++ b/protocol/triple/triple_protocol/compression_test.go
@@ -29,6 +29,10 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol/internal/assert"
)
+const (
+ testAcceptEncodingOrdering = "AcceptEncodingOrdering"
+)
+
func TestAcceptEncodingOrdering(t *testing.T) {
t.Parallel()
const (
@@ -57,7 +61,7 @@ func TestAcceptEncodingOrdering(t *testing.T) {
withFakeBrotli,
withGzip(),
)
- _ = client.CallUnary(context.Background(),
NewRequest(&emptypb.Empty{}), NewResponse(&emptypb.Empty{}))
+ _ = client.CallUnary(context.Background(),
NewRequest(&emptypb.Empty{}), testAcceptEncodingOrdering,
NewResponse(&emptypb.Empty{}))
assert.True(t, called)
}
diff --git a/protocol/triple/triple_protocol/duplex_http_call.go
b/protocol/triple/triple_protocol/duplex_http_call.go
index 6c4c02176..ef5fefc33 100644
--- a/protocol/triple/triple_protocol/duplex_http_call.go
+++ b/protocol/triple/triple_protocol/duplex_http_call.go
@@ -61,6 +61,11 @@ func newDuplexHTTPCall(
// Request. This ensures if a transport out of our control wants
// to mutate the req.URL, we don't feel the effects of it.
url = cloneURL(url)
+ // Bind the concrete RPC path to the request URL; without this the request
+ // lands on the service base path and handlers return 404.
+ url.Path = spec.Procedure
+ // RawPath must align with Path; clearing avoids stale values.
+ url.RawPath = ""
pipeReader, pipeWriter := io.Pipe()
// todo(DMwangnima): remove cloneURL logic in WithContext
diff --git
a/protocol/triple/triple_protocol/internal/gen/proto/connect/ping/v1/pingv1connect/ping.connect.go
b/protocol/triple/triple_protocol/internal/gen/proto/connect/ping/v1/pingv1connect/ping.connect.go
index ebe983a5c..af13ed96b 100644
---
a/protocol/triple/triple_protocol/internal/gen/proto/connect/ping/v1/pingv1connect/ping.connect.go
+++
b/protocol/triple/triple_protocol/internal/gen/proto/connect/ping/v1/pingv1connect/ping.connect.go
@@ -16,6 +16,7 @@
//
https://github.com/bufbuild/connect-go/blob/main/internal/proto/connect/ping/v1/ping.proto.
// Code generated by protoc-gen-connect-go. DO NOT EDIT.
+// Modified manually to use service-level client for dubbo-go compatibility.
//
// Source: connect/ping/v1/ping.proto
@@ -28,10 +29,9 @@ import (
errors "errors"
http "net/http"
strings "strings"
-)
-import (
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
+
pingv1
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol/internal/gen/proto/connect/ping/v1"
)
@@ -83,31 +83,11 @@ type PingServiceClient interface {
// http://api.acme.com or https://acme.com/grpc).
func NewPingServiceClient(httpClient triple_protocol.HTTPClient, baseURL
string, opts ...triple_protocol.ClientOption) PingServiceClient {
baseURL = strings.TrimRight(baseURL, "/")
+ serviceURL := baseURL + "/" + PingServiceName
return &pingServiceClient{
- ping: triple_protocol.NewClient(
- httpClient,
- baseURL+PingServicePingProcedure,
-
triple_protocol.WithIdempotency(triple_protocol.IdempotencyNoSideEffects),
- triple_protocol.WithClientOptions(opts...),
- ),
- fail: triple_protocol.NewClient(
- httpClient,
- baseURL+PingServiceFailProcedure,
- opts...,
- ),
- sum: triple_protocol.NewClient(
- httpClient,
- baseURL+PingServiceSumProcedure,
- opts...,
- ),
- countUp: triple_protocol.NewClient(
- httpClient,
- baseURL+PingServiceCountUpProcedure,
- opts...,
- ),
- cumSum: triple_protocol.NewClient(
+ client: triple_protocol.NewClient(
httpClient,
- baseURL+PingServiceCumSumProcedure,
+ serviceURL,
opts...,
),
}
@@ -115,37 +95,33 @@ func NewPingServiceClient(httpClient
triple_protocol.HTTPClient, baseURL string,
// pingServiceClient implements PingServiceClient.
type pingServiceClient struct {
- ping *triple_protocol.Client
- fail *triple_protocol.Client
- sum *triple_protocol.Client
- countUp *triple_protocol.Client
- cumSum *triple_protocol.Client
+ client *triple_protocol.Client
}
// Ping calls connect.ping.v1.PingService.Ping.
// Ping(context.Context, *pingv1.PingRequest,*pingv1.PingResponse) (error)
func (c *pingServiceClient) Ping(ctx context.Context, req
*triple_protocol.Request, res *triple_protocol.Response) error {
- return c.ping.CallUnary(ctx, req, res)
+ return c.client.CallUnary(ctx, req, "Ping", res)
}
// Fail calls connect.ping.v1.PingService.Fail.
func (c *pingServiceClient) Fail(ctx context.Context, req
*triple_protocol.Request, res *triple_protocol.Response) error {
- return c.fail.CallUnary(ctx, req, res)
+ return c.client.CallUnary(ctx, req, "Fail", res)
}
// Sum calls connect.ping.v1.PingService.Sum.
func (c *pingServiceClient) Sum(ctx context.Context)
(*triple_protocol.ClientStreamForClient, error) {
- return c.sum.CallClientStream(ctx)
+ return c.client.CallClientStream(ctx, "Sum")
}
// CountUp calls connect.ping.v1.PingService.CountUp.
func (c *pingServiceClient) CountUp(ctx context.Context, req
*triple_protocol.Request) (*triple_protocol.ServerStreamForClient, error) {
- return c.countUp.CallServerStream(ctx, req)
+ return c.client.CallServerStream(ctx, req, "CountUp")
}
// CumSum calls connect.ping.v1.PingService.CumSum.
func (c *pingServiceClient) CumSum(ctx context.Context)
(*triple_protocol.BidiStreamForClient, error) {
- return c.cumSum.CallBidiStream(ctx)
+ return c.client.CallBidiStream(ctx, "CumSum")
}
// PingServiceHandler is an implementation of the connect.ping.v1.PingService
service.