This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new 3040a9227 fix: client can not get attach attachments from server
(#2854)
3040a9227 is described below
commit 3040a9227bc08b9fb1201ff71a6adf6fdf6b72fe
Author: 1kasa <[email protected]>
AuthorDate: Mon Apr 28 20:13:58 2025 +0800
fix: client can not get attach attachments from server (#2854)
* fix: client can not get attach attachments from server
* fix: ci error(test lack)
* fix: ci error(nil)
---
common/constant/default.go | 2 +-
common/constant/key.go | 7 +++
config/service_config_test.go | 2 +-
filter/context/filter.go | 137 ++++++++++++++++++++++++++++++++++++++++++
filter/filter_impl/import.go | 1 +
imports/imports.go | 1 +
protocol/triple/client.go | 26 ++++++++
protocol/triple/server.go | 24 ++++++--
8 files changed, 194 insertions(+), 6 deletions(-)
diff --git a/common/constant/default.go b/common/constant/default.go
index edd08301a..2b8769d84 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -62,7 +62,7 @@ const (
// that put the AdaptiveServiceProviderFilterKey at the end.
DefaultServiceFilters = EchoFilterKey + "," +
TokenFilterKey + "," + AccessLogFilterKey + "," +
TpsLimitFilterKey + "," +
- GenericServiceFilterKey + "," + ExecuteLimitFilterKey + "," +
GracefulShutdownProviderFilterKey
+ GenericServiceFilterKey + "," + ExecuteLimitFilterKey + "," +
GracefulShutdownProviderFilterKey + "," + ContextFilterKey
DefaultReferenceFilters = GracefulShutdownConsumerFilterKey
)
diff --git a/common/constant/key.go b/common/constant/key.go
index fca26aef0..8a357d426 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -105,6 +105,7 @@ const (
XdsCircuitBreakerKey = "xds_circuit_reaker"
OTELServerTraceKey = "otelServerTrace"
OTELClientTraceKey = "otelClientTrace"
+ ContextFilterKey = "context"
)
const (
@@ -321,6 +322,7 @@ const (
Tagkey = "dubbo.tag" // key of tag
ConditionKey = "dubbo.condition"
AttachmentKey = DubboCtxKey("attachment") // key in
context in invoker
+ AttachmentServerKey = DubboCtxKey("server-attachment")
TagRouterFactoryKey = "tag"
AffinityAppRouterFactoryKey = "application.affinity"
AffinityServiceRouterFactoryKey = "service.affinity"
@@ -475,3 +477,8 @@ const (
MinNacosWeight = 0.0 // Minimum allowed weight (Nacos range
starts at 0)
MaxNacosWeight = 10000.0 // Maximum allowed weight (Nacos range
ends at 10000)
)
+
+const (
+ GrpcHeaderStatus = "Grpc-Status"
+ GrpcHeaderMessage = "Grpc-Message"
+)
diff --git a/config/service_config_test.go b/config/service_config_test.go
index 3136b8fc9..ec7f4af48 100644
--- a/config/service_config_test.go
+++ b/config/service_config_test.go
@@ -114,7 +114,7 @@ func TestNewServiceConfigBuilder(t *testing.T) {
values := serviceConfig.getUrlMap()
assert.Equal(t, values.Get("methods.Say.weight"), "0")
assert.Equal(t, values.Get("methods.Say.tps.limit.rate"), "")
- assert.Equal(t, values.Get(constant.ServiceFilterKey),
"echo,token,accesslog,tps,generic_service,execute,pshutdown")
+ assert.Equal(t, values.Get(constant.ServiceFilterKey),
"echo,token,accesslog,tps,generic_service,execute,pshutdown,context")
})
t.Run("Implement", func(t *testing.T) {
diff --git a/filter/context/filter.go b/filter/context/filter.go
new file mode 100644
index 000000000..d3eaf02d7
--- /dev/null
+++ b/filter/context/filter.go
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package context
+
+import (
+ "context"
+ "strings"
+ "sync"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/filter"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+)
+
+var (
+ once sync.Once
+ ctx *contextFilter
+)
+
+var unloadingKeys = make(map[string]struct{})
+
+func init() {
+ unloadingKeys[constant.PathKey] = struct{}{}
+ unloadingKeys[constant.InterfaceKey] = struct{}{}
+ unloadingKeys[constant.GroupKey] = struct{}{}
+ unloadingKeys[constant.VersionKey] = struct{}{}
+ unloadingKeys[constant.TokenKey] = struct{}{}
+ unloadingKeys[constant.TimeoutKey] = struct{}{}
+
+ unloadingKeys[constant.AsyncKey] = struct{}{}
+ unloadingKeys[constant.TagKey] = struct{}{}
+ unloadingKeys[constant.ForceUseTag] = struct{}{}
+
+ httpHeaders := []string{
+ "accept",
+ "accept-charset",
+ "accept-datetime",
+ "accept-encoding",
+ "accept-language",
+ "access-control-request-headers",
+ "access-control-request-method",
+ "authorization",
+ "cache-control",
+ "connection",
+ "content-length",
+ "content-md5",
+ "content-type",
+ "cookie",
+ "date",
+ "dnt",
+ "expect",
+ "forwarded",
+ "from",
+ "host",
+ "http2-settings",
+ "if-match",
+ "if-modified-since",
+ "if-none-match",
+ "if-range",
+ "if-unmodified-since",
+ "max-forwards",
+ "origin",
+ "pragma",
+ "proxy-authorization",
+ "range",
+ "referer",
+ "sec-fetch-dest",
+ "sec-fetch-mode",
+ "sec-fetch-site",
+ "sec-fetch-user",
+ "te",
+ "trailer",
+ "upgrade",
+ "upgrade-insecure-requests",
+ "user-agent",
+ }
+ for _, header := range httpHeaders {
+ unloadingKeys[header] = struct{}{}
+ }
+}
+
+func init() {
+ extension.SetFilter(constant.ContextFilterKey, newContextFilter)
+}
+
+type contextFilter struct{}
+
+func newContextFilter() filter.Filter {
+ if ctx == nil {
+ once.Do(func() {
+ ctx = &contextFilter{}
+ })
+ }
+ return ctx
+}
+
+// Invoke do nothing
+func (f *contextFilter) Invoke(ctx context.Context, invoker protocol.Invoker,
invocation protocol.Invocation) protocol.Result {
+ return invoker.Invoke(ctx, invocation)
+}
+
+// OnResponse pass attachments to result
+func (f *contextFilter) OnResponse(ctx context.Context, result
protocol.Result, _ protocol.Invoker,
+ _ protocol.Invocation) protocol.Result {
+ attachmentsRaw := ctx.Value(constant.AttachmentServerKey)
+ if attachmentsRaw != nil {
+ if attachments, ok := attachmentsRaw.(map[string]any); ok {
+ filtered := make(map[string]any)
+ for key, value := range attachments {
+ if _, exists :=
unloadingKeys[strings.ToLower(key)]; !exists {
+ filtered[key] = value
+ }
+ }
+ result.SetAttachments(filtered)
+ }
+ }
+
+ return result
+}
diff --git a/filter/filter_impl/import.go b/filter/filter_impl/import.go
index 2e389b90a..277b5f56e 100644
--- a/filter/filter_impl/import.go
+++ b/filter/filter_impl/import.go
@@ -24,6 +24,7 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/filter/active"
_ "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc"
_ "dubbo.apache.org/dubbo-go/v3/filter/auth"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/context"
_ "dubbo.apache.org/dubbo-go/v3/filter/echo"
_ "dubbo.apache.org/dubbo-go/v3/filter/exec_limit"
_ "dubbo.apache.org/dubbo-go/v3/filter/generic"
diff --git a/imports/imports.go b/imports/imports.go
index 82f67d95c..dc9bf2cd7 100644
--- a/imports/imports.go
+++ b/imports/imports.go
@@ -47,6 +47,7 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/filter/active"
_ "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc"
_ "dubbo.apache.org/dubbo-go/v3/filter/auth"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/context"
_ "dubbo.apache.org/dubbo-go/v3/filter/echo"
_ "dubbo.apache.org/dubbo-go/v3/filter/exec_limit"
_ "dubbo.apache.org/dubbo-go/v3/filter/generic"
diff --git a/protocol/triple/client.go b/protocol/triple/client.go
index fc21c8c13..05cc1d9f9 100644
--- a/protocol/triple/client.go
+++ b/protocol/triple/client.go
@@ -70,6 +70,20 @@ func (cm *clientManager) callUnary(ctx context.Context,
method string, req, resp
if err := triClient.CallUnary(ctx, triReq, triResp); err != nil {
return err
}
+
+ serverAttachments, ok :=
ctx.Value(constant.AttachmentServerKey).(map[string]interface{})
+ if !ok {
+ return nil
+ }
+ for k, v := range triResp.Trailer() {
+ if ok := isFilterHeader(k); ok {
+ continue
+ }
+ if len(v) > 0 {
+ serverAttachments[k] = v[0]
+ }
+ }
+
return nil
}
@@ -219,3 +233,15 @@ func newClientManager(url *common.URL) (*clientManager,
error) {
triClients: triClients,
}, nil
}
+
+func isFilterHeader(key string) bool {
+ if key != "" && key[0] == ':' {
+ return true
+ }
+ switch key {
+ case constant.GrpcHeaderMessage, constant.GrpcHeaderStatus:
+ return true
+ default:
+ return false
+ }
+}
diff --git a/protocol/triple/server.go b/protocol/triple/server.go
index 92e2ce98e..6c19585bd 100644
--- a/protocol/triple/server.go
+++ b/protocol/triple/server.go
@@ -247,15 +247,31 @@ func (s *Server) handleServiceWithInfo(interfaceName
string, invoker protocol.In
attachments :=
generateAttachments(req.Header())
// inject attachments
ctx = context.WithValue(ctx,
constant.AttachmentKey, attachments)
+ capturedAttachments :=
make(map[string]any)
+ ctx = context.WithValue(ctx,
constant.AttachmentServerKey, capturedAttachments)
invo :=
invocation.NewRPCInvocation(m.Name, args, attachments)
res := invoker.Invoke(ctx, invo)
// todo(DMwangnima): modify InfoInvoker
to get a unified processing logic
+ var triResp *tri.Response
// please refer to
server/InfoInvoker.Invoke()
- if triResp, ok :=
res.Result().(*tri.Response); ok {
- return triResp, res.Error()
+ if existingResp, ok :=
res.Result().(*tri.Response); ok {
+ triResp = existingResp
+ } else {
+ // please refer to
proxy/proxy_factory/ProxyInvoker.Invoke
+ triResp =
tri.NewResponse([]any{res.Result()})
+ }
+ for k, v := range res.Attachments() {
+ switch val := v.(type) {
+ case string:
+
triResp.Trailer().Set(k, val)
+ case []string:
+ if len(val) > 0 {
+
triResp.Trailer().Set(k, val[0])
+ }
+ default:
+ triResp.Header().Set(k,
fmt.Sprintf("%v", val))
+ }
}
- // please refer to
proxy/proxy_factory/ProxyInvoker.Invoke
- triResp :=
tri.NewResponse([]any{res.Result()})
return triResp, res.Error()
},
opts...,