This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3040a9227 fix: client can not get attach attachments from server (#2854)
3040a9227 is described below

commit 3040a9227bc08b9fb1201ff71a6adf6fdf6b72fe
Author: 1kasa <[email protected]>
AuthorDate: Mon Apr 28 20:13:58 2025 +0800

    fix: client can not get attach attachments from server (#2854)
    
    * fix: client can not get attach attachments from server
    
    * fix: ci error(test lack)
    
    * fix: ci error(nil)
---
 common/constant/default.go    |   2 +-
 common/constant/key.go        |   7 +++
 config/service_config_test.go |   2 +-
 filter/context/filter.go      | 137 ++++++++++++++++++++++++++++++++++++++++++
 filter/filter_impl/import.go  |   1 +
 imports/imports.go            |   1 +
 protocol/triple/client.go     |  26 ++++++++
 protocol/triple/server.go     |  24 ++++++--
 8 files changed, 194 insertions(+), 6 deletions(-)

diff --git a/common/constant/default.go b/common/constant/default.go
index edd08301a..2b8769d84 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -62,7 +62,7 @@ const (
        // that put the AdaptiveServiceProviderFilterKey at the end.
        DefaultServiceFilters = EchoFilterKey + "," +
                TokenFilterKey + "," + AccessLogFilterKey + "," + TpsLimitFilterKey + "," +
-               GenericServiceFilterKey + "," + ExecuteLimitFilterKey + "," + GracefulShutdownProviderFilterKey
+               GenericServiceFilterKey + "," + ExecuteLimitFilterKey + "," + GracefulShutdownProviderFilterKey + "," + ContextFilterKey
 
        DefaultReferenceFilters = GracefulShutdownConsumerFilterKey
 )
diff --git a/common/constant/key.go b/common/constant/key.go
index fca26aef0..8a357d426 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -105,6 +105,7 @@ const (
        XdsCircuitBreakerKey                 = "xds_circuit_reaker"
        OTELServerTraceKey                   = "otelServerTrace"
        OTELClientTraceKey                   = "otelClientTrace"
+       ContextFilterKey                     = "context"
 )
 
 const (
@@ -321,6 +322,7 @@ const (
        Tagkey                            = "dubbo.tag" // key of tag
        ConditionKey                      = "dubbo.condition"
        AttachmentKey                     = DubboCtxKey("attachment") // key in context in invoker
+       AttachmentServerKey               = DubboCtxKey("server-attachment")
        TagRouterFactoryKey               = "tag"
        AffinityAppRouterFactoryKey       = "application.affinity"
        AffinityServiceRouterFactoryKey   = "service.affinity"
@@ -475,3 +477,8 @@ const (
        MinNacosWeight     = 0.0     // Minimum allowed weight (Nacos range starts at 0)
        MaxNacosWeight     = 10000.0 // Maximum allowed weight (Nacos range ends at 10000)
 )
+
+const (
+       GrpcHeaderStatus  = "Grpc-Status"
+       GrpcHeaderMessage = "Grpc-Message"
+)
diff --git a/config/service_config_test.go b/config/service_config_test.go
index 3136b8fc9..ec7f4af48 100644
--- a/config/service_config_test.go
+++ b/config/service_config_test.go
@@ -114,7 +114,7 @@ func TestNewServiceConfigBuilder(t *testing.T) {
                values := serviceConfig.getUrlMap()
                assert.Equal(t, values.Get("methods.Say.weight"), "0")
                assert.Equal(t, values.Get("methods.Say.tps.limit.rate"), "")
-               assert.Equal(t, values.Get(constant.ServiceFilterKey), "echo,token,accesslog,tps,generic_service,execute,pshutdown")
+               assert.Equal(t, values.Get(constant.ServiceFilterKey), "echo,token,accesslog,tps,generic_service,execute,pshutdown,context")
        })
 
        t.Run("Implement", func(t *testing.T) {
diff --git a/filter/context/filter.go b/filter/context/filter.go
new file mode 100644
index 000000000..d3eaf02d7
--- /dev/null
+++ b/filter/context/filter.go
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package context
+
+import (
+       "context"
+       "strings"
+       "sync"
+)
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/common/constant"
+       "dubbo.apache.org/dubbo-go/v3/common/extension"
+       "dubbo.apache.org/dubbo-go/v3/filter"
+       "dubbo.apache.org/dubbo-go/v3/protocol"
+)
+
+var (
+       once sync.Once
+       ctx  *contextFilter
+)
+
+var unloadingKeys = make(map[string]struct{})
+
+func init() {
+       unloadingKeys[constant.PathKey] = struct{}{}
+       unloadingKeys[constant.InterfaceKey] = struct{}{}
+       unloadingKeys[constant.GroupKey] = struct{}{}
+       unloadingKeys[constant.VersionKey] = struct{}{}
+       unloadingKeys[constant.TokenKey] = struct{}{}
+       unloadingKeys[constant.TimeoutKey] = struct{}{}
+
+       unloadingKeys[constant.AsyncKey] = struct{}{}
+       unloadingKeys[constant.TagKey] = struct{}{}
+       unloadingKeys[constant.ForceUseTag] = struct{}{}
+
+       httpHeaders := []string{
+               "accept",
+               "accept-charset",
+               "accept-datetime",
+               "accept-encoding",
+               "accept-language",
+               "access-control-request-headers",
+               "access-control-request-method",
+               "authorization",
+               "cache-control",
+               "connection",
+               "content-length",
+               "content-md5",
+               "content-type",
+               "cookie",
+               "date",
+               "dnt",
+               "expect",
+               "forwarded",
+               "from",
+               "host",
+               "http2-settings",
+               "if-match",
+               "if-modified-since",
+               "if-none-match",
+               "if-range",
+               "if-unmodified-since",
+               "max-forwards",
+               "origin",
+               "pragma",
+               "proxy-authorization",
+               "range",
+               "referer",
+               "sec-fetch-dest",
+               "sec-fetch-mode",
+               "sec-fetch-site",
+               "sec-fetch-user",
+               "te",
+               "trailer",
+               "upgrade",
+               "upgrade-insecure-requests",
+               "user-agent",
+       }
+       for _, header := range httpHeaders {
+               unloadingKeys[header] = struct{}{}
+       }
+}
+
+func init() {
+       extension.SetFilter(constant.ContextFilterKey, newContextFilter)
+}
+
+type contextFilter struct{}
+
+func newContextFilter() filter.Filter {
+       if ctx == nil {
+               once.Do(func() {
+                       ctx = &contextFilter{}
+               })
+       }
+       return ctx
+}
+
+// Invoke do nothing
+func (f *contextFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+       return invoker.Invoke(ctx, invocation)
+}
+
+// OnResponse pass attachments to result
+func (f *contextFilter) OnResponse(ctx context.Context, result protocol.Result, _ protocol.Invoker,
+       _ protocol.Invocation) protocol.Result {
+       attachmentsRaw := ctx.Value(constant.AttachmentServerKey)
+       if attachmentsRaw != nil {
+               if attachments, ok := attachmentsRaw.(map[string]any); ok {
+                       filtered := make(map[string]any)
+                       for key, value := range attachments {
+                               if _, exists := unloadingKeys[strings.ToLower(key)]; !exists {
+                                       filtered[key] = value
+                               }
+                       }
+                       result.SetAttachments(filtered)
+               }
+       }
+
+       return result
+}
diff --git a/filter/filter_impl/import.go b/filter/filter_impl/import.go
index 2e389b90a..277b5f56e 100644
--- a/filter/filter_impl/import.go
+++ b/filter/filter_impl/import.go
@@ -24,6 +24,7 @@ import (
        _ "dubbo.apache.org/dubbo-go/v3/filter/active"
        _ "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc"
        _ "dubbo.apache.org/dubbo-go/v3/filter/auth"
+       _ "dubbo.apache.org/dubbo-go/v3/filter/context"
        _ "dubbo.apache.org/dubbo-go/v3/filter/echo"
        _ "dubbo.apache.org/dubbo-go/v3/filter/exec_limit"
        _ "dubbo.apache.org/dubbo-go/v3/filter/generic"
diff --git a/imports/imports.go b/imports/imports.go
index 82f67d95c..dc9bf2cd7 100644
--- a/imports/imports.go
+++ b/imports/imports.go
@@ -47,6 +47,7 @@ import (
        _ "dubbo.apache.org/dubbo-go/v3/filter/active"
        _ "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc"
        _ "dubbo.apache.org/dubbo-go/v3/filter/auth"
+       _ "dubbo.apache.org/dubbo-go/v3/filter/context"
        _ "dubbo.apache.org/dubbo-go/v3/filter/echo"
        _ "dubbo.apache.org/dubbo-go/v3/filter/exec_limit"
        _ "dubbo.apache.org/dubbo-go/v3/filter/generic"
diff --git a/protocol/triple/client.go b/protocol/triple/client.go
index fc21c8c13..05cc1d9f9 100644
--- a/protocol/triple/client.go
+++ b/protocol/triple/client.go
@@ -70,6 +70,20 @@ func (cm *clientManager) callUnary(ctx context.Context, method string, req, resp
        if err := triClient.CallUnary(ctx, triReq, triResp); err != nil {
                return err
        }
+
+       serverAttachments, ok := ctx.Value(constant.AttachmentServerKey).(map[string]interface{})
+       if !ok {
+               return nil
+       }
+       for k, v := range triResp.Trailer() {
+               if ok := isFilterHeader(k); ok {
+                       continue
+               }
+               if len(v) > 0 {
+                       serverAttachments[k] = v[0]
+               }
+       }
+
        return nil
 }
 
@@ -219,3 +233,15 @@ func newClientManager(url *common.URL) (*clientManager, error) {
                triClients: triClients,
        }, nil
 }
+
+func isFilterHeader(key string) bool {
+       if key != "" && key[0] == ':' {
+               return true
+       }
+       switch key {
+       case constant.GrpcHeaderMessage, constant.GrpcHeaderStatus:
+               return true
+       default:
+               return false
+       }
+}
diff --git a/protocol/triple/server.go b/protocol/triple/server.go
index 92e2ce98e..6c19585bd 100644
--- a/protocol/triple/server.go
+++ b/protocol/triple/server.go
@@ -247,15 +247,31 @@ func (s *Server) handleServiceWithInfo(interfaceName string, invoker protocol.In
                                        attachments := generateAttachments(req.Header())
                                        // inject attachments
                                        ctx = context.WithValue(ctx, constant.AttachmentKey, attachments)
+                                       capturedAttachments := make(map[string]any)
+                                       ctx = context.WithValue(ctx, constant.AttachmentServerKey, capturedAttachments)
                                        invo := invocation.NewRPCInvocation(m.Name, args, attachments)
                                        res := invoker.Invoke(ctx, invo)
                                        // todo(DMwangnima): modify InfoInvoker to get a unified processing logic
+                                       var triResp *tri.Response
                                        // please refer to server/InfoInvoker.Invoke()
-                                       if triResp, ok := res.Result().(*tri.Response); ok {
-                                               return triResp, res.Error()
+                                       if existingResp, ok := res.Result().(*tri.Response); ok {
+                                               triResp = existingResp
+                                       } else {
+                                               // please refer to proxy/proxy_factory/ProxyInvoker.Invoke
+                                               triResp = tri.NewResponse([]any{res.Result()})
+                                       }
+                                       for k, v := range res.Attachments() {
+                                               switch val := v.(type) {
+                                               case string:
+                                                       triResp.Trailer().Set(k, val)
+                                               case []string:
+                                                       if len(val) > 0 {
+                                                               triResp.Trailer().Set(k, val[0])
+                                                       }
+                                               default:
+                                                       triResp.Header().Set(k, fmt.Sprintf("%v", val))
+                                               }
                                        }
-                                       // please refer to proxy/proxy_factory/ProxyInvoker.Invoke
-                                       triResp := tri.NewResponse([]any{res.Result()})
                                        return triResp, res.Error()
                                },
                                opts...,

Reply via email to