This is an automated email from the ASF dual-hosted git repository. alexstocks pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push: new 07293072d fix: Fixing the bug where the client fails to receive attachments from the server response (#2928) 07293072d is described below commit 07293072d3d403d55faa022510baa8d53f1be8ea Author: ziyyun <weichan...@ziyyun.com> AuthorDate: Wed Jul 9 18:04:00 2025 +0800 fix: Fixing the bug where the client fails to receive attachments from the server response (#2928) * Revert "fix: client can not get attach attachments from server (#2854)" This reverts commit 3040a9227bc08b9fb1201ff71a6adf6fdf6b72fe. * fix: 解决客户端无法获取服务端返回的attachments * fix: 解决一个格式的问题 * fix: 解决空指针崩溃问题 * fix: 解决response != nil,但指向对象为nil,导致空指针的问题。 --------- Co-authored-by: marsevilspirit <marsevilspi...@gmail.com> --- common/constant/default.go | 2 +- common/constant/key.go | 7 -- config/service_config_test.go | 2 +- filter/context/filter.go | 138 ----------------------- filter/filter_impl/import.go | 1 - imports/imports.go | 1 - protocol/triple/client.go | 26 ----- protocol/triple/server.go | 12 +- protocol/triple/triple_invoker.go | 9 ++ protocol/triple/triple_protocol/handler.go | 20 ++-- protocol/triple/triple_protocol/protocol_grpc.go | 6 +- 11 files changed, 31 insertions(+), 193 deletions(-) diff --git a/common/constant/default.go b/common/constant/default.go index 83480c006..0b2f2d5dd 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -65,7 +65,7 @@ const ( // that put the AdaptiveServiceProviderFilterKey at the end. DefaultServiceFilters = EchoFilterKey + "," + TokenFilterKey + "," + AccessLogFilterKey + "," + TpsLimitFilterKey + "," + - GenericServiceFilterKey + "," + ExecuteLimitFilterKey + "," + GracefulShutdownProviderFilterKey + "," + ContextFilterKey + GenericServiceFilterKey + "," + ExecuteLimitFilterKey + "," + GracefulShutdownProviderFilterKey DefaultReferenceFilters = GracefulShutdownConsumerFilterKey ) diff --git a/common/constant/key.go b/common/constant/key.go index 9c15645de..43ace78c7 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -120,7 +120,6 @@ const ( XdsCircuitBreakerKey = "xds_circuit_reaker" OTELServerTraceKey = "otelServerTrace" OTELClientTraceKey = "otelClientTrace" - ContextFilterKey = "context" ) const ( @@ -342,7 +341,6 @@ const ( Tagkey = "dubbo.tag" // key of tag ConditionKey = "dubbo.condition" AttachmentKey = DubboCtxKey("attachment") // key in context in invoker - AttachmentServerKey = DubboCtxKey("server-attachment") TagRouterFactoryKey = "tag" AffinityAppRouterFactoryKey = "application.affinity" AffinityServiceRouterFactoryKey = "service.affinity" @@ -497,8 +495,3 @@ const ( MinNacosWeight = 0.0 // Minimum allowed weight (Nacos range starts at 0) MaxNacosWeight = 10000.0 // Maximum allowed weight (Nacos range ends at 10000) ) - -const ( - GrpcHeaderStatus = "Grpc-Status" - GrpcHeaderMessage = "Grpc-Message" -) diff --git a/config/service_config_test.go b/config/service_config_test.go index ec7f4af48..3136b8fc9 100644 --- a/config/service_config_test.go +++ b/config/service_config_test.go @@ -114,7 +114,7 @@ func TestNewServiceConfigBuilder(t *testing.T) { values := serviceConfig.getUrlMap() assert.Equal(t, values.Get("methods.Say.weight"), "0") assert.Equal(t, values.Get("methods.Say.tps.limit.rate"), "") - assert.Equal(t, values.Get(constant.ServiceFilterKey), "echo,token,accesslog,tps,generic_service,execute,pshutdown,context") + assert.Equal(t, values.Get(constant.ServiceFilterKey), "echo,token,accesslog,tps,generic_service,execute,pshutdown") }) t.Run("Implement", func(t *testing.T) { diff --git a/filter/context/filter.go b/filter/context/filter.go deleted file mode 100644 index 36b80f275..000000000 --- a/filter/context/filter.go +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package context - -import ( - "context" - "strings" - "sync" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/common/constant" - "dubbo.apache.org/dubbo-go/v3/common/extension" - "dubbo.apache.org/dubbo-go/v3/filter" - "dubbo.apache.org/dubbo-go/v3/protocol/base" - "dubbo.apache.org/dubbo-go/v3/protocol/result" -) - -var ( - once sync.Once - ctx *contextFilter -) - -var unloadingKeys = make(map[string]struct{}) - -func init() { - unloadingKeys[constant.PathKey] = struct{}{} - unloadingKeys[constant.InterfaceKey] = struct{}{} - unloadingKeys[constant.GroupKey] = struct{}{} - unloadingKeys[constant.VersionKey] = struct{}{} - unloadingKeys[constant.TokenKey] = struct{}{} - unloadingKeys[constant.TimeoutKey] = struct{}{} - - unloadingKeys[constant.AsyncKey] = struct{}{} - unloadingKeys[constant.TagKey] = struct{}{} - unloadingKeys[constant.ForceUseTag] = struct{}{} - - httpHeaders := []string{ - "accept", - "accept-charset", - "accept-datetime", - "accept-encoding", - "accept-language", - "access-control-request-headers", - "access-control-request-method", - "authorization", - "cache-control", - "connection", - "content-length", - "content-md5", - "content-type", - "cookie", - "date", - "dnt", - "expect", - "forwarded", - "from", - "host", - "http2-settings", - "if-match", - "if-modified-since", - "if-none-match", - "if-range", - "if-unmodified-since", - "max-forwards", - "origin", - "pragma", - "proxy-authorization", - "range", - "referer", - "sec-fetch-dest", - "sec-fetch-mode", - "sec-fetch-site", - "sec-fetch-user", - "te", - "trailer", - "upgrade", - "upgrade-insecure-requests", - "user-agent", - } - for _, header := range httpHeaders { - unloadingKeys[header] = struct{}{} - } -} - -func init() { - extension.SetFilter(constant.ContextFilterKey, newContextFilter) -} - -type contextFilter struct{} - -func newContextFilter() filter.Filter { - if ctx == nil { - once.Do(func() { - ctx = &contextFilter{} - }) - } - return ctx -} - -// Invoke do nothing -func (f *contextFilter) Invoke(ctx context.Context, invoker base.Invoker, invocation base.Invocation) result.Result { - return invoker.Invoke(ctx, invocation) -} - -// OnResponse pass attachments to result -func (f *contextFilter) OnResponse(ctx context.Context, result result.Result, _ base.Invoker, - _ base.Invocation) result.Result { - attachmentsRaw := ctx.Value(constant.AttachmentServerKey) - if attachmentsRaw != nil { - if attachments, ok := attachmentsRaw.(map[string]any); ok { - filtered := make(map[string]any) - for key, value := range attachments { - if _, exists := unloadingKeys[strings.ToLower(key)]; !exists { - filtered[key] = value - } - } - result.SetAttachments(filtered) - } - } - - return result -} diff --git a/filter/filter_impl/import.go b/filter/filter_impl/import.go index 277b5f56e..2e389b90a 100644 --- a/filter/filter_impl/import.go +++ b/filter/filter_impl/import.go @@ -24,7 +24,6 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/filter/active" _ "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc" _ "dubbo.apache.org/dubbo-go/v3/filter/auth" - _ "dubbo.apache.org/dubbo-go/v3/filter/context" _ "dubbo.apache.org/dubbo-go/v3/filter/echo" _ "dubbo.apache.org/dubbo-go/v3/filter/exec_limit" _ "dubbo.apache.org/dubbo-go/v3/filter/generic" diff --git a/imports/imports.go b/imports/imports.go index f06d8b55c..4044f70dc 100644 --- a/imports/imports.go +++ b/imports/imports.go @@ -47,7 +47,6 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/filter/active" _ "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc" _ "dubbo.apache.org/dubbo-go/v3/filter/auth" - _ "dubbo.apache.org/dubbo-go/v3/filter/context" _ "dubbo.apache.org/dubbo-go/v3/filter/echo" _ "dubbo.apache.org/dubbo-go/v3/filter/exec_limit" _ "dubbo.apache.org/dubbo-go/v3/filter/generic" diff --git a/protocol/triple/client.go b/protocol/triple/client.go index 9300bee68..f52b169db 100644 --- a/protocol/triple/client.go +++ b/protocol/triple/client.go @@ -80,20 +80,6 @@ func (cm *clientManager) callUnary(ctx context.Context, method string, req, resp if err := triClient.CallUnary(ctx, triReq, triResp); err != nil { return err } - - serverAttachments, ok := ctx.Value(constant.AttachmentServerKey).(map[string]any) - if !ok { - return nil - } - for k, v := range triResp.Trailer() { - if ok := isFilterHeader(k); ok { - continue - } - if len(v) > 0 { - serverAttachments[k] = v[0] - } - } - return nil } @@ -328,15 +314,3 @@ func genKeepAliveOpts(url *common.URL) ([]tri.ClientOption, time.Duration, time. return cliKeepAliveOpts, keepAliveInterval, keepAliveTimeout, nil } - -func isFilterHeader(key string) bool { - if key != "" && key[0] == ':' { - return true - } - switch key { - case constant.GrpcHeaderMessage, constant.GrpcHeaderStatus: - return true - default: - return false - } -} diff --git a/protocol/triple/server.go b/protocol/triple/server.go index 6f28c84cf..ad87b75a4 100644 --- a/protocol/triple/server.go +++ b/protocol/triple/server.go @@ -317,13 +317,11 @@ func (s *Server) handleServiceWithInfo(interfaceName string, invoker base.Invoke attachments := generateAttachments(req.Header()) // inject attachments ctx = context.WithValue(ctx, constant.AttachmentKey, attachments) - capturedAttachments := make(map[string]any) - ctx = context.WithValue(ctx, constant.AttachmentServerKey, capturedAttachments) invo := invocation.NewRPCInvocation(m.Name, args, attachments) res := invoker.Invoke(ctx, invo) // todo(DMwangnima): modify InfoInvoker to get a unified processing logic - var triResp *tri.Response // please refer to server/InfoInvoker.Invoke() + var triResp *tri.Response if existingResp, ok := res.Result().(*tri.Response); ok { triResp = existingResp } else { @@ -333,13 +331,11 @@ func (s *Server) handleServiceWithInfo(interfaceName string, invoker base.Invoke for k, v := range res.Attachments() { switch val := v.(type) { case string: - triResp.Trailer().Set(k, val) + tri.AppendToOutgoingContext(ctx, k, val) case []string: - if len(val) > 0 { - triResp.Trailer().Set(k, val[0]) + for _, v := range val { + tri.AppendToOutgoingContext(ctx, k, v) } - default: - triResp.Header().Set(k, fmt.Sprintf("%v", val)) } } return triResp, res.Error() diff --git a/protocol/triple/triple_invoker.go b/protocol/triple/triple_invoker.go index 4f222a0cc..11d81ae3a 100644 --- a/protocol/triple/triple_invoker.go +++ b/protocol/triple/triple_invoker.go @@ -140,6 +140,15 @@ func (ti *TripleInvoker) Invoke(ctx context.Context, invocation base.Invocation) panic(fmt.Sprintf("Unsupported CallType: %s", callType)) } + if header, ok := tri.FromIncomingContext(ctx); ok { + for v, k := range header { + if tri.IsReservedHeader(v) { + continue + } + result.AddAttachment(v, k) + } + } + return &result } diff --git a/protocol/triple/triple_protocol/handler.go b/protocol/triple/triple_protocol/handler.go index a66c5fac9..773c887d0 100644 --- a/protocol/triple/triple_protocol/handler.go +++ b/protocol/triple/triple_protocol/handler.go @@ -106,16 +106,19 @@ func generateUnaryHandlerFunc( ctx = newIncomingContext(ctx, conn.RequestHeader()) response, err := untyped(ctx, request) + + //Write the server-side return-attachment-data in the tailer to send to the caller + if data := ExtractFromOutgoingContext(ctx); data != nil { + mergeHeaders(conn.ResponseTrailer(), data) + } + if err != nil { return err } + // merge headers mergeHeaders(conn.ResponseHeader(), response.Header()) mergeHeaders(conn.ResponseTrailer(), response.Trailer()) - //Write the server-side return-attachment-data in the tailer to send to the caller - if data := ExtractFromOutgoingContext(ctx); data != nil { - mergeHeaders(conn.ResponseTrailer(), data) - } return conn.Send(response.Any()) } @@ -154,6 +157,11 @@ func generateClientStreamHandlerFunc( // embed header in context so that user logic could process them via FromIncomingContext ctx = newIncomingContext(ctx, conn.RequestHeader()) res, err := streamFunc(ctx, stream) + + if outgoingData := ExtractFromOutgoingContext(ctx); outgoingData != nil { + mergeHeaders(conn.ResponseTrailer(), outgoingData) + } + if err != nil { return err } @@ -162,11 +170,9 @@ func generateClientStreamHandlerFunc( // if we panic here instead, so we can include the procedure name. panic(fmt.Sprintf("%s returned nil *triple.Response and nil error", procedure)) //nolint: forbidigo } + mergeHeaders(conn.ResponseHeader(), res.header) mergeHeaders(conn.ResponseTrailer(), res.trailer) - if outgoingData := ExtractFromOutgoingContext(ctx); outgoingData != nil { - mergeHeaders(conn.ResponseTrailer(), outgoingData) - } return conn.Send(res.Msg) } if interceptor != nil { diff --git a/protocol/triple/triple_protocol/protocol_grpc.go b/protocol/triple/triple_protocol/protocol_grpc.go index 13dc1fff3..c49e5885b 100644 --- a/protocol/triple/triple_protocol/protocol_grpc.go +++ b/protocol/triple/triple_protocol/protocol_grpc.go @@ -477,7 +477,7 @@ func (hc *grpcHandlerConn) ExportableHeader() http.Header { hdr := hc.request.Header for key, vals := range hdr { key = strings.ToLower(key) - if isReservedHeader(key) && !isWhitelistedHeader(key) { + if IsReservedHeader(key) && !IsWhitelistedHeader(key) { continue } cloneVals := make([]string, len(vals)) @@ -491,7 +491,7 @@ func (hc *grpcHandlerConn) ExportableHeader() http.Header { // isReservedHeader checks whether hdr belongs to HTTP2 headers // reserved by gRPC protocol. Any other headers are classified as the // user-specified metadata. -func isReservedHeader(hdr string) bool { +func IsReservedHeader(hdr string) bool { if hdr != "" && hdr[0] == ':' { return true } @@ -516,7 +516,7 @@ func isReservedHeader(hdr string) bool { // isWhitelistedHeader checks whether hdr should be propagated into metadata // visible to users, even though it is classified as "reserved", above. -func isWhitelistedHeader(hdr string) bool { +func IsWhitelistedHeader(hdr string) bool { switch hdr { case ":authority", "user-agent": return true