This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/develop by this push:
     new 96542304 [AI] feat: Compatible with http/sse protocol (#657)
96542304 is described below

commit 9654230426802411b6dc5247e0fce53ad1935eca
Author: Alan <[email protected]>
AuthorDate: Sun Apr 20 13:19:11 2025 +0800

    [AI] feat: Compatible with http/sse protocol (#657)
    
    * impl: Compatible with http/sse protocol
    
    * impl: retry
    
    * delete redundant files
    
    * fix assert
    
    * fix assert
    
    * fix import
    
    * change interface to client.Response
    
    * fix comments
    
    * merge two test files
    
    * bug fix
    
    * rename symbol & fix https router
    
    * nil pointer fix
---
 pkg/client/http/http.go                            |   9 ++
 pkg/client/response.go                             |  36 ++++-
 pkg/common/constant/http.go                        |  15 ++-
 pkg/common/http/manager.go                         | 101 ++++++++++++--
 pkg/common/http/manager_test.go                    | 146 ++++++++++++++++++++-
 .../http/httpproxy => common/mock}/routerfilter.go |  79 ++++-------
 pkg/common/util/response.go                        |   4 +-
 pkg/context/http/context.go                        |   6 +-
 pkg/filter/accesslog/access_log.go                 |   3 +-
 pkg/filter/accesslog/access_log_test.go            |   2 +-
 pkg/filter/http/httpproxy/routerfilter.go          |  32 ++---
 pkg/filter/metric/metric.go                        |  10 +-
 pkg/filter/prometheus/metric_test.go               |   2 +-
 pkg/prometheus/prometheus.go                       |   8 +-
 14 files changed, 346 insertions(+), 107 deletions(-)

diff --git a/pkg/client/http/http.go b/pkg/client/http/http.go
index 875dd33e..8cd0c4aa 100644
--- a/pkg/client/http/http.go
+++ b/pkg/client/http/http.go
@@ -26,6 +26,7 @@ import (
 
 import (
        "github.com/pkg/errors"
+
        "go.opentelemetry.io/otel"
        semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
        "go.opentelemetry.io/otel/trace"
@@ -107,6 +108,7 @@ func (dc *Client) Call(req *client.Request) (resp 
interface{}, err error) {
        newReq.Header = params.Header
        httpClient := &http.Client{Timeout: req.Timeout}
 
+       // Observability
        tr := otel.Tracer(traceNameHTTPClient)
        _, span := tr.Start(req.Context, "HTTP "+newReq.Method, 
trace.WithSpanKind(trace.SpanKindClient))
        trace.SpanFromContext(req.Context).SpanContext()
@@ -116,6 +118,7 @@ func (dc *Client) Call(req *client.Request) (resp 
interface{}, err error) {
        newReq.Header.Set(jaegerTraceIDInHeader, 
span.SpanContext().TraceID().String())
        defer span.End()
 
+       // Real request
        tmpRet, err := httpClient.Do(newReq)
        if tmpRet != nil {
                
span.SetAttributes(semconv.HTTPStatusCodeKey.Int(tmpRet.StatusCode))
@@ -197,3 +200,9 @@ func (dc *Client) parseURL(req *client.Request, params 
requestParams) (string, e
        }
        return parsedURL.String(), nil
 }
+
+// IsSSEStream check if the response is a SSE stream
+func IsSSEStream(resp *http.Response) bool {
+       contentType := resp.Header.Get(constant.HeaderKeyContextType)
+       return strings.Contains(contentType, 
constant.HeaderValueTextEventStream)
+}
diff --git a/pkg/client/response.go b/pkg/client/response.go
index cb461c76..fac5fd95 100644
--- a/pkg/client/response.go
+++ b/pkg/client/response.go
@@ -17,12 +17,38 @@
 
 package client
 
-// Response response from endpoint
-type Response struct {
+import (
+       "io"
+)
+
+type Response interface {
+       IsStream() bool
+}
+
+// UnaryResponse response from endpoint
+type UnaryResponse struct {
        Data []byte
 }
 
-// NewResponse create response
-func NewResponse(data []byte) *Response {
-       return &Response{Data: data}
+func (r *UnaryResponse) IsStream() bool {
+       return false
+}
+
+// NewByteResponse create response contains a []byte
+func NewByteResponse(data []byte) *UnaryResponse {
+       return &UnaryResponse{Data: data}
+}
+
+// StreamResponse response from endpoint
+type StreamResponse struct {
+       Stream io.ReadCloser
+}
+
+func (r *StreamResponse) IsStream() bool {
+       return true
+}
+
+// NewStreamResponse create response contains a stream
+func NewStreamResponse(stream io.ReadCloser) *StreamResponse {
+       return &StreamResponse{Stream: stream}
 }
diff --git a/pkg/common/constant/http.go b/pkg/common/constant/http.go
index d76536cf..ac10c843 100644
--- a/pkg/common/constant/http.go
+++ b/pkg/common/constant/http.go
@@ -18,7 +18,9 @@
 package constant
 
 const (
-       HeaderKeyContextType = "Content-Type"
+       HeaderKeyContextType  = "Content-Type"
+       HeaderKeyCacheControl = "Cache-Control"
+       HeaderKeyConnection   = "Connection"
 
        HeaderKeyAccessControlAllowOrigin      = "Access-Control-Allow-Origin"
        HeaderKeyAccessControlAllowHeaders     = "Access-Control-Allow-Headers"
@@ -29,8 +31,12 @@ const (
 
        HeaderValueJsonUtf8        = "application/json;charset=UTF-8"
        HeaderValueTextPlain       = "text/plain"
+       HeaderValueTextEventStream = "text/event-stream"
        HeaderValueApplicationJson = "application/json"
 
+       HeaderValueKeepAlive = "keep-alive"
+       HeaderValueNoCache   = "no-cache"
+
        HeaderValueAll = "*"
 
        PathSlash           = "/"
@@ -72,3 +78,10 @@ const (
        Origin                     = "Origin"
        XForwardedProto            = "X-Forwarded-Proto"
 )
+
+// SSE response prefixes
+const (
+       SSEData  = "data"
+       SSEEvent = "event"
+       SSEId    = "id"
+)
diff --git a/pkg/common/http/manager.go b/pkg/common/http/manager.go
index 8b3ee7bd..f2b3d809 100644
--- a/pkg/common/http/manager.go
+++ b/pkg/common/http/manager.go
@@ -32,6 +32,7 @@ import (
 
 import (
        "github.com/apache/dubbo-go-pixiu/pkg/client"
+       "github.com/apache/dubbo-go-pixiu/pkg/client/http"
        "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
        "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
        router2 "github.com/apache/dubbo-go-pixiu/pkg/common/router"
@@ -107,16 +108,87 @@ func (hcm *HttpConnectionManager) handleHTTPRequest(c 
*pch.HttpContext) {
        //todo timeout
        filterChain.OnDecode(c)
        hcm.buildTargetResponse(c)
+       //todo: stream resp has to set HTTP Server's WriteTimeout to 0, need to 
check it
        filterChain.OnEncode(c)
        hcm.writeResponse(c)
 }
 
 func (hcm *HttpConnectionManager) writeResponse(c *pch.HttpContext) {
        if !c.LocalReply() {
-               writer := c.Writer
-               writer.WriteHeader(c.GetStatusCode())
-               if _, err := writer.Write(c.TargetResp.Data); err != nil {
-                       logger.Errorf("write response error: %s", err)
+               c.Writer.WriteHeader(c.GetStatusCode())
+               if c.TargetResp != nil {
+                       switch res := c.TargetResp.(type) {
+                       case *client.UnaryResponse:
+                               _, err := c.Writer.Write(res.Data)
+                               if err != nil {
+                                       logger.Errorf("Write response failed: 
%v", err)
+                               }
+                       case *client.StreamResponse:
+                               // create ctx helps goroutine exit
+                               ctx, cancel := context.WithCancel(c.Ctx)
+                               defer cancel()
+
+                               dataC := make(chan []byte)
+                               errC := make(chan error, 1)
+
+                               // goroutine read stream
+                               go func() {
+                                       defer close(dataC)
+                                       defer close(errC)
+                                       buf := make([]byte, 1024) // 1KB buffer
+                                       for {
+                                               select {
+                                               case <-ctx.Done():
+                                                       return
+                                               default:
+                                                       n, err := 
res.Stream.Read(buf)
+                                                       if n > 0 {
+                                                               // copy data to 
prevent data cover
+                                                               data := 
make([]byte, n)
+                                                               copy(data, 
buf[:n])
+                                                               select {
+                                                               case dataC <- 
data:
+                                                               case 
<-ctx.Done():
+                                                                       return
+                                                               }
+                                                       }
+                                                       if err != nil {
+                                                               if err != 
io.EOF {
+                                                                       errC <- 
fmt.Errorf("stream read error: %w", err)
+                                                               } else {
+                                                                       errC <- 
io.EOF
+                                                               }
+                                                               return
+                                                       }
+                                               }
+                                       }
+                               }()
+
+                               for {
+                                       select {
+                                       case <-ctx.Done():
+                                               _ = res.Stream.Close()
+                                               return
+                                       case data, ok := <-dataC:
+                                               if !ok {
+                                                       return
+                                               }
+                                               if _, err := 
c.Writer.Write(data); err != nil {
+                                                       cancel()
+                                                       _ = res.Stream.Close()
+                                                       return
+                                               }
+                                       case err := <-errC:
+                                               if err != nil && err != io.EOF {
+                                                       logger.Errorf("Stream 
error: %v", err)
+                                               }
+                                               return
+                                       }
+                               }
+                       default:
+                               logger.Errorf("Unknown response type: %T", 
c.TargetResp)
+                       }
+
                }
        }
 }
@@ -128,12 +200,6 @@ func (hcm *HttpConnectionManager) buildTargetResponse(c 
*pch.HttpContext) {
 
        switch res := c.SourceResp.(type) {
        case *stdHttp.Response:
-               body, err := io.ReadAll(res.Body)
-               if err != nil {
-                       panic(err)
-               }
-               //close body
-               _ = res.Body.Close()
                //Merge header
                remoteHeader := res.Header
                for k := range remoteHeader {
@@ -141,7 +207,18 @@ func (hcm *HttpConnectionManager) buildTargetResponse(c 
*pch.HttpContext) {
                }
                //status code
                c.StatusCode(res.StatusCode)
-               c.TargetResp = &client.Response{Data: body}
+
+               if http.IsSSEStream(res) {
+                       c.TargetResp = &client.StreamResponse{Stream: res.Body}
+               } else {
+                       body, err := io.ReadAll(res.Body)
+                       if err != nil {
+                               panic(err)
+                       }
+                       //close body
+                       _ = res.Body.Close()
+                       c.TargetResp = &client.UnaryResponse{Data: body}
+               }
        case []byte:
                c.StatusCode(stdHttp.StatusOK)
                if json.Valid(res) {
@@ -149,7 +226,7 @@ func (hcm *HttpConnectionManager) buildTargetResponse(c 
*pch.HttpContext) {
                } else {
                        c.AddHeader(constant.HeaderKeyContextType, 
constant.HeaderValueTextPlain)
                }
-               c.TargetResp = &client.Response{Data: res}
+               c.TargetResp = &client.UnaryResponse{Data: res}
        default:
                //dubbo go generic invoke
                response := util.NewDubboResponse(res, false)
diff --git a/pkg/common/http/manager_test.go b/pkg/common/http/manager_test.go
index 684dd2dd..450546a5 100644
--- a/pkg/common/http/manager_test.go
+++ b/pkg/common/http/manager_test.go
@@ -19,9 +19,14 @@ package http
 
 import (
        "bytes"
+       "context"
        "fmt"
+       "net"
        "net/http"
+       "net/http/httptest"
+       "strings"
        "testing"
+       "time"
 )
 
 import (
@@ -30,6 +35,7 @@ import (
 
 import (
        "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+       commonmock "github.com/apache/dubbo-go-pixiu/pkg/common/mock"
        "github.com/apache/dubbo-go-pixiu/pkg/common/router/trie"
        contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
        "github.com/apache/dubbo-go-pixiu/pkg/context/mock"
@@ -43,6 +49,10 @@ const (
        Kind = DEMO
 )
 
+var (
+       eventCh = make(chan string, 3)
+)
+
 type (
        // Plugin is http filter plugin.
        Plugin struct {
@@ -89,10 +99,10 @@ func (f *DemoFilter) Encode(ctx *contexthttp.HttpContext) 
filter.FilterStatus {
 func (f *DemoFilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, 
chain filter.FilterChain) error {
        c := f.conf
        str := fmt.Sprintf("%s is drinking in the %s", c.Foo, c.Bar)
-       filter := &DemoFilter{str: str}
+       demoFilter := &DemoFilter{str: str}
 
-       chain.AppendDecodeFilters(filter)
-       chain.AppendEncodeFilters(filter)
+       chain.AppendDecodeFilters(demoFilter)
+       chain.AppendEncodeFilters(demoFilter)
        return nil
 }
 
@@ -131,7 +141,7 @@ func TestCreateHttpConnectionManager(t *testing.T) {
        request, err := http.NewRequest("POST", 
"http://www.dubbogopixiu.com/api/v1?name=tc";, 
bytes.NewReader([]byte("{\"id\":\"12345\"}")))
        assert.NoError(t, err)
        request.Header = map[string][]string{
-               "X-Dgp-Way": []string{"Dubbo"},
+               "X-Dgp-Way": {"Dubbo"},
        }
        assert.NoError(t, err)
        c := mock.GetMockHTTPContext(request)
@@ -140,3 +150,131 @@ func TestCreateHttpConnectionManager(t *testing.T) {
        err = hcm.Handle(c)
        assert.NoError(t, err)
 }
+
+// test SSE case
+func TestStreamingResponse(t *testing.T) {
+       hcmc := model.HttpConnectionManagerConfig{
+               RouteConfig: model.RouteConfiguration{
+                       RouteTrie: trie.NewTrieWithDefault("GET/api/sse", 
model.RouteAction{
+                               Cluster: "mock_stream_cluster",
+                       }),
+               },
+               HTTPFilters: []*model.HTTPFilter{
+                       {
+                               Name: commonmock.Kind,
+                       },
+               },
+       }
+
+       ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+       defer cancel()
+
+       // mock server
+       upstreamServer, _ := NewTestServerWithURL("localhost:8080", 
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               w.Header().Set("Content-Type", "text/event-stream")
+               w.Header().Set("Cache-Control", "no-cache")
+               flusher := w.(http.Flusher)
+
+               for i := 1; i <= 3; i++ {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       default:
+                               time.Sleep(10 * time.Millisecond)
+                               event := fmt.Sprintf("data: %d\nevent: %d\nid: 
%d\n\n", i, i, i)
+                               _, _ = w.Write([]byte(event))
+                               flusher.Flush()
+                               logger.Info("Upstream sent event ", i)
+                       }
+               }
+       }))
+       defer upstreamServer.Close()
+
+       req := httptest.NewRequest("GET", "http://localhost:8080/api/sse";, 
nil).WithContext(ctx)
+
+       done := make(chan struct{})
+
+       httpCtx := &contexthttp.HttpContext{
+               Request: req,
+               Writer:  NewStreamRecorder(),
+               Ctx:     ctx,
+       }
+       go func() {
+               defer close(done)
+
+               hcm := CreateHttpConnectionManager(&hcmc)
+
+               if err := hcm.Handle(httpCtx); err != nil {
+                       t.Errorf("Handle failed: %v", err)
+               }
+
+               // test targetResp
+               if httpCtx.TargetResp == nil {
+                       t.Error("TargetResp is nil")
+                       return
+               }
+       }()
+
+       // event waiting test
+       for {
+               receivedEvents := httpCtx.Writer.(*StreamRecorder).receivedBuf
+               select {
+               case event := <-eventCh:
+                       logger.Info("Received event: %s", 
strings.ReplaceAll(event, "\n", "\\n"))
+               case <-done:
+                       assert.Equal(t, 3, len(receivedEvents), "Should receive 
3 events")
+                       return
+               case <-time.After(5 * time.Second):
+                       t.Fatal("Test timeout")
+                       return
+               }
+       }
+
+}
+
+// mock recorder
+type StreamRecorder struct {
+       http.ResponseWriter
+       http.Flusher
+       receivedBuf []string
+       headers     http.Header
+       status      int
+}
+
+func NewStreamRecorder() *StreamRecorder {
+       return &StreamRecorder{
+               receivedBuf: make([]string, 0),
+               headers:     make(http.Header),
+       }
+}
+
+func (r *StreamRecorder) Header() http.Header {
+       return r.headers
+}
+
+func (r *StreamRecorder) WriteHeader(statusCode int) {
+       r.status = statusCode
+}
+
+func (r *StreamRecorder) Write(data []byte) (int, error) {
+       eventCh <- string(data)
+       r.receivedBuf = append(r.receivedBuf, string(data))
+       return len(data), nil
+}
+
+func NewTestServerWithURL(URL string, handler http.Handler) (*httptest.Server, 
error) {
+       ts := httptest.NewUnstartedServer(handler)
+       if URL != "" {
+               l, err := net.Listen("tcp", URL)
+               if err != nil {
+                       return nil, err
+               }
+               err = ts.Listener.Close()
+               if err != nil {
+                       return nil, err
+               }
+               ts.Listener = l
+       }
+       ts.Start()
+       return ts, nil
+}
diff --git a/pkg/filter/http/httpproxy/routerfilter.go 
b/pkg/common/mock/routerfilter.go
similarity index 54%
copy from pkg/filter/http/httpproxy/routerfilter.go
copy to pkg/common/mock/routerfilter.go
index d47cfdaf..046ccd13 100644
--- a/pkg/filter/http/httpproxy/routerfilter.go
+++ b/pkg/common/mock/routerfilter.go
@@ -15,27 +15,24 @@
  * limitations under the License.
  */
 
-package httpproxy
+package mock
 
 import (
        "encoding/json"
        "fmt"
-       stdhttp "net/http"
-       "net/url"
+       "net/http"
        "time"
 )
 
 import (
-       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
        "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
-       "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+       contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
        "github.com/apache/dubbo-go-pixiu/pkg/logger"
-       "github.com/apache/dubbo-go-pixiu/pkg/server"
 )
 
 const (
        // Kind is the kind of Fallback.
-       Kind = constant.HTTPProxyFilter
+       Kind = "dgp.filter.http.sse.httpproxy"
 )
 
 func init() {
@@ -49,11 +46,11 @@ type (
        // FilterFactory is http filter instance
        FilterFactory struct {
                cfg    *Config
-               client stdhttp.Client
+               client http.Client
        }
        //Filter
        Filter struct {
-               client stdhttp.Client
+               client http.Client
        }
        // Config describe the config of FilterFactory
        Config struct {
@@ -72,83 +69,57 @@ func (p *Plugin) CreateFilterFactory() 
(filter.HttpFilterFactory, error) {
        return &FilterFactory{cfg: &Config{}}, nil
 }
 
-func (factory *FilterFactory) Config() interface{} {
-       return factory.cfg
+func (ff *FilterFactory) Config() interface{} {
+       return ff.cfg
 }
 
-func (factory *FilterFactory) Apply() error {
-       cfg := factory.cfg
-       client := stdhttp.Client{
+func (ff *FilterFactory) Apply() error {
+       cfg := ff.cfg
+       client := http.Client{
                Timeout: cfg.Timeout,
-               Transport: stdhttp.RoundTripper(&stdhttp.Transport{
+               Transport: http.RoundTripper(&http.Transport{
                        MaxIdleConns:        cfg.MaxIdleConns,
                        MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost,
                        MaxConnsPerHost:     cfg.MaxConnsPerHost,
                }),
        }
-       factory.client = client
+       ff.client = client
        return nil
 }
 
-func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain 
filter.FilterChain) error {
+func (ff *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, 
chain filter.FilterChain) error {
        //reuse http client
-       f := &Filter{factory.client}
+       f := &Filter{ff.client}
        chain.AppendDecodeFilters(f)
        return nil
 }
 
-func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus {
-       rEntry := hc.GetRouteEntry()
-       if rEntry == nil {
-               panic("no route entry")
-       }
-       logger.Debugf("[dubbo-go-pixiu] client choose endpoint from cluster 
:%v", rEntry.Cluster)
-
-       clusterName := rEntry.Cluster
-       clusterManager := server.GetClusterManager()
-       endpoint := clusterManager.PickEndpoint(clusterName, hc)
-       if endpoint == nil {
-               logger.Debugf("[dubbo-go-pixiu] cluster not found endpoint")
-               bt, _ := json.Marshal(http.ErrResponse{Message: "cluster not 
found endpoint"})
-               hc.SendLocalReply(stdhttp.StatusServiceUnavailable, bt)
-               return filter.Stop
-       }
-
-       logger.Debugf("[dubbo-go-pixiu] client choose endpoint :%v", 
endpoint.Address.GetAddress())
+func (f *Filter) Decode(hc *contexthttp.HttpContext) filter.FilterStatus {
        r := hc.Request
 
        var (
-               req *stdhttp.Request
+               req *http.Request
                err error
        )
 
-       parsedURL := url.URL{
-               Host:     endpoint.Address.GetAddress(),
-               Scheme:   "http",
-               Path:     r.URL.Path,
-               RawQuery: r.URL.RawQuery,
-       }
-
-       req, err = stdhttp.NewRequest(r.Method, parsedURL.String(), r.Body)
+       req, err = http.NewRequest(r.Method, r.URL.String(), r.Body)
        if err != nil {
-               bt, _ := json.Marshal(http.ErrResponse{Message: 
fmt.Sprintf("BUG: new request failed: %v", err)})
-               hc.SendLocalReply(stdhttp.StatusInternalServerError, bt)
+               bt, _ := json.Marshal(contexthttp.ErrResponse{Message: 
fmt.Sprintf("BUG: new request failed: %v", err)})
+               hc.SendLocalReply(http.StatusInternalServerError, bt)
                return filter.Stop
        }
        req.Header = r.Header
 
        resp, err := f.client.Do(req)
+
        if err != nil {
-               urlErr, ok := err.(*url.Error)
-               if ok && urlErr.Timeout() {
-                       hc.SendLocalReply(stdhttp.StatusGatewayTimeout, 
[]byte(err.Error()))
-                       return filter.Stop
-               }
-               hc.SendLocalReply(stdhttp.StatusServiceUnavailable, 
[]byte(err.Error()))
+               hc.SendLocalReply(http.StatusServiceUnavailable, 
[]byte(err.Error()))
                return filter.Stop
        }
-       logger.Debugf("[dubbo-go-pixiu] client call resp:%v", resp)
+
        hc.SourceResp = resp
+
+       logger.Debugf("[dubbo-go-pixiu] client call resp:%v", resp)
        // response write in hcm
        return filter.Continue
 }
diff --git a/pkg/common/util/response.go b/pkg/common/util/response.go
index e8d406d0..5d4403a3 100644
--- a/pkg/common/util/response.go
+++ b/pkg/common/util/response.go
@@ -28,10 +28,10 @@ import (
        "github.com/apache/dubbo-go-pixiu/pkg/client"
 )
 
-func NewDubboResponse(data interface{}, hump bool) *client.Response {
+func NewDubboResponse(data interface{}, hump bool) *client.UnaryResponse {
        r, _ := dealResp(data, hump)
        bytes, _ := json.Marshal(r)
-       return &client.Response{Data: bytes}
+       return &client.UnaryResponse{Data: bytes}
 }
 
 func dealResp(in interface{}, HumpToLine bool) (interface{}, error) {
diff --git a/pkg/context/http/context.go b/pkg/context/http/context.go
index 56c8b246..ec6d1378 100644
--- a/pkg/context/http/context.go
+++ b/pkg/context/http/context.go
@@ -59,7 +59,7 @@ type HttpContext struct {
        // localReplyBody: happen error
        localReplyBody []byte
        // the response context will return.
-       TargetResp *client.Response
+       TargetResp client.Response
        // client call response.
        SourceResp interface{}
 
@@ -171,13 +171,13 @@ func (hc *HttpContext) GetApplicationName() string {
        return ""
 }
 
-// SendLocalReply Means that the request was interrupted and Response will be 
sent directly
+// SendLocalReply Means that the request was interrupted and UnaryResponse 
will be sent directly
 // Even if it’s currently in to Decode stage
 func (hc *HttpContext) SendLocalReply(status int, body []byte) {
        hc.localReply = true
        hc.statusCode = status
        hc.localReplyBody = body
-       hc.TargetResp = &client.Response{Data: body}
+       hc.TargetResp = &client.UnaryResponse{Data: body}
        if json.Valid(body) {
                hc.AddHeader(constant.HeaderKeyContextType, 
constant.HeaderValueApplicationJson)
        } else {
diff --git a/pkg/filter/accesslog/access_log.go 
b/pkg/filter/accesslog/access_log.go
index 5cc1bedb..938dca2a 100644
--- a/pkg/filter/accesslog/access_log.go
+++ b/pkg/filter/accesslog/access_log.go
@@ -27,6 +27,7 @@ import (
 )
 
 import (
+       "github.com/apache/dubbo-go-pixiu/pkg/client"
        "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
        "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
        "github.com/apache/dubbo-go-pixiu/pkg/context/http"
@@ -137,7 +138,7 @@ func buildAccessLogMsg(c *http.HttpContext, cost 
time.Duration) string {
                builder.WriteString(fmt.Sprintf("invoke err [ %v", err))
                builder.WriteString("] ")
        }
-       resp := c.TargetResp.Data
+       resp := c.TargetResp.(*client.UnaryResponse).Data
        if err != nil {
                builder.WriteString(" response can not convert to string")
                builder.WriteString("] ")
diff --git a/pkg/filter/accesslog/access_log_test.go 
b/pkg/filter/accesslog/access_log_test.go
index 6d95b2c2..1fa4164e 100644
--- a/pkg/filter/accesslog/access_log_test.go
+++ b/pkg/filter/accesslog/access_log_test.go
@@ -64,7 +64,7 @@ func TestApply(t *testing.T) {
 
        request, _ := http.NewRequest("POST", 
"http://www.dubbogopixiu.com/mock/test?name=tc";, 
bytes.NewReader([]byte("{\"id\":\"12345\"}")))
        ctx := mock.GetMockHTTPContext(request)
-       ctx.TargetResp = client.NewResponse([]byte(msg))
+       ctx.TargetResp = client.NewByteResponse([]byte(msg))
 
        f.Decode(ctx)
        f.Encode(ctx)
diff --git a/pkg/filter/http/httpproxy/routerfilter.go 
b/pkg/filter/http/httpproxy/routerfilter.go
index d47cfdaf..bc49ae96 100644
--- a/pkg/filter/http/httpproxy/routerfilter.go
+++ b/pkg/filter/http/httpproxy/routerfilter.go
@@ -20,7 +20,7 @@ package httpproxy
 import (
        "encoding/json"
        "fmt"
-       stdhttp "net/http"
+       "net/http"
        "net/url"
        "time"
 )
@@ -28,7 +28,7 @@ import (
 import (
        "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
        "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
-       "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+       contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
        "github.com/apache/dubbo-go-pixiu/pkg/logger"
        "github.com/apache/dubbo-go-pixiu/pkg/server"
 )
@@ -49,11 +49,11 @@ type (
        // FilterFactory is http filter instance
        FilterFactory struct {
                cfg    *Config
-               client stdhttp.Client
+               client http.Client
        }
        //Filter
        Filter struct {
-               client stdhttp.Client
+               client http.Client
        }
        // Config describe the config of FilterFactory
        Config struct {
@@ -78,9 +78,9 @@ func (factory *FilterFactory) Config() interface{} {
 
 func (factory *FilterFactory) Apply() error {
        cfg := factory.cfg
-       client := stdhttp.Client{
+       client := http.Client{
                Timeout: cfg.Timeout,
-               Transport: stdhttp.RoundTripper(&stdhttp.Transport{
+               Transport: http.RoundTripper(&http.Transport{
                        MaxIdleConns:        cfg.MaxIdleConns,
                        MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost,
                        MaxConnsPerHost:     cfg.MaxConnsPerHost,
@@ -90,14 +90,14 @@ func (factory *FilterFactory) Apply() error {
        return nil
 }
 
-func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain 
filter.FilterChain) error {
+func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, 
chain filter.FilterChain) error {
        //reuse http client
        f := &Filter{factory.client}
        chain.AppendDecodeFilters(f)
        return nil
 }
 
-func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus {
+func (f *Filter) Decode(hc *contexthttp.HttpContext) filter.FilterStatus {
        rEntry := hc.GetRouteEntry()
        if rEntry == nil {
                panic("no route entry")
@@ -109,8 +109,8 @@ func (f *Filter) Decode(hc *http.HttpContext) 
filter.FilterStatus {
        endpoint := clusterManager.PickEndpoint(clusterName, hc)
        if endpoint == nil {
                logger.Debugf("[dubbo-go-pixiu] cluster not found endpoint")
-               bt, _ := json.Marshal(http.ErrResponse{Message: "cluster not 
found endpoint"})
-               hc.SendLocalReply(stdhttp.StatusServiceUnavailable, bt)
+               bt, _ := json.Marshal(contexthttp.ErrResponse{Message: "cluster 
not found endpoint"})
+               hc.SendLocalReply(http.StatusServiceUnavailable, bt)
                return filter.Stop
        }
 
@@ -118,7 +118,7 @@ func (f *Filter) Decode(hc *http.HttpContext) 
filter.FilterStatus {
        r := hc.Request
 
        var (
-               req *stdhttp.Request
+               req *http.Request
                err error
        )
 
@@ -129,10 +129,10 @@ func (f *Filter) Decode(hc *http.HttpContext) 
filter.FilterStatus {
                RawQuery: r.URL.RawQuery,
        }
 
-       req, err = stdhttp.NewRequest(r.Method, parsedURL.String(), r.Body)
+       req, err = http.NewRequest(r.Method, parsedURL.String(), r.Body)
        if err != nil {
-               bt, _ := json.Marshal(http.ErrResponse{Message: 
fmt.Sprintf("BUG: new request failed: %v", err)})
-               hc.SendLocalReply(stdhttp.StatusInternalServerError, bt)
+               bt, _ := json.Marshal(contexthttp.ErrResponse{Message: 
fmt.Sprintf("BUG: new request failed: %v", err)})
+               hc.SendLocalReply(http.StatusInternalServerError, bt)
                return filter.Stop
        }
        req.Header = r.Header
@@ -141,10 +141,10 @@ func (f *Filter) Decode(hc *http.HttpContext) 
filter.FilterStatus {
        if err != nil {
                urlErr, ok := err.(*url.Error)
                if ok && urlErr.Timeout() {
-                       hc.SendLocalReply(stdhttp.StatusGatewayTimeout, 
[]byte(err.Error()))
+                       hc.SendLocalReply(http.StatusGatewayTimeout, 
[]byte(err.Error()))
                        return filter.Stop
                }
-               hc.SendLocalReply(stdhttp.StatusServiceUnavailable, 
[]byte(err.Error()))
+               hc.SendLocalReply(http.StatusServiceUnavailable, 
[]byte(err.Error()))
                return filter.Stop
        }
        logger.Debugf("[dubbo-go-pixiu] client call resp:%v", resp)
diff --git a/pkg/filter/metric/metric.go b/pkg/filter/metric/metric.go
index 044c4f06..fc4bba0a 100644
--- a/pkg/filter/metric/metric.go
+++ b/pkg/filter/metric/metric.go
@@ -25,7 +25,9 @@ import (
 
 import (
        "github.com/pkg/errors"
+
        "go.opentelemetry.io/otel/attribute"
+
        "go.opentelemetry.io/otel/metric/global"
        "go.opentelemetry.io/otel/metric/instrument"
        "go.opentelemetry.io/otel/metric/instrument/syncint64"
@@ -58,7 +60,7 @@ func init() {
 }
 
 type (
-       // FilterFactory is http filter plugin.
+       // Plugin is http filter plugin.
        Plugin struct {
        }
        // FilterFactory is http filter instance
@@ -137,11 +139,11 @@ func (f *Filter) Encode(c *http.HttpContext) 
filter.FilterStatus {
        return filter.Continue
 }
 
-func computeApproximateResponseSize(res *client.Response) (int, error) {
+func computeApproximateResponseSize(res client.Response) (int, error) {
        if res == nil {
-               return 0, errors.New("client.Response is null pointer ")
+               return 0, errors.New("client.UnaryResponse is null pointer ")
        }
-       return len(res.Data), nil
+       return len(res.(*client.UnaryResponse).Data), nil
 }
 
 func computeApproximateRequestSize(r *stdhttp.Request) (int, error) {
diff --git a/pkg/filter/prometheus/metric_test.go 
b/pkg/filter/prometheus/metric_test.go
index 7f39c547..ab7eb65f 100644
--- a/pkg/filter/prometheus/metric_test.go
+++ b/pkg/filter/prometheus/metric_test.go
@@ -62,7 +62,7 @@ func TestCounterExporterApiMetric(t *testing.T) {
                        body, _ := json.Marshal(&data)
                        request, _ := http.NewRequest("POST", "/_api/health", 
bytes.NewBuffer(body))
                        ctx := mock.GetMockHTTPContext(request)
-                       ctx.TargetResp = client.NewResponse([]byte(msg))
+                       ctx.TargetResp = client.NewByteResponse([]byte(msg))
                        err := factory.PrepareFilterChain(ctx, chain)
                        assert.Nil(t, err)
                        chain.OnDecode(ctx)
diff --git a/pkg/prometheus/prometheus.go b/pkg/prometheus/prometheus.go
index 058d4928..de00261b 100644
--- a/pkg/prometheus/prometheus.go
+++ b/pkg/prometheus/prometheus.go
@@ -29,7 +29,9 @@ import (
 
 import (
        "github.com/dubbo-go-pixiu/pixiu-api/pkg/context"
+
        "github.com/prometheus/client_golang/prometheus"
+
        "github.com/prometheus/common/expfmt"
 )
 
@@ -347,7 +349,7 @@ func (p *Prometheus) HandlerFunc() ContextHandlerFunc {
                if err1 == nil {
                        p.reqSz.WithLabelValues(statusStr, method, 
url).Observe(float64(reqSz))
                }
-               resSz, err2 := computeApproximateResponseSize(c.TargetResp)
+               resSz, err2 := 
computeApproximateResponseSize(c.TargetResp.(*client.UnaryResponse))
                if err2 == nil {
                        p.resSz.WithLabelValues(statusStr, method, 
url).Observe(float64(resSz))
                }
@@ -382,9 +384,9 @@ func computeApproximateRequestSize(r *http.Request) (int, 
error) {
        return s, nil
 }
 
-func computeApproximateResponseSize(res *client.Response) (int, error) {
+func computeApproximateResponseSize(res *client.UnaryResponse) (int, error) {
        if res == nil {
-               return 0, errors.New("client.Response is null pointer ")
+               return 0, errors.New("client.UnaryResponse is null pointer ")
        }
        return len(res.Data), nil
 }

Reply via email to