This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/develop by this push:
     new 566cd766 Add streamable http (#674)
566cd766 is described below

commit 566cd7667bd89ff82610b6e72b9d9dbbab30aff6
Author: ZeruiYang <[email protected]>
AuthorDate: Fri May 30 23:33:35 2025 -0700

    Add streamable http (#674)
    
    * impl: Compatible with http/sse protocol
    
    * impl: retry
    
    * delete redundant files
    
    * fix assert
    
    * fix assert
    
    * fix import
    
    * feat: add support for streamable HTTP responses and enhance response 
handling
    
    * feat: enhance streamable response detection and add related tests
    
    * feat: refactor response handling to use UnaryResponse and clean up 
StreamResponse struct
    
    * feat: add StreamHTTPRecorder and tests for streamable HTTP responses
    
    * feat: improve HTTP listener error handling
    
    * fix: update timeout values in configuration to string format
    
    * feat: implement custom caller encoder for padded file paths in logger
    
    * fix: initialize logController in init function for default config
    
    * refactor: replace hardcoded content type values with constants in manager 
tests
    
    * refactor: update response handling to use new response constructors
    
    * refactor: standardize header constant formatting in http.go
    
    * feat: add Flush method to StreamRecorder for improved stream handling
    
    * feat: add image/jpeg header constant and update test case
    
    * refactor: rename logger to pixiuLogger and update related methods
    
    * fix: delete unuse interface
    
    ---------
    
    Co-authored-by: Alanxtl <[email protected]>
---
 configs/conf.yaml                       |   6 +-
 pkg/client/http/http.go                 |  50 ++++++-
 pkg/client/response.go                  |  23 ++--
 pkg/common/constant/http.go             |  21 ++-
 pkg/common/http/manager.go              |  14 +-
 pkg/common/http/manager_test.go         | 228 +++++++++++++++++++++++++++++++-
 pkg/context/http/context.go             |   2 +-
 pkg/filter/accesslog/access_log_test.go |   2 +-
 pkg/filter/metric/metric.go             |   4 +-
 pkg/filter/prometheus/metric_test.go    |   2 +-
 pkg/listener/http/http_listener.go      |  17 ++-
 pkg/logger/controller.go                |   6 +-
 pkg/logger/logger.go                    |  29 +++-
 pkg/model/http.go                       |   8 +-
 pkg/prometheus/prometheus.go            |   2 -
 15 files changed, 355 insertions(+), 59 deletions(-)

diff --git a/configs/conf.yaml b/configs/conf.yaml
index 3f4f4a3a..5cfca4d3 100644
--- a/configs/conf.yaml
+++ b/configs/conf.yaml
@@ -50,9 +50,9 @@ static_resources:
                     allow_credentials: false
 
       config:
-        idle_timeout: 5s
-        read_timeout: 5s
-        write_timeout: 5s
+        idle_timeout: "5s"
+        read_timeout: "5s"
+        write_timeout: "5s"
   clusters:
     - name: "user"
       lb_policy: "lb"
diff --git a/pkg/client/http/http.go b/pkg/client/http/http.go
index c5e2dd12..3a8efce0 100644
--- a/pkg/client/http/http.go
+++ b/pkg/client/http/http.go
@@ -20,13 +20,13 @@ package http
 import (
        "net/http"
        "net/url"
+       "strconv"
        "strings"
        "sync"
 )
 
 import (
        "github.com/pkg/errors"
-
        "go.opentelemetry.io/otel"
        semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
        "go.opentelemetry.io/otel/trace"
@@ -206,3 +206,51 @@ func IsSSEStream(resp *http.Response) bool {
        contentType := resp.Header.Get(constant.HeaderKeyContextType)
        return strings.Contains(contentType, 
constant.HeaderValueTextEventStream)
 }
+
+// IsStreamableResponse check if the response is streamable
+// Determine whether it is a streaming response based on the following 
conditions:
+// 1. Using Transfer-Encoding: chunked
+// 2. The content type indicates that this is a streamable response (e.g. 
text/event-stream, application/json, etc.)
+func IsStreamableResponse(resp *http.Response) bool {
+       if IsSSEStream(resp) {
+               return true
+       }
+
+       // check if the block encoded transfer is used
+       transferEncoding := resp.Header.Get(constant.HeaderKeyTransferEncoding)
+       if strings.Contains(strings.ToLower(transferEncoding), 
constant.HeaderValueChunked) {
+               return true
+       }
+
+       // check the content type
+       contentType := resp.Header.Get(constant.HeaderKeyContextType)
+
+       // check if it s a streamable content type
+       streamableTypes := []string{
+               constant.HeaderValueTextPrefix,
+               constant.HeaderValueApplicationJson,
+               constant.HeaderValueApplicationNDJson,
+               constant.HeaderValueApplicationOctetStream,
+       }
+
+       for _, streamableType := range streamableTypes {
+               if strings.HasPrefix(contentType, streamableType) {
+                       // For these content types, if you don't have 
Content-Length set or if Content-Length is large,
+                       // may be a good candidate for streaming
+                       contentLength := 
resp.Header.Get(constant.HeaderKeyContentLength)
+
+                       // If Content-Length is not specified, it is possible 
that the server is not aware of the content length
+                       if contentLength == "" {
+                               return true
+                       }
+
+                       // If the Content-Length is large (> 1MB), streaming is 
also suitable
+                       length, err := strconv.ParseInt(contentLength, 10, 64)
+                       if err == nil && length > 1024*1024 {
+                               return true
+                       }
+               }
+       }
+
+       return false
+}
diff --git a/pkg/client/response.go b/pkg/client/response.go
index fac5fd95..f9b6a81e 100644
--- a/pkg/client/response.go
+++ b/pkg/client/response.go
@@ -21,34 +21,27 @@ import (
        "io"
 )
 
-type Response interface {
-       IsStream() bool
-}
-
 // UnaryResponse response from endpoint
 type UnaryResponse struct {
        Data []byte
 }
 
-func (r *UnaryResponse) IsStream() bool {
-       return false
-}
-
-// NewByteResponse create response contains a []byte
-func NewByteResponse(data []byte) *UnaryResponse {
+// NewUnaryResponse create response contains a []byte
+func NewUnaryResponse(data []byte) *UnaryResponse {
        return &UnaryResponse{Data: data}
 }
 
 // StreamResponse response from endpoint
 type StreamResponse struct {
-       Stream io.ReadCloser
+       Stream      io.ReadCloser
+       IsSSEStream bool
 }
 
-func (r *StreamResponse) IsStream() bool {
-       return true
+func (r *StreamResponse) IsSSE() bool {
+       return r.IsSSEStream
 }
 
 // NewStreamResponse create response contains a stream
-func NewStreamResponse(stream io.ReadCloser) *StreamResponse {
-       return &StreamResponse{Stream: stream}
+func NewStreamResponse(stream io.ReadCloser, isSSE bool) *StreamResponse {
+       return &StreamResponse{Stream: stream, IsSSEStream: isSSE}
 }
diff --git a/pkg/common/constant/http.go b/pkg/common/constant/http.go
index ac10c843..b76dfeb0 100644
--- a/pkg/common/constant/http.go
+++ b/pkg/common/constant/http.go
@@ -18,9 +18,11 @@
 package constant
 
 const (
-       HeaderKeyContextType  = "Content-Type"
-       HeaderKeyCacheControl = "Cache-Control"
-       HeaderKeyConnection   = "Connection"
+       HeaderKeyContextType      = "Content-Type"
+       HeaderKeyCacheControl     = "Cache-Control"
+       HeaderKeyConnection       = "Connection"
+       HeaderKeyTransferEncoding = "Transfer-Encoding"
+       HeaderKeyContentLength    = "Content-Length"
 
        HeaderKeyAccessControlAllowOrigin      = "Access-Control-Allow-Origin"
        HeaderKeyAccessControlAllowHeaders     = "Access-Control-Allow-Headers"
@@ -29,10 +31,15 @@ const (
        HeaderKeyAccessControlMaxAge           = "Access-Control-Max-Age"
        HeaderKeyAccessControlAllowCredentials = 
"Access-Control-Allow-Credentials"
 
-       HeaderValueJsonUtf8        = "application/json;charset=UTF-8"
-       HeaderValueTextPlain       = "text/plain"
-       HeaderValueTextEventStream = "text/event-stream"
-       HeaderValueApplicationJson = "application/json"
+       HeaderValueJsonUtf8               = "application/json;charset=UTF-8"
+       HeaderValueTextPlain              = "text/plain"
+       HeaderValueTextEventStream        = "text/event-stream"
+       HeaderValueApplicationJson        = "application/json"
+       HeaderValueApplicationOctetStream = "application/octet-stream"
+       HeaderValueApplicationNDJson      = "application/x-ndjson"
+       HeaderValueImageJpeg              = "image/jpeg"
+       HeaderValueChunked                = "chunked"
+       HeaderValueTextPrefix             = "text/"
 
        HeaderValueKeepAlive = "keep-alive"
        HeaderValueNoCache   = "no-cache"
diff --git a/pkg/common/http/manager.go b/pkg/common/http/manager.go
index 703e4b1c..ee9a5833 100644
--- a/pkg/common/http/manager.go
+++ b/pkg/common/http/manager.go
@@ -179,6 +179,11 @@ func (hcm *HttpConnectionManager) writeResponse(c 
*pch.HttpContext) {
                                                        _ = res.Stream.Close()
                                                        return
                                                }
+
+                                               if flusher, ok := 
c.Writer.(stdHttp.Flusher); ok {
+                                                       flusher.Flush()
+                                               }
+
                                        case err := <-errC:
                                                if err != nil && err != io.EOF {
                                                        logger.Errorf("Stream 
error: %v", err)
@@ -190,7 +195,6 @@ func (hcm *HttpConnectionManager) writeResponse(c 
*pch.HttpContext) {
                        default:
                                logger.Errorf("Unknown response type: %T", 
c.TargetResp)
                        }
-
                }
        }
 }
@@ -210,8 +214,8 @@ func (hcm *HttpConnectionManager) buildTargetResponse(c 
*pch.HttpContext) {
                //status code
                c.StatusCode(res.StatusCode)
 
-               if http.IsSSEStream(res) {
-                       c.TargetResp = &client.StreamResponse{Stream: res.Body}
+               if http.IsStreamableResponse(res) {
+                       c.TargetResp = client.NewStreamResponse(res.Body, 
http.IsSSEStream(res))
                } else {
                        body, err := io.ReadAll(res.Body)
                        if err != nil {
@@ -219,7 +223,7 @@ func (hcm *HttpConnectionManager) buildTargetResponse(c 
*pch.HttpContext) {
                        }
                        //close body
                        _ = res.Body.Close()
-                       c.TargetResp = &client.UnaryResponse{Data: body}
+                       c.TargetResp = client.NewUnaryResponse(body)
                }
        case []byte:
                c.StatusCode(stdHttp.StatusOK)
@@ -228,7 +232,7 @@ func (hcm *HttpConnectionManager) buildTargetResponse(c 
*pch.HttpContext) {
                } else {
                        c.AddHeader(constant.HeaderKeyContextType, 
constant.HeaderValueTextPlain)
                }
-               c.TargetResp = &client.UnaryResponse{Data: res}
+               c.TargetResp = client.NewUnaryResponse(res)
        default:
                //dubbo go generic invoke
                response := util.NewDubboResponse(res, false)
diff --git a/pkg/common/http/manager_test.go b/pkg/common/http/manager_test.go
index 480272ef..292fd752 100644
--- a/pkg/common/http/manager_test.go
+++ b/pkg/common/http/manager_test.go
@@ -34,6 +34,8 @@ import (
 )
 
 import (
+       clienthttp "github.com/apache/dubbo-go-pixiu/pkg/client/http"
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
        "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
        commonmock "github.com/apache/dubbo-go-pixiu/pkg/common/mock"
        "github.com/apache/dubbo-go-pixiu/pkg/common/router/trie"
@@ -171,8 +173,8 @@ func TestStreamingResponse(t *testing.T) {
 
        // mock server
        upstreamServer, _ := NewTestServerWithURL("localhost:8080", 
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-               w.Header().Set("Content-Type", "text/event-stream")
-               w.Header().Set("Cache-Control", "no-cache")
+               w.Header().Set(constant.HeaderKeyContextType, 
constant.HeaderValueTextEventStream)
+               w.Header().Set(constant.HeaderKeyCacheControl, 
constant.HeaderValueNoCache)
                flusher := w.(http.Flusher)
 
                for i := 1; i <= 3; i++ {
@@ -229,7 +231,6 @@ func TestStreamingResponse(t *testing.T) {
                        return
                }
        }
-
 }
 
 // mock recorder
@@ -262,6 +263,9 @@ func (r *StreamRecorder) Write(data []byte) (int, error) {
        return len(data), nil
 }
 
+func (r *StreamRecorder) Flush() {
+}
+
 func NewTestServerWithURL(URL string, handler http.Handler) (*httptest.Server, 
error) {
        ts := httptest.NewUnstartedServer(handler)
        if URL != "" {
@@ -278,3 +282,221 @@ func NewTestServerWithURL(URL string, handler 
http.Handler) (*httptest.Server, e
        ts.Start()
        return ts, nil
 }
+
+// StreamHTTPRecorder Used to capture and test streaming HTTP responses over 
channels
+type StreamHTTPRecorder struct {
+       http.ResponseWriter
+       receivedBuf []string
+       headers     http.Header
+       status      int
+       flushCount  int
+}
+
+func NewStreamHTTPRecorder() *StreamHTTPRecorder {
+       return &StreamHTTPRecorder{
+               receivedBuf: make([]string, 0),
+               headers:     make(http.Header),
+               flushCount:  0,
+       }
+}
+
+func (r *StreamHTTPRecorder) Header() http.Header {
+       return r.headers
+}
+
+func (r *StreamHTTPRecorder) WriteHeader(statusCode int) {
+       r.status = statusCode
+}
+
+func (r *StreamHTTPRecorder) Write(data []byte) (int, error) {
+       eventCh <- string(data)
+       r.receivedBuf = append(r.receivedBuf, string(data))
+       return len(data), nil
+}
+
+func (r *StreamHTTPRecorder) Flush() {
+       r.flushCount++
+}
+
+// Test a variety of common streaming HTTP response types
+func TestStreamableHTTPResponse(t *testing.T) {
+       // define the type of content you want to test
+       contentTypes := []string{
+               constant.HeaderValueTextPlain,
+               constant.HeaderValueApplicationJson,
+               constant.HeaderValueApplicationOctetStream,
+               constant.HeaderValueApplicationNDJson,
+       }
+
+       for _, contentType := range contentTypes {
+               t.Run(fmt.Sprintf("ContentType_%s", contentType), func(t 
*testing.T) {
+                       testStreamableResponse(t, contentType)
+               })
+       }
+}
+
+func testStreamableResponse(t *testing.T, contentType string) {
+       hcmc := model.HttpConnectionManagerConfig{
+               RouteConfig: model.RouteConfiguration{
+                       RouteTrie: trie.NewTrieWithDefault("GET/api/stream", 
model.RouteAction{
+                               Cluster: "mock_stream_cluster",
+                       }),
+               },
+               HTTPFilters: []*model.HTTPFilter{
+                       {
+                               Name: commonmock.Kind,
+                       },
+               },
+       }
+
+       ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+       defer cancel()
+
+       // Clear any data that may have been left over from the previous test
+       for len(eventCh) > 0 {
+               <-eventCh
+       }
+
+       // mock server
+       upstreamServer, _ := NewTestServerWithURL("localhost:8080", 
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               w.Header().Set(constant.HeaderKeyContextType, contentType)
+               flusher := w.(http.Flusher)
+
+               // Generate appropriate test data based on content type
+               var data []byte
+               for i := 1; i <= 5; i++ {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       default:
+                               time.Sleep(10 * time.Millisecond)
+
+                               switch contentType {
+                               case constant.HeaderValueApplicationJson:
+                                       data = []byte(fmt.Sprintf(`{"id": %d, 
"message": "test chunk %d"}\n`, i, i))
+                               case constant.HeaderValueApplicationNDJson:
+                                       data = []byte(fmt.Sprintf(`{"id": %d, 
"message": "test chunk %d"}\n`, i, i))
+                               case constant.HeaderValueApplicationOctetStream:
+                                       data = []byte(fmt.Sprintf("CHUNK-%d", 
i))
+                               default: // text/plain
+                                       data = []byte(fmt.Sprintf("Chunk %d\n", 
i))
+                               }
+
+                               _, _ = w.Write(data)
+                               flusher.Flush()
+                               logger.Info("Upstream sent chunk ", i)
+                       }
+               }
+       }))
+       defer upstreamServer.Close()
+
+       req := httptest.NewRequest("GET", "http://localhost:8080/api/stream";, 
nil).WithContext(ctx)
+       done := make(chan struct{})
+
+       httpCtx := &contexthttp.HttpContext{
+               Request: req,
+               Writer:  NewStreamHTTPRecorder(),
+               Ctx:     ctx,
+       }
+
+       go func() {
+               defer close(done)
+
+               hcm := CreateHttpConnectionManager(&hcmc)
+
+               if err := hcm.Handle(httpCtx); err != nil {
+                       t.Errorf("Handle failed: %v", err)
+               }
+
+               // verify that targetResp exists
+               if httpCtx.TargetResp == nil {
+                       t.Error("TargetResp is nil")
+                       return
+               }
+       }()
+
+       // collect and validate responses
+       receivedChunks := 0
+       for {
+               receivedEvents := 
httpCtx.Writer.(*StreamHTTPRecorder).receivedBuf
+               select {
+               case event := <-eventCh:
+                       logger.Info("Received chunk: %s", event)
+                       receivedChunks++
+               case <-done:
+                       assert.Equal(t, 5, len(receivedEvents), "Should receive 
5 chunks")
+                       return
+               case <-time.After(5 * time.Second):
+                       t.Fatal("Test timeout")
+                       return
+               }
+       }
+}
+
+// TestIsStreamableResponse Test whether it is a function that can be streamed 
and responded
+func TestIsStreamableResponse(t *testing.T) {
+       tests := []struct {
+               name     string
+               headers  map[string]string
+               expected bool
+       }{
+               {
+                       name: "sseResponse",
+                       headers: map[string]string{
+                               constant.HeaderKeyContextType: 
constant.HeaderValueTextEventStream,
+                       },
+                       expected: true,
+               },
+               {
+                       name: "chunkedEncodingResponses",
+                       headers: map[string]string{
+                               constant.HeaderKeyContextType:      
constant.HeaderValueApplicationJson,
+                               constant.HeaderKeyTransferEncoding: 
constant.HeaderValueChunked,
+                       },
+                       expected: true,
+               },
+               {
+                       name: "JsonResponseWithoutContent-Length",
+                       headers: map[string]string{
+                               constant.HeaderKeyContextType: 
constant.HeaderValueApplicationJson,
+                       },
+                       expected: true,
+               },
+               {
+                       name: "The text response is large Content-Length",
+                       headers: map[string]string{
+                               constant.HeaderKeyContextType:   
constant.HeaderValueTextPlain,
+                               constant.HeaderKeyContentLength: "2097152", // 
2MB
+                       },
+                       expected: true,
+               },
+               {
+                       name: "JSON response Content-Length",
+                       headers: map[string]string{
+                               constant.HeaderKeyContextType:   
constant.HeaderValueApplicationJson,
+                               constant.HeaderKeyContentLength: "1024", // 1KB
+                       },
+                       expected: false,
+               },
+               {
+                       name: "no stream Content-Type",
+                       headers: map[string]string{
+                               constant.HeaderKeyContextType: 
constant.HeaderValueImageJpeg,
+                       },
+                       expected: false,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       resp := &http.Response{
+                               Header: make(http.Header),
+                       }
+                       for k, v := range tt.headers {
+                               resp.Header.Set(k, v)
+                       }
+                       result := clienthttp.IsStreamableResponse(resp)
+                       assert.Equal(t, tt.expected, result, 
"IsStreamableResponse() return err")
+               })
+       }
+}
diff --git a/pkg/context/http/context.go b/pkg/context/http/context.go
index 9674f1a0..68f47449 100644
--- a/pkg/context/http/context.go
+++ b/pkg/context/http/context.go
@@ -59,7 +59,7 @@ type HttpContext struct {
        // localReplyBody: happen error
        localReplyBody []byte
        // the response context will return.
-       TargetResp client.Response
+       TargetResp any
        // client call response.
        SourceResp any
 
diff --git a/pkg/filter/accesslog/access_log_test.go 
b/pkg/filter/accesslog/access_log_test.go
index 1fa4164e..e8c4d5c9 100644
--- a/pkg/filter/accesslog/access_log_test.go
+++ b/pkg/filter/accesslog/access_log_test.go
@@ -64,7 +64,7 @@ func TestApply(t *testing.T) {
 
        request, _ := http.NewRequest("POST", 
"http://www.dubbogopixiu.com/mock/test?name=tc";, 
bytes.NewReader([]byte("{\"id\":\"12345\"}")))
        ctx := mock.GetMockHTTPContext(request)
-       ctx.TargetResp = client.NewByteResponse([]byte(msg))
+       ctx.TargetResp = client.NewUnaryResponse([]byte(msg))
 
        f.Decode(ctx)
        f.Encode(ctx)
diff --git a/pkg/filter/metric/metric.go b/pkg/filter/metric/metric.go
index 6aaf600b..72fdf484 100644
--- a/pkg/filter/metric/metric.go
+++ b/pkg/filter/metric/metric.go
@@ -25,9 +25,7 @@ import (
 
 import (
        "github.com/pkg/errors"
-
        "go.opentelemetry.io/otel/attribute"
-
        "go.opentelemetry.io/otel/metric/global"
        "go.opentelemetry.io/otel/metric/instrument"
        "go.opentelemetry.io/otel/metric/instrument/syncint64"
@@ -139,7 +137,7 @@ func (f *Filter) Encode(c *http.HttpContext) 
filter.FilterStatus {
        return filter.Continue
 }
 
-func computeApproximateResponseSize(res client.Response) (int, error) {
+func computeApproximateResponseSize(res any) (int, error) {
        if res == nil {
                return 0, errors.New("client.UnaryResponse is null pointer ")
        }
diff --git a/pkg/filter/prometheus/metric_test.go 
b/pkg/filter/prometheus/metric_test.go
index ab7eb65f..b563a31f 100644
--- a/pkg/filter/prometheus/metric_test.go
+++ b/pkg/filter/prometheus/metric_test.go
@@ -62,7 +62,7 @@ func TestCounterExporterApiMetric(t *testing.T) {
                        body, _ := json.Marshal(&data)
                        request, _ := http.NewRequest("POST", "/_api/health", 
bytes.NewBuffer(body))
                        ctx := mock.GetMockHTTPContext(request)
-                       ctx.TargetResp = client.NewByteResponse([]byte(msg))
+                       ctx.TargetResp = client.NewUnaryResponse([]byte(msg))
                        err := factory.PrepareFilterChain(ctx, chain)
                        assert.Nil(t, err)
                        chain.OnDecode(ctx)
diff --git a/pkg/listener/http/http_listener.go 
b/pkg/listener/http/http_listener.go
index d18c4157..b34cc684 100644
--- a/pkg/listener/http/http_listener.go
+++ b/pkg/listener/http/http_listener.go
@@ -20,7 +20,6 @@ package http
 import (
        "context"
        "fmt"
-       "log"
        "net/http"
        "strconv"
        "sync"
@@ -110,7 +109,7 @@ func (ls *HttpListenerService) httpsListener() {
        hl := createDefaultHttpWorker(ls)
 
        // user customize http config
-       hc := model.MapInStruct(ls.Config)
+       hc := model.MapInStruct(ls.Config.Config)
 
        mux := http.NewServeMux()
        mux.HandleFunc("/", hl.ServeHTTP)
@@ -139,7 +138,7 @@ func (ls *HttpListenerService) httpListener() {
        hl := createDefaultHttpWorker(ls)
 
        // user customize http config
-       hc := model.MapInStruct(ls.Config)
+       hc := model.MapInStruct(ls.Config.Config)
 
        mux := http.NewServeMux()
        mux.HandleFunc("/", hl.ServeHTTP)
@@ -154,9 +153,16 @@ func (ls *HttpListenerService) httpListener() {
                MaxHeaderBytes: resolveInt2IntProp(hc.MaxHeaderBytes, 1<<20),
        }
 
-       logger.Infof("[dubbo-go-server] httpListener start at : %s", 
ls.srv.Addr)
+       logger.Infof("[dubbo-go-server] httpListener starting at %s with 
WriteTimeout: %v, IdleTimeout: %v, ReadTimeout: %v",
+               ls.srv.Addr, ls.srv.WriteTimeout, ls.srv.IdleTimeout, 
ls.srv.ReadTimeout)
 
-       log.Println(ls.srv.ListenAndServe())
+       err := ls.srv.ListenAndServe()
+       // Improved error logging and replace log.Println
+       if err != nil && !errors.Is(err, http.ErrServerClosed) {
+               logger.Errorf("[dubbo-go-server] httpListener ListenAndServe 
error: %v", err)
+       } else {
+               logger.Info("[dubbo-go-server] httpListener stopped 
gracefully.")
+       }
 }
 
 // createDefaultHttpWorker create http listener
@@ -184,6 +190,7 @@ func resolveStr2Time(currentV string, defaultV 
time.Duration) time.Duration {
                return defaultV
        } else {
                if duration, err := time.ParseDuration(currentV); err != nil {
+                       logger.Errorf("Parse duration failed, err: %v", err)
                        return 20 * time.Second
                } else {
                        return duration
diff --git a/pkg/logger/controller.go b/pkg/logger/controller.go
index 8914cf5d..fb2d9708 100644
--- a/pkg/logger/controller.go
+++ b/pkg/logger/controller.go
@@ -31,7 +31,7 @@ import (
 type logController struct {
        mu sync.RWMutex
 
-       logger *logger
+       logger *pixiuLogger
 }
 
 // setLoggerLevel safely changes the log level in a concurrent manner.
@@ -45,12 +45,12 @@ func (c *logController) setLoggerLevel(level string) bool {
 
        c.logger.config.Level = *lvl
        l, _ := c.logger.config.Build(zap.AddCallerSkip(2))
-       c.logger = &logger{SugaredLogger: l.Sugar(), config: c.logger.config}
+       c.logger = &pixiuLogger{SugaredLogger: l.Sugar(), config: 
c.logger.config}
        return true
 }
 
 // updateLogger safely modifies the log object in a concurrent manner.
-func (c *logController) updateLogger(l *logger) {
+func (c *logController) updateLogger(l *pixiuLogger) {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.logger = l
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 9fd73af1..f2db95e8 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -21,6 +21,7 @@ import (
        "fmt"
        "os"
        "path"
+       "strings"
 )
 
 import (
@@ -35,7 +36,7 @@ import (
 
 var control *logController
 
-type logger struct {
+type pixiuLogger struct {
        *zap.SugaredLogger
        config *zap.Config
 }
@@ -48,6 +49,21 @@ func init() {
        }
 }
 
+// PaddedCallerEncoder is a custom caller encoder that ensures that all file 
paths are displayed at the same length
+func PaddedCallerEncoder(caller zapcore.EntryCaller, enc 
zapcore.PrimitiveArrayEncoder) {
+
+       callerPath := caller.TrimmedPath()
+
+       // Set a fixed length, and if the path is too short, add a space after 
it
+       const fixedLength = 30
+       if len(callerPath) < fixedLength {
+               padding := strings.Repeat(" ", fixedLength-len(callerPath))
+               callerPath = callerPath + padding
+       }
+
+       enc.AppendString(callerPath)
+}
+
 // InitLog load from config path
 func InitLog(logConfFile string) error {
        if logConfFile == "" {
@@ -69,7 +85,7 @@ func InitLog(logConfFile string) error {
        err = yaml.UnmarshalYML(confFileStream, conf)
        if err != nil {
                InitLogger(nil)
-               return perrors.New(fmt.Sprintf("[Unmarshal]init logger error: 
%v", err))
+               return perrors.New(fmt.Sprintf("[Unmarshal]init pixiuLogger 
error: %v", err))
        }
 
        InitLogger(conf)
@@ -84,21 +100,24 @@ func InitLogger(conf *zap.Config) {
                zapLoggerEncoderConfig := zapcore.EncoderConfig{
                        TimeKey:        "time",
                        LevelKey:       "level",
-                       NameKey:        "logger",
+                       NameKey:        "pixiuLogger",
                        CallerKey:      "caller",
                        MessageKey:     "message",
                        StacktraceKey:  "stacktrace",
                        EncodeLevel:    zapcore.CapitalColorLevelEncoder,
                        EncodeTime:     zapcore.ISO8601TimeEncoder,
                        EncodeDuration: zapcore.SecondsDurationEncoder,
-                       EncodeCaller:   zapcore.ShortCallerEncoder,
+                       EncodeCaller:   PaddedCallerEncoder,
+                       // EncodeCaller:   zapcore.ShortCallerEncoder,
                }
                zapLoggerConfig.EncoderConfig = zapLoggerEncoderConfig
        } else {
                zapLoggerConfig = *conf
+               // Set up a custom encoder directly without checking the 
original value
+               zapLoggerConfig.EncoderConfig.EncodeCaller = PaddedCallerEncoder
        }
        zapLogger, _ := zapLoggerConfig.Build(zap.AddCallerSkip(2))
-       l := &logger{zapLogger.Sugar(), &zapLoggerConfig}
+       l := &pixiuLogger{zapLogger.Sugar(), &zapLoggerConfig}
 
        control.updateLogger(l)
 }
diff --git a/pkg/model/http.go b/pkg/model/http.go
index e9bbfc47..e9f26716 100644
--- a/pkg/model/http.go
+++ b/pkg/model/http.go
@@ -114,11 +114,11 @@ type HttpConfig struct {
 }
 
 func MapInStruct(cfg any) *HttpConfig {
-       var hc *HttpConfig
+       var hc HttpConfig
        if cfg != nil {
-               if ok := mapstructure.Decode(cfg, &hc); ok != nil {
-                       logger.Error("Config error", ok)
+               if err := mapstructure.Decode(cfg, &hc); err != nil {
+                       logger.Error("Config error", err)
                }
        }
-       return hc
+       return &hc
 }
diff --git a/pkg/prometheus/prometheus.go b/pkg/prometheus/prometheus.go
index de00261b..e7171b97 100644
--- a/pkg/prometheus/prometheus.go
+++ b/pkg/prometheus/prometheus.go
@@ -29,9 +29,7 @@ import (
 
 import (
        "github.com/dubbo-go-pixiu/pixiu-api/pkg/context"
-
        "github.com/prometheus/client_golang/prometheus"
-
        "github.com/prometheus/common/expfmt"
 )
 

Reply via email to