This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new 96542304 [AI] feat: Compatible with http/sse protocol (#657)
96542304 is described below
commit 9654230426802411b6dc5247e0fce53ad1935eca
Author: Alan <[email protected]>
AuthorDate: Sun Apr 20 13:19:11 2025 +0800
[AI] feat: Compatible with http/sse protocol (#657)
* impl: Compatible with http/sse protocol
* impl: retry
* delete redundant files
* fix assert
* fix assert
* fix import
* change interface to client.Response
* fix comments
* merge two test files
* bug fix
* rename symbol & fix https router
* nil pointer fix
---
pkg/client/http/http.go | 9 ++
pkg/client/response.go | 36 ++++-
pkg/common/constant/http.go | 15 ++-
pkg/common/http/manager.go | 101 ++++++++++++--
pkg/common/http/manager_test.go | 146 ++++++++++++++++++++-
.../http/httpproxy => common/mock}/routerfilter.go | 79 ++++-------
pkg/common/util/response.go | 4 +-
pkg/context/http/context.go | 6 +-
pkg/filter/accesslog/access_log.go | 3 +-
pkg/filter/accesslog/access_log_test.go | 2 +-
pkg/filter/http/httpproxy/routerfilter.go | 32 ++---
pkg/filter/metric/metric.go | 10 +-
pkg/filter/prometheus/metric_test.go | 2 +-
pkg/prometheus/prometheus.go | 8 +-
14 files changed, 346 insertions(+), 107 deletions(-)
diff --git a/pkg/client/http/http.go b/pkg/client/http/http.go
index 875dd33e..8cd0c4aa 100644
--- a/pkg/client/http/http.go
+++ b/pkg/client/http/http.go
@@ -26,6 +26,7 @@ import (
import (
"github.com/pkg/errors"
+
"go.opentelemetry.io/otel"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
@@ -107,6 +108,7 @@ func (dc *Client) Call(req *client.Request) (resp
interface{}, err error) {
newReq.Header = params.Header
httpClient := &http.Client{Timeout: req.Timeout}
+ // Observability
tr := otel.Tracer(traceNameHTTPClient)
_, span := tr.Start(req.Context, "HTTP "+newReq.Method,
trace.WithSpanKind(trace.SpanKindClient))
trace.SpanFromContext(req.Context).SpanContext()
@@ -116,6 +118,7 @@ func (dc *Client) Call(req *client.Request) (resp
interface{}, err error) {
newReq.Header.Set(jaegerTraceIDInHeader,
span.SpanContext().TraceID().String())
defer span.End()
+ // Real request
tmpRet, err := httpClient.Do(newReq)
if tmpRet != nil {
span.SetAttributes(semconv.HTTPStatusCodeKey.Int(tmpRet.StatusCode))
@@ -197,3 +200,9 @@ func (dc *Client) parseURL(req *client.Request, params
requestParams) (string, e
}
return parsedURL.String(), nil
}
+
+// IsSSEStream check if the response is a SSE stream
+func IsSSEStream(resp *http.Response) bool {
+ contentType := resp.Header.Get(constant.HeaderKeyContextType)
+ return strings.Contains(contentType,
constant.HeaderValueTextEventStream)
+}
diff --git a/pkg/client/response.go b/pkg/client/response.go
index cb461c76..fac5fd95 100644
--- a/pkg/client/response.go
+++ b/pkg/client/response.go
@@ -17,12 +17,38 @@
package client
-// Response response from endpoint
-type Response struct {
+import (
+ "io"
+)
+
+type Response interface {
+ IsStream() bool
+}
+
+// UnaryResponse response from endpoint
+type UnaryResponse struct {
Data []byte
}
-// NewResponse create response
-func NewResponse(data []byte) *Response {
- return &Response{Data: data}
+func (r *UnaryResponse) IsStream() bool {
+ return false
+}
+
+// NewByteResponse create response contains a []byte
+func NewByteResponse(data []byte) *UnaryResponse {
+ return &UnaryResponse{Data: data}
+}
+
+// StreamResponse response from endpoint
+type StreamResponse struct {
+ Stream io.ReadCloser
+}
+
+func (r *StreamResponse) IsStream() bool {
+ return true
+}
+
+// NewStreamResponse create response contains a stream
+func NewStreamResponse(stream io.ReadCloser) *StreamResponse {
+ return &StreamResponse{Stream: stream}
}
diff --git a/pkg/common/constant/http.go b/pkg/common/constant/http.go
index d76536cf..ac10c843 100644
--- a/pkg/common/constant/http.go
+++ b/pkg/common/constant/http.go
@@ -18,7 +18,9 @@
package constant
const (
- HeaderKeyContextType = "Content-Type"
+ HeaderKeyContextType = "Content-Type"
+ HeaderKeyCacheControl = "Cache-Control"
+ HeaderKeyConnection = "Connection"
HeaderKeyAccessControlAllowOrigin = "Access-Control-Allow-Origin"
HeaderKeyAccessControlAllowHeaders = "Access-Control-Allow-Headers"
@@ -29,8 +31,12 @@ const (
HeaderValueJsonUtf8 = "application/json;charset=UTF-8"
HeaderValueTextPlain = "text/plain"
+ HeaderValueTextEventStream = "text/event-stream"
HeaderValueApplicationJson = "application/json"
+ HeaderValueKeepAlive = "keep-alive"
+ HeaderValueNoCache = "no-cache"
+
HeaderValueAll = "*"
PathSlash = "/"
@@ -72,3 +78,10 @@ const (
Origin = "Origin"
XForwardedProto = "X-Forwarded-Proto"
)
+
+// SSE response prefixes
+const (
+ SSEData = "data"
+ SSEEvent = "event"
+ SSEId = "id"
+)
diff --git a/pkg/common/http/manager.go b/pkg/common/http/manager.go
index 8b3ee7bd..f2b3d809 100644
--- a/pkg/common/http/manager.go
+++ b/pkg/common/http/manager.go
@@ -32,6 +32,7 @@ import (
import (
"github.com/apache/dubbo-go-pixiu/pkg/client"
+ "github.com/apache/dubbo-go-pixiu/pkg/client/http"
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
router2 "github.com/apache/dubbo-go-pixiu/pkg/common/router"
@@ -107,16 +108,87 @@ func (hcm *HttpConnectionManager) handleHTTPRequest(c
*pch.HttpContext) {
//todo timeout
filterChain.OnDecode(c)
hcm.buildTargetResponse(c)
+ //todo: stream resp has to set HTTP Server's WriteTimeout to 0, need to
check it
filterChain.OnEncode(c)
hcm.writeResponse(c)
}
func (hcm *HttpConnectionManager) writeResponse(c *pch.HttpContext) {
if !c.LocalReply() {
- writer := c.Writer
- writer.WriteHeader(c.GetStatusCode())
- if _, err := writer.Write(c.TargetResp.Data); err != nil {
- logger.Errorf("write response error: %s", err)
+ c.Writer.WriteHeader(c.GetStatusCode())
+ if c.TargetResp != nil {
+ switch res := c.TargetResp.(type) {
+ case *client.UnaryResponse:
+ _, err := c.Writer.Write(res.Data)
+ if err != nil {
+ logger.Errorf("Write response failed:
%v", err)
+ }
+ case *client.StreamResponse:
+ // create ctx helps goroutine exit
+ ctx, cancel := context.WithCancel(c.Ctx)
+ defer cancel()
+
+ dataC := make(chan []byte)
+ errC := make(chan error, 1)
+
+ // goroutine read stream
+ go func() {
+ defer close(dataC)
+ defer close(errC)
+ buf := make([]byte, 1024) // 1KB buffer
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ n, err :=
res.Stream.Read(buf)
+ if n > 0 {
+ // copy data to
prevent data cover
+ data :=
make([]byte, n)
+ copy(data,
buf[:n])
+ select {
+ case dataC <-
data:
+ case
<-ctx.Done():
+ return
+ }
+ }
+ if err != nil {
+ if err !=
io.EOF {
+ errC <-
fmt.Errorf("stream read error: %w", err)
+ } else {
+ errC <-
io.EOF
+ }
+ return
+ }
+ }
+ }
+ }()
+
+ for {
+ select {
+ case <-ctx.Done():
+ _ = res.Stream.Close()
+ return
+ case data, ok := <-dataC:
+ if !ok {
+ return
+ }
+ if _, err :=
c.Writer.Write(data); err != nil {
+ cancel()
+ _ = res.Stream.Close()
+ return
+ }
+ case err := <-errC:
+ if err != nil && err != io.EOF {
+ logger.Errorf("Stream
error: %v", err)
+ }
+ return
+ }
+ }
+ default:
+ logger.Errorf("Unknown response type: %T",
c.TargetResp)
+ }
+
}
}
}
@@ -128,12 +200,6 @@ func (hcm *HttpConnectionManager) buildTargetResponse(c
*pch.HttpContext) {
switch res := c.SourceResp.(type) {
case *stdHttp.Response:
- body, err := io.ReadAll(res.Body)
- if err != nil {
- panic(err)
- }
- //close body
- _ = res.Body.Close()
//Merge header
remoteHeader := res.Header
for k := range remoteHeader {
@@ -141,7 +207,18 @@ func (hcm *HttpConnectionManager) buildTargetResponse(c
*pch.HttpContext) {
}
//status code
c.StatusCode(res.StatusCode)
- c.TargetResp = &client.Response{Data: body}
+
+ if http.IsSSEStream(res) {
+ c.TargetResp = &client.StreamResponse{Stream: res.Body}
+ } else {
+ body, err := io.ReadAll(res.Body)
+ if err != nil {
+ panic(err)
+ }
+ //close body
+ _ = res.Body.Close()
+ c.TargetResp = &client.UnaryResponse{Data: body}
+ }
case []byte:
c.StatusCode(stdHttp.StatusOK)
if json.Valid(res) {
@@ -149,7 +226,7 @@ func (hcm *HttpConnectionManager) buildTargetResponse(c
*pch.HttpContext) {
} else {
c.AddHeader(constant.HeaderKeyContextType,
constant.HeaderValueTextPlain)
}
- c.TargetResp = &client.Response{Data: res}
+ c.TargetResp = &client.UnaryResponse{Data: res}
default:
//dubbo go generic invoke
response := util.NewDubboResponse(res, false)
diff --git a/pkg/common/http/manager_test.go b/pkg/common/http/manager_test.go
index 684dd2dd..450546a5 100644
--- a/pkg/common/http/manager_test.go
+++ b/pkg/common/http/manager_test.go
@@ -19,9 +19,14 @@ package http
import (
"bytes"
+ "context"
"fmt"
+ "net"
"net/http"
+ "net/http/httptest"
+ "strings"
"testing"
+ "time"
)
import (
@@ -30,6 +35,7 @@ import (
import (
"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+ commonmock "github.com/apache/dubbo-go-pixiu/pkg/common/mock"
"github.com/apache/dubbo-go-pixiu/pkg/common/router/trie"
contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
"github.com/apache/dubbo-go-pixiu/pkg/context/mock"
@@ -43,6 +49,10 @@ const (
Kind = DEMO
)
+var (
+ eventCh = make(chan string, 3)
+)
+
type (
// Plugin is http filter plugin.
Plugin struct {
@@ -89,10 +99,10 @@ func (f *DemoFilter) Encode(ctx *contexthttp.HttpContext)
filter.FilterStatus {
func (f *DemoFilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext,
chain filter.FilterChain) error {
c := f.conf
str := fmt.Sprintf("%s is drinking in the %s", c.Foo, c.Bar)
- filter := &DemoFilter{str: str}
+ demoFilter := &DemoFilter{str: str}
- chain.AppendDecodeFilters(filter)
- chain.AppendEncodeFilters(filter)
+ chain.AppendDecodeFilters(demoFilter)
+ chain.AppendEncodeFilters(demoFilter)
return nil
}
@@ -131,7 +141,7 @@ func TestCreateHttpConnectionManager(t *testing.T) {
request, err := http.NewRequest("POST",
"http://www.dubbogopixiu.com/api/v1?name=tc",
bytes.NewReader([]byte("{\"id\":\"12345\"}")))
assert.NoError(t, err)
request.Header = map[string][]string{
- "X-Dgp-Way": []string{"Dubbo"},
+ "X-Dgp-Way": {"Dubbo"},
}
assert.NoError(t, err)
c := mock.GetMockHTTPContext(request)
@@ -140,3 +150,131 @@ func TestCreateHttpConnectionManager(t *testing.T) {
err = hcm.Handle(c)
assert.NoError(t, err)
}
+
+// test SSE case
+func TestStreamingResponse(t *testing.T) {
+ hcmc := model.HttpConnectionManagerConfig{
+ RouteConfig: model.RouteConfiguration{
+ RouteTrie: trie.NewTrieWithDefault("GET/api/sse",
model.RouteAction{
+ Cluster: "mock_stream_cluster",
+ }),
+ },
+ HTTPFilters: []*model.HTTPFilter{
+ {
+ Name: commonmock.Kind,
+ },
+ },
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+ defer cancel()
+
+ // mock server
+ upstreamServer, _ := NewTestServerWithURL("localhost:8080",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "text/event-stream")
+ w.Header().Set("Cache-Control", "no-cache")
+ flusher := w.(http.Flusher)
+
+ for i := 1; i <= 3; i++ {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ time.Sleep(10 * time.Millisecond)
+ event := fmt.Sprintf("data: %d\nevent: %d\nid:
%d\n\n", i, i, i)
+ _, _ = w.Write([]byte(event))
+ flusher.Flush()
+ logger.Info("Upstream sent event ", i)
+ }
+ }
+ }))
+ defer upstreamServer.Close()
+
+ req := httptest.NewRequest("GET", "http://localhost:8080/api/sse",
nil).WithContext(ctx)
+
+ done := make(chan struct{})
+
+ httpCtx := &contexthttp.HttpContext{
+ Request: req,
+ Writer: NewStreamRecorder(),
+ Ctx: ctx,
+ }
+ go func() {
+ defer close(done)
+
+ hcm := CreateHttpConnectionManager(&hcmc)
+
+ if err := hcm.Handle(httpCtx); err != nil {
+ t.Errorf("Handle failed: %v", err)
+ }
+
+ // test targetResp
+ if httpCtx.TargetResp == nil {
+ t.Error("TargetResp is nil")
+ return
+ }
+ }()
+
+ // event waiting test
+ for {
+ receivedEvents := httpCtx.Writer.(*StreamRecorder).receivedBuf
+ select {
+ case event := <-eventCh:
+ logger.Info("Received event: %s",
strings.ReplaceAll(event, "\n", "\\n"))
+ case <-done:
+ assert.Equal(t, 3, len(receivedEvents), "Should receive
3 events")
+ return
+ case <-time.After(5 * time.Second):
+ t.Fatal("Test timeout")
+ return
+ }
+ }
+
+}
+
+// mock recorder
+type StreamRecorder struct {
+ http.ResponseWriter
+ http.Flusher
+ receivedBuf []string
+ headers http.Header
+ status int
+}
+
+func NewStreamRecorder() *StreamRecorder {
+ return &StreamRecorder{
+ receivedBuf: make([]string, 0),
+ headers: make(http.Header),
+ }
+}
+
+func (r *StreamRecorder) Header() http.Header {
+ return r.headers
+}
+
+func (r *StreamRecorder) WriteHeader(statusCode int) {
+ r.status = statusCode
+}
+
+func (r *StreamRecorder) Write(data []byte) (int, error) {
+ eventCh <- string(data)
+ r.receivedBuf = append(r.receivedBuf, string(data))
+ return len(data), nil
+}
+
+func NewTestServerWithURL(URL string, handler http.Handler) (*httptest.Server,
error) {
+ ts := httptest.NewUnstartedServer(handler)
+ if URL != "" {
+ l, err := net.Listen("tcp", URL)
+ if err != nil {
+ return nil, err
+ }
+ err = ts.Listener.Close()
+ if err != nil {
+ return nil, err
+ }
+ ts.Listener = l
+ }
+ ts.Start()
+ return ts, nil
+}
diff --git a/pkg/filter/http/httpproxy/routerfilter.go
b/pkg/common/mock/routerfilter.go
similarity index 54%
copy from pkg/filter/http/httpproxy/routerfilter.go
copy to pkg/common/mock/routerfilter.go
index d47cfdaf..046ccd13 100644
--- a/pkg/filter/http/httpproxy/routerfilter.go
+++ b/pkg/common/mock/routerfilter.go
@@ -15,27 +15,24 @@
* limitations under the License.
*/
-package httpproxy
+package mock
import (
"encoding/json"
"fmt"
- stdhttp "net/http"
- "net/url"
+ "net/http"
"time"
)
import (
- "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
- "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+ contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
- "github.com/apache/dubbo-go-pixiu/pkg/server"
)
const (
// Kind is the kind of Fallback.
- Kind = constant.HTTPProxyFilter
+ Kind = "dgp.filter.http.sse.httpproxy"
)
func init() {
@@ -49,11 +46,11 @@ type (
// FilterFactory is http filter instance
FilterFactory struct {
cfg *Config
- client stdhttp.Client
+ client http.Client
}
//Filter
Filter struct {
- client stdhttp.Client
+ client http.Client
}
// Config describe the config of FilterFactory
Config struct {
@@ -72,83 +69,57 @@ func (p *Plugin) CreateFilterFactory()
(filter.HttpFilterFactory, error) {
return &FilterFactory{cfg: &Config{}}, nil
}
-func (factory *FilterFactory) Config() interface{} {
- return factory.cfg
+func (ff *FilterFactory) Config() interface{} {
+ return ff.cfg
}
-func (factory *FilterFactory) Apply() error {
- cfg := factory.cfg
- client := stdhttp.Client{
+func (ff *FilterFactory) Apply() error {
+ cfg := ff.cfg
+ client := http.Client{
Timeout: cfg.Timeout,
- Transport: stdhttp.RoundTripper(&stdhttp.Transport{
+ Transport: http.RoundTripper(&http.Transport{
MaxIdleConns: cfg.MaxIdleConns,
MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost,
MaxConnsPerHost: cfg.MaxConnsPerHost,
}),
}
- factory.client = client
+ ff.client = client
return nil
}
-func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain
filter.FilterChain) error {
+func (ff *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext,
chain filter.FilterChain) error {
//reuse http client
- f := &Filter{factory.client}
+ f := &Filter{ff.client}
chain.AppendDecodeFilters(f)
return nil
}
-func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus {
- rEntry := hc.GetRouteEntry()
- if rEntry == nil {
- panic("no route entry")
- }
- logger.Debugf("[dubbo-go-pixiu] client choose endpoint from cluster
:%v", rEntry.Cluster)
-
- clusterName := rEntry.Cluster
- clusterManager := server.GetClusterManager()
- endpoint := clusterManager.PickEndpoint(clusterName, hc)
- if endpoint == nil {
- logger.Debugf("[dubbo-go-pixiu] cluster not found endpoint")
- bt, _ := json.Marshal(http.ErrResponse{Message: "cluster not
found endpoint"})
- hc.SendLocalReply(stdhttp.StatusServiceUnavailable, bt)
- return filter.Stop
- }
-
- logger.Debugf("[dubbo-go-pixiu] client choose endpoint :%v",
endpoint.Address.GetAddress())
+func (f *Filter) Decode(hc *contexthttp.HttpContext) filter.FilterStatus {
r := hc.Request
var (
- req *stdhttp.Request
+ req *http.Request
err error
)
- parsedURL := url.URL{
- Host: endpoint.Address.GetAddress(),
- Scheme: "http",
- Path: r.URL.Path,
- RawQuery: r.URL.RawQuery,
- }
-
- req, err = stdhttp.NewRequest(r.Method, parsedURL.String(), r.Body)
+ req, err = http.NewRequest(r.Method, r.URL.String(), r.Body)
if err != nil {
- bt, _ := json.Marshal(http.ErrResponse{Message:
fmt.Sprintf("BUG: new request failed: %v", err)})
- hc.SendLocalReply(stdhttp.StatusInternalServerError, bt)
+ bt, _ := json.Marshal(contexthttp.ErrResponse{Message:
fmt.Sprintf("BUG: new request failed: %v", err)})
+ hc.SendLocalReply(http.StatusInternalServerError, bt)
return filter.Stop
}
req.Header = r.Header
resp, err := f.client.Do(req)
+
if err != nil {
- urlErr, ok := err.(*url.Error)
- if ok && urlErr.Timeout() {
- hc.SendLocalReply(stdhttp.StatusGatewayTimeout,
[]byte(err.Error()))
- return filter.Stop
- }
- hc.SendLocalReply(stdhttp.StatusServiceUnavailable,
[]byte(err.Error()))
+ hc.SendLocalReply(http.StatusServiceUnavailable,
[]byte(err.Error()))
return filter.Stop
}
- logger.Debugf("[dubbo-go-pixiu] client call resp:%v", resp)
+
hc.SourceResp = resp
+
+ logger.Debugf("[dubbo-go-pixiu] client call resp:%v", resp)
// response write in hcm
return filter.Continue
}
diff --git a/pkg/common/util/response.go b/pkg/common/util/response.go
index e8d406d0..5d4403a3 100644
--- a/pkg/common/util/response.go
+++ b/pkg/common/util/response.go
@@ -28,10 +28,10 @@ import (
"github.com/apache/dubbo-go-pixiu/pkg/client"
)
-func NewDubboResponse(data interface{}, hump bool) *client.Response {
+func NewDubboResponse(data interface{}, hump bool) *client.UnaryResponse {
r, _ := dealResp(data, hump)
bytes, _ := json.Marshal(r)
- return &client.Response{Data: bytes}
+ return &client.UnaryResponse{Data: bytes}
}
func dealResp(in interface{}, HumpToLine bool) (interface{}, error) {
diff --git a/pkg/context/http/context.go b/pkg/context/http/context.go
index 56c8b246..ec6d1378 100644
--- a/pkg/context/http/context.go
+++ b/pkg/context/http/context.go
@@ -59,7 +59,7 @@ type HttpContext struct {
// localReplyBody: happen error
localReplyBody []byte
// the response context will return.
- TargetResp *client.Response
+ TargetResp client.Response
// client call response.
SourceResp interface{}
@@ -171,13 +171,13 @@ func (hc *HttpContext) GetApplicationName() string {
return ""
}
-// SendLocalReply Means that the request was interrupted and Response will be
sent directly
+// SendLocalReply Means that the request was interrupted and UnaryResponse
will be sent directly
// Even if it’s currently in to Decode stage
func (hc *HttpContext) SendLocalReply(status int, body []byte) {
hc.localReply = true
hc.statusCode = status
hc.localReplyBody = body
- hc.TargetResp = &client.Response{Data: body}
+ hc.TargetResp = &client.UnaryResponse{Data: body}
if json.Valid(body) {
hc.AddHeader(constant.HeaderKeyContextType,
constant.HeaderValueApplicationJson)
} else {
diff --git a/pkg/filter/accesslog/access_log.go
b/pkg/filter/accesslog/access_log.go
index 5cc1bedb..938dca2a 100644
--- a/pkg/filter/accesslog/access_log.go
+++ b/pkg/filter/accesslog/access_log.go
@@ -27,6 +27,7 @@ import (
)
import (
+ "github.com/apache/dubbo-go-pixiu/pkg/client"
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
"github.com/apache/dubbo-go-pixiu/pkg/context/http"
@@ -137,7 +138,7 @@ func buildAccessLogMsg(c *http.HttpContext, cost
time.Duration) string {
builder.WriteString(fmt.Sprintf("invoke err [ %v", err))
builder.WriteString("] ")
}
- resp := c.TargetResp.Data
+ resp := c.TargetResp.(*client.UnaryResponse).Data
if err != nil {
builder.WriteString(" response can not convert to string")
builder.WriteString("] ")
diff --git a/pkg/filter/accesslog/access_log_test.go
b/pkg/filter/accesslog/access_log_test.go
index 6d95b2c2..1fa4164e 100644
--- a/pkg/filter/accesslog/access_log_test.go
+++ b/pkg/filter/accesslog/access_log_test.go
@@ -64,7 +64,7 @@ func TestApply(t *testing.T) {
request, _ := http.NewRequest("POST",
"http://www.dubbogopixiu.com/mock/test?name=tc",
bytes.NewReader([]byte("{\"id\":\"12345\"}")))
ctx := mock.GetMockHTTPContext(request)
- ctx.TargetResp = client.NewResponse([]byte(msg))
+ ctx.TargetResp = client.NewByteResponse([]byte(msg))
f.Decode(ctx)
f.Encode(ctx)
diff --git a/pkg/filter/http/httpproxy/routerfilter.go
b/pkg/filter/http/httpproxy/routerfilter.go
index d47cfdaf..bc49ae96 100644
--- a/pkg/filter/http/httpproxy/routerfilter.go
+++ b/pkg/filter/http/httpproxy/routerfilter.go
@@ -20,7 +20,7 @@ package httpproxy
import (
"encoding/json"
"fmt"
- stdhttp "net/http"
+ "net/http"
"net/url"
"time"
)
@@ -28,7 +28,7 @@ import (
import (
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
- "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+ contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/server"
)
@@ -49,11 +49,11 @@ type (
// FilterFactory is http filter instance
FilterFactory struct {
cfg *Config
- client stdhttp.Client
+ client http.Client
}
//Filter
Filter struct {
- client stdhttp.Client
+ client http.Client
}
// Config describe the config of FilterFactory
Config struct {
@@ -78,9 +78,9 @@ func (factory *FilterFactory) Config() interface{} {
func (factory *FilterFactory) Apply() error {
cfg := factory.cfg
- client := stdhttp.Client{
+ client := http.Client{
Timeout: cfg.Timeout,
- Transport: stdhttp.RoundTripper(&stdhttp.Transport{
+ Transport: http.RoundTripper(&http.Transport{
MaxIdleConns: cfg.MaxIdleConns,
MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost,
MaxConnsPerHost: cfg.MaxConnsPerHost,
@@ -90,14 +90,14 @@ func (factory *FilterFactory) Apply() error {
return nil
}
-func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain
filter.FilterChain) error {
+func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext,
chain filter.FilterChain) error {
//reuse http client
f := &Filter{factory.client}
chain.AppendDecodeFilters(f)
return nil
}
-func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus {
+func (f *Filter) Decode(hc *contexthttp.HttpContext) filter.FilterStatus {
rEntry := hc.GetRouteEntry()
if rEntry == nil {
panic("no route entry")
@@ -109,8 +109,8 @@ func (f *Filter) Decode(hc *http.HttpContext)
filter.FilterStatus {
endpoint := clusterManager.PickEndpoint(clusterName, hc)
if endpoint == nil {
logger.Debugf("[dubbo-go-pixiu] cluster not found endpoint")
- bt, _ := json.Marshal(http.ErrResponse{Message: "cluster not
found endpoint"})
- hc.SendLocalReply(stdhttp.StatusServiceUnavailable, bt)
+ bt, _ := json.Marshal(contexthttp.ErrResponse{Message: "cluster
not found endpoint"})
+ hc.SendLocalReply(http.StatusServiceUnavailable, bt)
return filter.Stop
}
@@ -118,7 +118,7 @@ func (f *Filter) Decode(hc *http.HttpContext)
filter.FilterStatus {
r := hc.Request
var (
- req *stdhttp.Request
+ req *http.Request
err error
)
@@ -129,10 +129,10 @@ func (f *Filter) Decode(hc *http.HttpContext)
filter.FilterStatus {
RawQuery: r.URL.RawQuery,
}
- req, err = stdhttp.NewRequest(r.Method, parsedURL.String(), r.Body)
+ req, err = http.NewRequest(r.Method, parsedURL.String(), r.Body)
if err != nil {
- bt, _ := json.Marshal(http.ErrResponse{Message:
fmt.Sprintf("BUG: new request failed: %v", err)})
- hc.SendLocalReply(stdhttp.StatusInternalServerError, bt)
+ bt, _ := json.Marshal(contexthttp.ErrResponse{Message:
fmt.Sprintf("BUG: new request failed: %v", err)})
+ hc.SendLocalReply(http.StatusInternalServerError, bt)
return filter.Stop
}
req.Header = r.Header
@@ -141,10 +141,10 @@ func (f *Filter) Decode(hc *http.HttpContext)
filter.FilterStatus {
if err != nil {
urlErr, ok := err.(*url.Error)
if ok && urlErr.Timeout() {
- hc.SendLocalReply(stdhttp.StatusGatewayTimeout,
[]byte(err.Error()))
+ hc.SendLocalReply(http.StatusGatewayTimeout,
[]byte(err.Error()))
return filter.Stop
}
- hc.SendLocalReply(stdhttp.StatusServiceUnavailable,
[]byte(err.Error()))
+ hc.SendLocalReply(http.StatusServiceUnavailable,
[]byte(err.Error()))
return filter.Stop
}
logger.Debugf("[dubbo-go-pixiu] client call resp:%v", resp)
diff --git a/pkg/filter/metric/metric.go b/pkg/filter/metric/metric.go
index 044c4f06..fc4bba0a 100644
--- a/pkg/filter/metric/metric.go
+++ b/pkg/filter/metric/metric.go
@@ -25,7 +25,9 @@ import (
import (
"github.com/pkg/errors"
+
"go.opentelemetry.io/otel/attribute"
+
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
@@ -58,7 +60,7 @@ func init() {
}
type (
- // FilterFactory is http filter plugin.
+ // Plugin is http filter plugin.
Plugin struct {
}
// FilterFactory is http filter instance
@@ -137,11 +139,11 @@ func (f *Filter) Encode(c *http.HttpContext)
filter.FilterStatus {
return filter.Continue
}
-func computeApproximateResponseSize(res *client.Response) (int, error) {
+func computeApproximateResponseSize(res client.Response) (int, error) {
if res == nil {
- return 0, errors.New("client.Response is null pointer ")
+ return 0, errors.New("client.UnaryResponse is null pointer ")
}
- return len(res.Data), nil
+ return len(res.(*client.UnaryResponse).Data), nil
}
func computeApproximateRequestSize(r *stdhttp.Request) (int, error) {
diff --git a/pkg/filter/prometheus/metric_test.go
b/pkg/filter/prometheus/metric_test.go
index 7f39c547..ab7eb65f 100644
--- a/pkg/filter/prometheus/metric_test.go
+++ b/pkg/filter/prometheus/metric_test.go
@@ -62,7 +62,7 @@ func TestCounterExporterApiMetric(t *testing.T) {
body, _ := json.Marshal(&data)
request, _ := http.NewRequest("POST", "/_api/health",
bytes.NewBuffer(body))
ctx := mock.GetMockHTTPContext(request)
- ctx.TargetResp = client.NewResponse([]byte(msg))
+ ctx.TargetResp = client.NewByteResponse([]byte(msg))
err := factory.PrepareFilterChain(ctx, chain)
assert.Nil(t, err)
chain.OnDecode(ctx)
diff --git a/pkg/prometheus/prometheus.go b/pkg/prometheus/prometheus.go
index 058d4928..de00261b 100644
--- a/pkg/prometheus/prometheus.go
+++ b/pkg/prometheus/prometheus.go
@@ -29,7 +29,9 @@ import (
import (
"github.com/dubbo-go-pixiu/pixiu-api/pkg/context"
+
"github.com/prometheus/client_golang/prometheus"
+
"github.com/prometheus/common/expfmt"
)
@@ -347,7 +349,7 @@ func (p *Prometheus) HandlerFunc() ContextHandlerFunc {
if err1 == nil {
p.reqSz.WithLabelValues(statusStr, method,
url).Observe(float64(reqSz))
}
- resSz, err2 := computeApproximateResponseSize(c.TargetResp)
+ resSz, err2 :=
computeApproximateResponseSize(c.TargetResp.(*client.UnaryResponse))
if err2 == nil {
p.resSz.WithLabelValues(statusStr, method,
url).Observe(float64(resSz))
}
@@ -382,9 +384,9 @@ func computeApproximateRequestSize(r *http.Request) (int,
error) {
return s, nil
}
-func computeApproximateResponseSize(res *client.Response) (int, error) {
+func computeApproximateResponseSize(res *client.UnaryResponse) (int, error) {
if res == nil {
- return 0, errors.New("client.Response is null pointer ")
+ return 0, errors.New("client.UnaryResponse is null pointer ")
}
return len(res.Data), nil
}