This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new 566cd766 Add streamable http (#674)
566cd766 is described below
commit 566cd7667bd89ff82610b6e72b9d9dbbab30aff6
Author: ZeruiYang <[email protected]>
AuthorDate: Fri May 30 23:33:35 2025 -0700
Add streamable http (#674)
* impl: Compatible with http/sse protocol
* impl: retry
* delete redundant files
* fix assert
* fix assert
* fix import
* feat: add support for streamable HTTP responses and enhance response
handling
* feat: enhance streamable response detection and add related tests
* feat: refactor response handling to use UnaryResponse and clean up
StreamResponse struct
* feat: add StreamHTTPRecorder and tests for streamable HTTP responses
* feat: improve HTTP listener error handling
* fix: update timeout values in configuration to string format
* feat: implement custom caller encoder for padded file paths in logger
* fix: initialize logController in init function for default config
* refactor: replace hardcoded content type values with constants in manager
tests
* refactor: update response handling to use new response constructors
* refactor: standardize header constant formatting in http.go
* feat: add Flush method to StreamRecorder for improved stream handling
* feat: add image/jpeg header constant and update test case
* refactor: rename logger to pixiuLogger and update related methods
* fix: delete unused interface
---------
Co-authored-by: Alanxtl <[email protected]>
---
configs/conf.yaml | 6 +-
pkg/client/http/http.go | 50 ++++++-
pkg/client/response.go | 23 ++--
pkg/common/constant/http.go | 21 ++-
pkg/common/http/manager.go | 14 +-
pkg/common/http/manager_test.go | 228 +++++++++++++++++++++++++++++++-
pkg/context/http/context.go | 2 +-
pkg/filter/accesslog/access_log_test.go | 2 +-
pkg/filter/metric/metric.go | 4 +-
pkg/filter/prometheus/metric_test.go | 2 +-
pkg/listener/http/http_listener.go | 17 ++-
pkg/logger/controller.go | 6 +-
pkg/logger/logger.go | 29 +++-
pkg/model/http.go | 8 +-
pkg/prometheus/prometheus.go | 2 -
15 files changed, 355 insertions(+), 59 deletions(-)
diff --git a/configs/conf.yaml b/configs/conf.yaml
index 3f4f4a3a..5cfca4d3 100644
--- a/configs/conf.yaml
+++ b/configs/conf.yaml
@@ -50,9 +50,9 @@ static_resources:
allow_credentials: false
config:
- idle_timeout: 5s
- read_timeout: 5s
- write_timeout: 5s
+ idle_timeout: "5s"
+ read_timeout: "5s"
+ write_timeout: "5s"
clusters:
- name: "user"
lb_policy: "lb"
diff --git a/pkg/client/http/http.go b/pkg/client/http/http.go
index c5e2dd12..3a8efce0 100644
--- a/pkg/client/http/http.go
+++ b/pkg/client/http/http.go
@@ -20,13 +20,13 @@ package http
import (
"net/http"
"net/url"
+ "strconv"
"strings"
"sync"
)
import (
"github.com/pkg/errors"
-
"go.opentelemetry.io/otel"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
@@ -206,3 +206,51 @@ func IsSSEStream(resp *http.Response) bool {
contentType := resp.Header.Get(constant.HeaderKeyContextType)
return strings.Contains(contentType,
constant.HeaderValueTextEventStream)
}
+
+// IsStreamableResponse check if the response is streamable
+// Determine whether it is a streaming response based on the following
conditions:
+// 1. Using Transfer-Encoding: chunked
+// 2. The content type indicates that this is a streamable response (e.g.
text/event-stream, application/json, etc.)
+func IsStreamableResponse(resp *http.Response) bool {
+ if IsSSEStream(resp) {
+ return true
+ }
+
+ // check whether chunked transfer encoding is used
+ transferEncoding := resp.Header.Get(constant.HeaderKeyTransferEncoding)
+ if strings.Contains(strings.ToLower(transferEncoding),
constant.HeaderValueChunked) {
+ return true
+ }
+
+ // check the content type
+ contentType := resp.Header.Get(constant.HeaderKeyContextType)
+
+ // check if it's a streamable content type
+ streamableTypes := []string{
+ constant.HeaderValueTextPrefix,
+ constant.HeaderValueApplicationJson,
+ constant.HeaderValueApplicationNDJson,
+ constant.HeaderValueApplicationOctetStream,
+ }
+
+ for _, streamableType := range streamableTypes {
+ if strings.HasPrefix(contentType, streamableType) {
+ // For these content types, a response without
Content-Length, or with a large Content-Length,
+ // may be a good candidate for streaming
+ contentLength :=
resp.Header.Get(constant.HeaderKeyContentLength)
+
+ // If Content-Length is not specified, it is possible
that the server is not aware of the content length
+ if contentLength == "" {
+ return true
+ }
+
+ // If the Content-Length is large (> 1MB), streaming is
also suitable
+ length, err := strconv.ParseInt(contentLength, 10, 64)
+ if err == nil && length > 1024*1024 {
+ return true
+ }
+ }
+ }
+
+ return false
+}
diff --git a/pkg/client/response.go b/pkg/client/response.go
index fac5fd95..f9b6a81e 100644
--- a/pkg/client/response.go
+++ b/pkg/client/response.go
@@ -21,34 +21,27 @@ import (
"io"
)
-type Response interface {
- IsStream() bool
-}
-
// UnaryResponse response from endpoint
type UnaryResponse struct {
Data []byte
}
-func (r *UnaryResponse) IsStream() bool {
- return false
-}
-
-// NewByteResponse create response contains a []byte
-func NewByteResponse(data []byte) *UnaryResponse {
+// NewUnaryResponse create response contains a []byte
+func NewUnaryResponse(data []byte) *UnaryResponse {
return &UnaryResponse{Data: data}
}
// StreamResponse response from endpoint
type StreamResponse struct {
- Stream io.ReadCloser
+ Stream io.ReadCloser
+ IsSSEStream bool
}
-func (r *StreamResponse) IsStream() bool {
- return true
+func (r *StreamResponse) IsSSE() bool {
+ return r.IsSSEStream
}
// NewStreamResponse create response contains a stream
-func NewStreamResponse(stream io.ReadCloser) *StreamResponse {
- return &StreamResponse{Stream: stream}
+func NewStreamResponse(stream io.ReadCloser, isSSE bool) *StreamResponse {
+ return &StreamResponse{Stream: stream, IsSSEStream: isSSE}
}
diff --git a/pkg/common/constant/http.go b/pkg/common/constant/http.go
index ac10c843..b76dfeb0 100644
--- a/pkg/common/constant/http.go
+++ b/pkg/common/constant/http.go
@@ -18,9 +18,11 @@
package constant
const (
- HeaderKeyContextType = "Content-Type"
- HeaderKeyCacheControl = "Cache-Control"
- HeaderKeyConnection = "Connection"
+ HeaderKeyContextType = "Content-Type"
+ HeaderKeyCacheControl = "Cache-Control"
+ HeaderKeyConnection = "Connection"
+ HeaderKeyTransferEncoding = "Transfer-Encoding"
+ HeaderKeyContentLength = "Content-Length"
HeaderKeyAccessControlAllowOrigin = "Access-Control-Allow-Origin"
HeaderKeyAccessControlAllowHeaders = "Access-Control-Allow-Headers"
@@ -29,10 +31,15 @@ const (
HeaderKeyAccessControlMaxAge = "Access-Control-Max-Age"
HeaderKeyAccessControlAllowCredentials =
"Access-Control-Allow-Credentials"
- HeaderValueJsonUtf8 = "application/json;charset=UTF-8"
- HeaderValueTextPlain = "text/plain"
- HeaderValueTextEventStream = "text/event-stream"
- HeaderValueApplicationJson = "application/json"
+ HeaderValueJsonUtf8 = "application/json;charset=UTF-8"
+ HeaderValueTextPlain = "text/plain"
+ HeaderValueTextEventStream = "text/event-stream"
+ HeaderValueApplicationJson = "application/json"
+ HeaderValueApplicationOctetStream = "application/octet-stream"
+ HeaderValueApplicationNDJson = "application/x-ndjson"
+ HeaderValueImageJpeg = "image/jpeg"
+ HeaderValueChunked = "chunked"
+ HeaderValueTextPrefix = "text/"
HeaderValueKeepAlive = "keep-alive"
HeaderValueNoCache = "no-cache"
diff --git a/pkg/common/http/manager.go b/pkg/common/http/manager.go
index 703e4b1c..ee9a5833 100644
--- a/pkg/common/http/manager.go
+++ b/pkg/common/http/manager.go
@@ -179,6 +179,11 @@ func (hcm *HttpConnectionManager) writeResponse(c
*pch.HttpContext) {
_ = res.Stream.Close()
return
}
+
+ if flusher, ok :=
c.Writer.(stdHttp.Flusher); ok {
+ flusher.Flush()
+ }
+
case err := <-errC:
if err != nil && err != io.EOF {
logger.Errorf("Stream
error: %v", err)
@@ -190,7 +195,6 @@ func (hcm *HttpConnectionManager) writeResponse(c
*pch.HttpContext) {
default:
logger.Errorf("Unknown response type: %T",
c.TargetResp)
}
-
}
}
}
@@ -210,8 +214,8 @@ func (hcm *HttpConnectionManager) buildTargetResponse(c
*pch.HttpContext) {
//status code
c.StatusCode(res.StatusCode)
- if http.IsSSEStream(res) {
- c.TargetResp = &client.StreamResponse{Stream: res.Body}
+ if http.IsStreamableResponse(res) {
+ c.TargetResp = client.NewStreamResponse(res.Body,
http.IsSSEStream(res))
} else {
body, err := io.ReadAll(res.Body)
if err != nil {
@@ -219,7 +223,7 @@ func (hcm *HttpConnectionManager) buildTargetResponse(c
*pch.HttpContext) {
}
//close body
_ = res.Body.Close()
- c.TargetResp = &client.UnaryResponse{Data: body}
+ c.TargetResp = client.NewUnaryResponse(body)
}
case []byte:
c.StatusCode(stdHttp.StatusOK)
@@ -228,7 +232,7 @@ func (hcm *HttpConnectionManager) buildTargetResponse(c
*pch.HttpContext) {
} else {
c.AddHeader(constant.HeaderKeyContextType,
constant.HeaderValueTextPlain)
}
- c.TargetResp = &client.UnaryResponse{Data: res}
+ c.TargetResp = client.NewUnaryResponse(res)
default:
//dubbo go generic invoke
response := util.NewDubboResponse(res, false)
diff --git a/pkg/common/http/manager_test.go b/pkg/common/http/manager_test.go
index 480272ef..292fd752 100644
--- a/pkg/common/http/manager_test.go
+++ b/pkg/common/http/manager_test.go
@@ -34,6 +34,8 @@ import (
)
import (
+ clienthttp "github.com/apache/dubbo-go-pixiu/pkg/client/http"
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
commonmock "github.com/apache/dubbo-go-pixiu/pkg/common/mock"
"github.com/apache/dubbo-go-pixiu/pkg/common/router/trie"
@@ -171,8 +173,8 @@ func TestStreamingResponse(t *testing.T) {
// mock server
upstreamServer, _ := NewTestServerWithURL("localhost:8080",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Content-Type", "text/event-stream")
- w.Header().Set("Cache-Control", "no-cache")
+ w.Header().Set(constant.HeaderKeyContextType,
constant.HeaderValueTextEventStream)
+ w.Header().Set(constant.HeaderKeyCacheControl,
constant.HeaderValueNoCache)
flusher := w.(http.Flusher)
for i := 1; i <= 3; i++ {
@@ -229,7 +231,6 @@ func TestStreamingResponse(t *testing.T) {
return
}
}
-
}
// mock recorder
@@ -262,6 +263,9 @@ func (r *StreamRecorder) Write(data []byte) (int, error) {
return len(data), nil
}
+func (r *StreamRecorder) Flush() {
+}
+
func NewTestServerWithURL(URL string, handler http.Handler) (*httptest.Server,
error) {
ts := httptest.NewUnstartedServer(handler)
if URL != "" {
@@ -278,3 +282,221 @@ func NewTestServerWithURL(URL string, handler
http.Handler) (*httptest.Server, e
ts.Start()
return ts, nil
}
+
+// StreamHTTPRecorder Used to capture and test streaming HTTP responses over
channels
+type StreamHTTPRecorder struct {
+ http.ResponseWriter
+ receivedBuf []string
+ headers http.Header
+ status int
+ flushCount int
+}
+
+func NewStreamHTTPRecorder() *StreamHTTPRecorder {
+ return &StreamHTTPRecorder{
+ receivedBuf: make([]string, 0),
+ headers: make(http.Header),
+ flushCount: 0,
+ }
+}
+
+func (r *StreamHTTPRecorder) Header() http.Header {
+ return r.headers
+}
+
+func (r *StreamHTTPRecorder) WriteHeader(statusCode int) {
+ r.status = statusCode
+}
+
+func (r *StreamHTTPRecorder) Write(data []byte) (int, error) {
+ eventCh <- string(data)
+ r.receivedBuf = append(r.receivedBuf, string(data))
+ return len(data), nil
+}
+
+func (r *StreamHTTPRecorder) Flush() {
+ r.flushCount++
+}
+
+// Test a variety of common streaming HTTP response types
+func TestStreamableHTTPResponse(t *testing.T) {
+ // define the type of content you want to test
+ contentTypes := []string{
+ constant.HeaderValueTextPlain,
+ constant.HeaderValueApplicationJson,
+ constant.HeaderValueApplicationOctetStream,
+ constant.HeaderValueApplicationNDJson,
+ }
+
+ for _, contentType := range contentTypes {
+ t.Run(fmt.Sprintf("ContentType_%s", contentType), func(t
*testing.T) {
+ testStreamableResponse(t, contentType)
+ })
+ }
+}
+
+func testStreamableResponse(t *testing.T, contentType string) {
+ hcmc := model.HttpConnectionManagerConfig{
+ RouteConfig: model.RouteConfiguration{
+ RouteTrie: trie.NewTrieWithDefault("GET/api/stream",
model.RouteAction{
+ Cluster: "mock_stream_cluster",
+ }),
+ },
+ HTTPFilters: []*model.HTTPFilter{
+ {
+ Name: commonmock.Kind,
+ },
+ },
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+ defer cancel()
+
+ // Clear any data that may have been left over from the previous test
+ for len(eventCh) > 0 {
+ <-eventCh
+ }
+
+ // mock server
+ upstreamServer, _ := NewTestServerWithURL("localhost:8080",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set(constant.HeaderKeyContextType, contentType)
+ flusher := w.(http.Flusher)
+
+ // Generate appropriate test data based on content type
+ var data []byte
+ for i := 1; i <= 5; i++ {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ time.Sleep(10 * time.Millisecond)
+
+ switch contentType {
+ case constant.HeaderValueApplicationJson:
+ data = []byte(fmt.Sprintf(`{"id": %d,
"message": "test chunk %d"}\n`, i, i))
+ case constant.HeaderValueApplicationNDJson:
+ data = []byte(fmt.Sprintf(`{"id": %d,
"message": "test chunk %d"}\n`, i, i))
+ case constant.HeaderValueApplicationOctetStream:
+ data = []byte(fmt.Sprintf("CHUNK-%d",
i))
+ default: // text/plain
+ data = []byte(fmt.Sprintf("Chunk %d\n",
i))
+ }
+
+ _, _ = w.Write(data)
+ flusher.Flush()
+ logger.Info("Upstream sent chunk ", i)
+ }
+ }
+ }))
+ defer upstreamServer.Close()
+
+ req := httptest.NewRequest("GET", "http://localhost:8080/api/stream",
nil).WithContext(ctx)
+ done := make(chan struct{})
+
+ httpCtx := &contexthttp.HttpContext{
+ Request: req,
+ Writer: NewStreamHTTPRecorder(),
+ Ctx: ctx,
+ }
+
+ go func() {
+ defer close(done)
+
+ hcm := CreateHttpConnectionManager(&hcmc)
+
+ if err := hcm.Handle(httpCtx); err != nil {
+ t.Errorf("Handle failed: %v", err)
+ }
+
+ // verify that targetResp exists
+ if httpCtx.TargetResp == nil {
+ t.Error("TargetResp is nil")
+ return
+ }
+ }()
+
+ // collect and validate responses
+ receivedChunks := 0
+ for {
+ receivedEvents :=
httpCtx.Writer.(*StreamHTTPRecorder).receivedBuf
+ select {
+ case event := <-eventCh:
+ logger.Info("Received chunk: %s", event)
+ receivedChunks++
+ case <-done:
+ assert.Equal(t, 5, len(receivedEvents), "Should receive
5 chunks")
+ return
+ case <-time.After(5 * time.Second):
+ t.Fatal("Test timeout")
+ return
+ }
+ }
+}
+
+// TestIsStreamableResponse tests whether IsStreamableResponse correctly detects
streamable responses
+func TestIsStreamableResponse(t *testing.T) {
+ tests := []struct {
+ name string
+ headers map[string]string
+ expected bool
+ }{
+ {
+ name: "sseResponse",
+ headers: map[string]string{
+ constant.HeaderKeyContextType:
constant.HeaderValueTextEventStream,
+ },
+ expected: true,
+ },
+ {
+ name: "chunkedEncodingResponses",
+ headers: map[string]string{
+ constant.HeaderKeyContextType:
constant.HeaderValueApplicationJson,
+ constant.HeaderKeyTransferEncoding:
constant.HeaderValueChunked,
+ },
+ expected: true,
+ },
+ {
+ name: "JsonResponseWithoutContent-Length",
+ headers: map[string]string{
+ constant.HeaderKeyContextType:
constant.HeaderValueApplicationJson,
+ },
+ expected: true,
+ },
+ {
+ name: "The text response is large Content-Length",
+ headers: map[string]string{
+ constant.HeaderKeyContextType:
constant.HeaderValueTextPlain,
+ constant.HeaderKeyContentLength: "2097152", //
2MB
+ },
+ expected: true,
+ },
+ {
+ name: "JSON response Content-Length",
+ headers: map[string]string{
+ constant.HeaderKeyContextType:
constant.HeaderValueApplicationJson,
+ constant.HeaderKeyContentLength: "1024", // 1KB
+ },
+ expected: false,
+ },
+ {
+ name: "no stream Content-Type",
+ headers: map[string]string{
+ constant.HeaderKeyContextType:
constant.HeaderValueImageJpeg,
+ },
+ expected: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ resp := &http.Response{
+ Header: make(http.Header),
+ }
+ for k, v := range tt.headers {
+ resp.Header.Set(k, v)
+ }
+ result := clienthttp.IsStreamableResponse(resp)
+ assert.Equal(t, tt.expected, result,
"IsStreamableResponse() return err")
+ })
+ }
+}
diff --git a/pkg/context/http/context.go b/pkg/context/http/context.go
index 9674f1a0..68f47449 100644
--- a/pkg/context/http/context.go
+++ b/pkg/context/http/context.go
@@ -59,7 +59,7 @@ type HttpContext struct {
// localReplyBody: happen error
localReplyBody []byte
// the response context will return.
- TargetResp client.Response
+ TargetResp any
// client call response.
SourceResp any
diff --git a/pkg/filter/accesslog/access_log_test.go
b/pkg/filter/accesslog/access_log_test.go
index 1fa4164e..e8c4d5c9 100644
--- a/pkg/filter/accesslog/access_log_test.go
+++ b/pkg/filter/accesslog/access_log_test.go
@@ -64,7 +64,7 @@ func TestApply(t *testing.T) {
request, _ := http.NewRequest("POST",
"http://www.dubbogopixiu.com/mock/test?name=tc",
bytes.NewReader([]byte("{\"id\":\"12345\"}")))
ctx := mock.GetMockHTTPContext(request)
- ctx.TargetResp = client.NewByteResponse([]byte(msg))
+ ctx.TargetResp = client.NewUnaryResponse([]byte(msg))
f.Decode(ctx)
f.Encode(ctx)
diff --git a/pkg/filter/metric/metric.go b/pkg/filter/metric/metric.go
index 6aaf600b..72fdf484 100644
--- a/pkg/filter/metric/metric.go
+++ b/pkg/filter/metric/metric.go
@@ -25,9 +25,7 @@ import (
import (
"github.com/pkg/errors"
-
"go.opentelemetry.io/otel/attribute"
-
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
@@ -139,7 +137,7 @@ func (f *Filter) Encode(c *http.HttpContext)
filter.FilterStatus {
return filter.Continue
}
-func computeApproximateResponseSize(res client.Response) (int, error) {
+func computeApproximateResponseSize(res any) (int, error) {
if res == nil {
return 0, errors.New("client.UnaryResponse is null pointer ")
}
diff --git a/pkg/filter/prometheus/metric_test.go
b/pkg/filter/prometheus/metric_test.go
index ab7eb65f..b563a31f 100644
--- a/pkg/filter/prometheus/metric_test.go
+++ b/pkg/filter/prometheus/metric_test.go
@@ -62,7 +62,7 @@ func TestCounterExporterApiMetric(t *testing.T) {
body, _ := json.Marshal(&data)
request, _ := http.NewRequest("POST", "/_api/health",
bytes.NewBuffer(body))
ctx := mock.GetMockHTTPContext(request)
- ctx.TargetResp = client.NewByteResponse([]byte(msg))
+ ctx.TargetResp = client.NewUnaryResponse([]byte(msg))
err := factory.PrepareFilterChain(ctx, chain)
assert.Nil(t, err)
chain.OnDecode(ctx)
diff --git a/pkg/listener/http/http_listener.go
b/pkg/listener/http/http_listener.go
index d18c4157..b34cc684 100644
--- a/pkg/listener/http/http_listener.go
+++ b/pkg/listener/http/http_listener.go
@@ -20,7 +20,6 @@ package http
import (
"context"
"fmt"
- "log"
"net/http"
"strconv"
"sync"
@@ -110,7 +109,7 @@ func (ls *HttpListenerService) httpsListener() {
hl := createDefaultHttpWorker(ls)
// user customize http config
- hc := model.MapInStruct(ls.Config)
+ hc := model.MapInStruct(ls.Config.Config)
mux := http.NewServeMux()
mux.HandleFunc("/", hl.ServeHTTP)
@@ -139,7 +138,7 @@ func (ls *HttpListenerService) httpListener() {
hl := createDefaultHttpWorker(ls)
// user customize http config
- hc := model.MapInStruct(ls.Config)
+ hc := model.MapInStruct(ls.Config.Config)
mux := http.NewServeMux()
mux.HandleFunc("/", hl.ServeHTTP)
@@ -154,9 +153,16 @@ func (ls *HttpListenerService) httpListener() {
MaxHeaderBytes: resolveInt2IntProp(hc.MaxHeaderBytes, 1<<20),
}
- logger.Infof("[dubbo-go-server] httpListener start at : %s",
ls.srv.Addr)
+ logger.Infof("[dubbo-go-server] httpListener starting at %s with
WriteTimeout: %v, IdleTimeout: %v, ReadTimeout: %v",
+ ls.srv.Addr, ls.srv.WriteTimeout, ls.srv.IdleTimeout,
ls.srv.ReadTimeout)
- log.Println(ls.srv.ListenAndServe())
+ err := ls.srv.ListenAndServe()
+ // Improved error logging and replace log.Println
+ if err != nil && !errors.Is(err, http.ErrServerClosed) {
+ logger.Errorf("[dubbo-go-server] httpListener ListenAndServe
error: %v", err)
+ } else {
+ logger.Info("[dubbo-go-server] httpListener stopped
gracefully.")
+ }
}
// createDefaultHttpWorker create http listener
@@ -184,6 +190,7 @@ func resolveStr2Time(currentV string, defaultV
time.Duration) time.Duration {
return defaultV
} else {
if duration, err := time.ParseDuration(currentV); err != nil {
+ logger.Errorf("Parse duration failed, err: %v", err)
return 20 * time.Second
} else {
return duration
diff --git a/pkg/logger/controller.go b/pkg/logger/controller.go
index 8914cf5d..fb2d9708 100644
--- a/pkg/logger/controller.go
+++ b/pkg/logger/controller.go
@@ -31,7 +31,7 @@ import (
type logController struct {
mu sync.RWMutex
- logger *logger
+ logger *pixiuLogger
}
// setLoggerLevel safely changes the log level in a concurrent manner.
@@ -45,12 +45,12 @@ func (c *logController) setLoggerLevel(level string) bool {
c.logger.config.Level = *lvl
l, _ := c.logger.config.Build(zap.AddCallerSkip(2))
- c.logger = &logger{SugaredLogger: l.Sugar(), config: c.logger.config}
+ c.logger = &pixiuLogger{SugaredLogger: l.Sugar(), config:
c.logger.config}
return true
}
// updateLogger safely modifies the log object in a concurrent manner.
-func (c *logController) updateLogger(l *logger) {
+func (c *logController) updateLogger(l *pixiuLogger) {
c.mu.Lock()
defer c.mu.Unlock()
c.logger = l
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 9fd73af1..f2db95e8 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -21,6 +21,7 @@ import (
"fmt"
"os"
"path"
+ "strings"
)
import (
@@ -35,7 +36,7 @@ import (
var control *logController
-type logger struct {
+type pixiuLogger struct {
*zap.SugaredLogger
config *zap.Config
}
@@ -48,6 +49,21 @@ func init() {
}
}
+// PaddedCallerEncoder is a custom caller encoder that ensures that all file
paths are displayed at the same length
+func PaddedCallerEncoder(caller zapcore.EntryCaller, enc
zapcore.PrimitiveArrayEncoder) {
+
+ callerPath := caller.TrimmedPath()
+
+ // Set a fixed length, and if the path is too short, add a space after
it
+ const fixedLength = 30
+ if len(callerPath) < fixedLength {
+ padding := strings.Repeat(" ", fixedLength-len(callerPath))
+ callerPath = callerPath + padding
+ }
+
+ enc.AppendString(callerPath)
+}
+
// InitLog load from config path
func InitLog(logConfFile string) error {
if logConfFile == "" {
@@ -69,7 +85,7 @@ func InitLog(logConfFile string) error {
err = yaml.UnmarshalYML(confFileStream, conf)
if err != nil {
InitLogger(nil)
- return perrors.New(fmt.Sprintf("[Unmarshal]init logger error:
%v", err))
+ return perrors.New(fmt.Sprintf("[Unmarshal]init pixiuLogger
error: %v", err))
}
InitLogger(conf)
@@ -84,21 +100,24 @@ func InitLogger(conf *zap.Config) {
zapLoggerEncoderConfig := zapcore.EncoderConfig{
TimeKey: "time",
LevelKey: "level",
- NameKey: "logger",
+ NameKey: "pixiuLogger",
CallerKey: "caller",
MessageKey: "message",
StacktraceKey: "stacktrace",
EncodeLevel: zapcore.CapitalColorLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.SecondsDurationEncoder,
- EncodeCaller: zapcore.ShortCallerEncoder,
+ EncodeCaller: PaddedCallerEncoder,
+ // EncodeCaller: zapcore.ShortCallerEncoder,
}
zapLoggerConfig.EncoderConfig = zapLoggerEncoderConfig
} else {
zapLoggerConfig = *conf
+ // Set up a custom encoder directly without checking the
original value
+ zapLoggerConfig.EncoderConfig.EncodeCaller = PaddedCallerEncoder
}
zapLogger, _ := zapLoggerConfig.Build(zap.AddCallerSkip(2))
- l := &logger{zapLogger.Sugar(), &zapLoggerConfig}
+ l := &pixiuLogger{zapLogger.Sugar(), &zapLoggerConfig}
control.updateLogger(l)
}
diff --git a/pkg/model/http.go b/pkg/model/http.go
index e9bbfc47..e9f26716 100644
--- a/pkg/model/http.go
+++ b/pkg/model/http.go
@@ -114,11 +114,11 @@ type HttpConfig struct {
}
func MapInStruct(cfg any) *HttpConfig {
- var hc *HttpConfig
+ var hc HttpConfig
if cfg != nil {
- if ok := mapstructure.Decode(cfg, &hc); ok != nil {
- logger.Error("Config error", ok)
+ if err := mapstructure.Decode(cfg, &hc); err != nil {
+ logger.Error("Config error", err)
}
}
- return hc
+ return &hc
}
diff --git a/pkg/prometheus/prometheus.go b/pkg/prometheus/prometheus.go
index de00261b..e7171b97 100644
--- a/pkg/prometheus/prometheus.go
+++ b/pkg/prometheus/prometheus.go
@@ -29,9 +29,7 @@ import (
import (
"github.com/dubbo-go-pixiu/pixiu-api/pkg/context"
-
"github.com/prometheus/client_golang/prometheus"
-
"github.com/prometheus/common/expfmt"
)