This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch missing-details
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/missing-details by this push:
     new 0831e15  fix the unsafe byte array
0831e15 is described below

commit 0831e15521923024305533a90ac9719725ed0004
Author: mrproliu <[email protected]>
AuthorDate: Fri Dec 13 14:33:50 2024 +0800

    fix the unsafe byte array
---
 pkg/accesslog/collector/protocols/http1.go         |  9 +++---
 .../analyze/layer7/protocols/http1/analyzer.go     | 12 ++++----
 .../layer7/protocols/http1/reader/reader.go        | 32 ++++++++++++++--------
 .../layer7/protocols/http1/reader/request.go       |  9 ++++--
 .../layer7/protocols/http1/reader/response.go      |  9 ++++--
 5 files changed, 47 insertions(+), 24 deletions(-)

diff --git a/pkg/accesslog/collector/protocols/http1.go b/pkg/accesslog/collector/protocols/http1.go
index 70ea60d..c07902e 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -41,10 +41,11 @@ type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, r
 type HTTP1Protocol struct {
        ctx     *common.AccessLogContext
        analyze HTTP1ProtocolAnalyze
+       reader  *reader.Reader
 }
 
 func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze HTTP1ProtocolAnalyze) *HTTP1Protocol {
-       protocol := &HTTP1Protocol{ctx: ctx}
+       protocol := &HTTP1Protocol{ctx: ctx, reader: reader.NewReader()}
        if analyze == nil {
                protocol.analyze = protocol.HandleHTTPData
        } else {
@@ -90,7 +91,7 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe
 
                headerTmp := make([]byte, 16)
                buf.Peek(headerTmp)
-               messageType, err := reader.IdentityMessageType(buf)
+               messageType, err := p.reader.IdentityMessageType(buf)
                log.Infof("ready to reading message type, messageType: %v, buf: %p, data id: %d, "+
                        "connection ID: %d, random ID: %d, error: %v, header: %s", messageType, buf, buf.Position().DataID(),
                        metrics.ConnectionID, metrics.RandomID, err, string(headerTmp))
@@ -138,7 +139,7 @@ func (p *HTTP1Protocol) ForProtocol() enums.ConnectionProtocol {
 }
 
 func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf *buffer.Buffer) (enums.ParseResult, error) {
-       req, result, err := reader.ReadRequest(buf, true)
+       req, result, err := p.reader.ReadRequest(buf, true)
        if err != nil {
                return enums.ParseResultSkipPackage, err
        }
@@ -157,7 +158,7 @@ func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b *buffer.Buffer)
        request := metrics.halfRequests.Remove(firstRequest).(*reader.Request)
 
        // parsing response
-       response, result, err := reader.ReadResponse(request, b, true)
+       response, result, err := p.reader.ReadResponse(request, b, true)
        defer func() {
                // if parsing response failed, then put the request back to the list
                if result != enums.ParseResultSuccess {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
index 245e6dd..6cec0ef 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
@@ -51,7 +51,8 @@ var DurationHistogramBuckets = []float64{
 
 type Analyzer struct {
        // cache connection metrics if the connect event not receive or process
-       cache map[string]*ConnectionMetrics
+       cache  map[string]*ConnectionMetrics
+       reader *reader.Reader
 
        sampleConfig *SamplingConfig
 }
@@ -67,7 +68,8 @@ type ConnectionMetrics struct {
 
 func NewHTTP1Analyzer() protocol.Protocol {
        return &Analyzer{
-               cache: make(map[string]*ConnectionMetrics),
+               cache:  make(map[string]*ConnectionMetrics),
+               reader: reader.NewReader(),
        }
 }
 
@@ -90,7 +92,7 @@ func (h *Analyzer) Init(config *profiling.TaskConfig) {
 
 func (h *Analyzer) ParseProtocol(connectionID uint64, metrics protocol.Metrics, buf *buffer.Buffer) enums.ParseResult {
        connectionMetrics := metrics.(*ConnectionMetrics)
-       messageType, err := reader.IdentityMessageType(buf)
+       messageType, err := h.reader.IdentityMessageType(buf)
        if err != nil {
                return enums.ParseResultSkipPackage
        }
@@ -116,7 +118,7 @@ func (h *Analyzer) ParseProtocol(connectionID uint64, metrics protocol.Metrics,
 
 func (h *Analyzer) handleRequest(metrics *ConnectionMetrics, buf *buffer.Buffer) (enums.ParseResult, error) {
        // parsing request
-       req, r, err := reader.ReadRequest(buf, true)
+       req, r, err := h.reader.ReadRequest(buf, true)
        if err != nil {
                return enums.ParseResultSkipPackage, err
        }
@@ -137,7 +139,7 @@ func (h *Analyzer) handleResponse(connectionID uint64, metrics *ConnectionMetric
        request := metrics.halfData.Remove(firstElement).(*reader.Request)
 
        // parsing request
-       response, r, err := reader.ReadResponse(request, buf, true)
+       response, r, err := h.reader.ReadResponse(request, buf, true)
        if err != nil {
                return enums.ParseResultSkipPackage, err
        } else if r != enums.ParseResultSuccess {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
index 8d299fd..8cc8c85 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -35,9 +35,6 @@ import (
 )
 
 var (
-       headBuffer = make([]byte, 16)
-       bodyBuffer = make([]byte, 4096)
-
        requestMethods = []string{
                "GET", "POST", "OPTIONS", "HEAD", "PUT", "DELETE", "CONNECT", "TRACE", "PATCH",
        }
@@ -53,15 +50,27 @@ const (
        MessageTypeUnknown
 )
 
-func IdentityMessageType(reader *buffer.Buffer) (MessageType, error) {
-       n, err := reader.Peek(headBuffer)
+type Reader struct {
+       headBuffer []byte
+       bodyBuffer []byte
+}
+
+func NewReader() *Reader {
+       return &Reader{
+               headBuffer: make([]byte, 16),
+               bodyBuffer: make([]byte, 4096),
+       }
+}
+
+func (r *Reader) IdentityMessageType(reader *buffer.Buffer) (MessageType, error) {
+       n, err := reader.Peek(r.headBuffer)
        if err != nil {
                return MessageTypeUnknown, err
-       } else if n != len(headBuffer) {
+       } else if n != len(r.headBuffer) {
                return MessageTypeUnknown, fmt.Errorf("need more content for header")
        }
 
-       headerString := string(headBuffer)
+       headerString := string(r.headBuffer)
        isRequest := false
        for _, method := range requestMethods {
                if strings.HasPrefix(headerString, method) {
@@ -83,6 +92,7 @@ type Message interface {
        Headers() http.Header
        HeaderBuffer() *buffer.Buffer
        BodyBuffer() *buffer.Buffer
+       Reader() *Reader
 }
 
 type MessageOpt struct {
@@ -192,7 +202,7 @@ func (m *MessageOpt) isChunked() bool {
 func (m *MessageOpt) readBodyUntilCurrentPackageFinished(buf *buffer.Buffer, reader *bufio.Reader) (*buffer.Buffer, enums.ParseResult, error) {
        startPosition := buf.OffsetPosition(-reader.Buffered())
        for !buf.IsCurrentPacketReadFinished() {
-               _, err := buf.Read(bodyBuffer)
+               _, err := buf.Read(m.Reader().bodyBuffer)
                if err != nil {
                        return nil, enums.ParseResultSkipPackage, err
                }
@@ -248,10 +258,10 @@ func (m *MessageOpt) checkBodyWithSize(buf *buffer.Buffer, reader *bufio.Reader,
        startPosition := buf.OffsetPosition(-reader.Buffered())
        for reduceSize > 0 {
                readSize = reduceSize
-               if readSize > len(bodyBuffer) {
-                       readSize = len(bodyBuffer)
+               if readSize > len(m.Reader().bodyBuffer) {
+                       readSize = len(m.Reader().bodyBuffer)
                }
-               lastReadSize, err = reader.Read(bodyBuffer[0:readSize])
+               lastReadSize, err = reader.Read(m.Reader().bodyBuffer[0:readSize])
                if err != nil {
                        if err == buffer.ErrNotComplete {
                                return nil, enums.ParseResultSkipPackage, nil
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
index 8791611..7224727 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
@@ -34,6 +34,7 @@ type Request struct {
        original     *http.Request
        headerBuffer *buffer.Buffer
        bodyBuffer   *buffer.Buffer
+       reader       *Reader
 }
 
 func (r *Request) Headers() http.Header {
@@ -48,6 +49,10 @@ func (r *Request) BodyBuffer() *buffer.Buffer {
        return r.bodyBuffer
 }
 
+func (r *Request) Reader() *Reader {
+       return r.reader
+}
+
 func (r *Request) MinDataID() int {
        return int(r.headerBuffer.FirstSocketBuffer().DataID())
 }
@@ -57,11 +62,11 @@ func (r *Request) Original() *http.Request {
 }
 
 // nolint
-func ReadRequest(buf *buffer.Buffer, readBody bool) (*Request, enums.ParseResult, error) {
+func (r *Reader) ReadRequest(buf *buffer.Buffer, readBody bool) (*Request, enums.ParseResult, error) {
        bufReader := bufio.NewReader(buf)
        tp := textproto.NewReader(bufReader)
        req := &http.Request{}
-       result := &Request{original: req}
+       result := &Request{original: req, reader: r}
        result.MessageOpt = &MessageOpt{result}
 
        headerStartPosition := buf.Position()
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
index 966edb8..fae907b 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
@@ -35,6 +35,7 @@ type Response struct {
        original     *http.Response
        headerBuffer *buffer.Buffer
        bodyBuffer   *buffer.Buffer
+       reader       *Reader
 }
 
 func (r *Response) Headers() http.Header {
@@ -49,15 +50,19 @@ func (r *Response) BodyBuffer() *buffer.Buffer {
        return r.bodyBuffer
 }
 
+func (r *Response) Reader() *Reader {
+       return r.reader
+}
+
 func (r *Response) Original() *http.Response {
        return r.original
 }
 
-func ReadResponse(req *Request, buf *buffer.Buffer, readBody bool) (*Response, enums.ParseResult, error) {
+func (r *Reader) ReadResponse(req *Request, buf *buffer.Buffer, readBody bool) (*Response, enums.ParseResult, error) {
        bufReader := bufio.NewReader(buf)
        tp := textproto.NewReader(bufReader)
        resp := &http.Response{}
-       result := &Response{original: resp, req: req}
+       result := &Response{original: resp, req: req, reader: r}
        result.MessageOpt = &MessageOpt{result}
 
        headerStartPosition := buf.Position()

Reply via email to