This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch missing-details
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/missing-details by this push:
new 0831e15 fix the unsafe byte array
0831e15 is described below
commit 0831e15521923024305533a90ac9719725ed0004
Author: mrproliu <[email protected]>
AuthorDate: Fri Dec 13 14:33:50 2024 +0800
fix the unsafe byte array
---
pkg/accesslog/collector/protocols/http1.go | 9 +++---
.../analyze/layer7/protocols/http1/analyzer.go | 12 ++++----
.../layer7/protocols/http1/reader/reader.go | 32 ++++++++++++++--------
.../layer7/protocols/http1/reader/request.go | 9 ++++--
.../layer7/protocols/http1/reader/response.go | 9 ++++--
5 files changed, 47 insertions(+), 24 deletions(-)
diff --git a/pkg/accesslog/collector/protocols/http1.go
b/pkg/accesslog/collector/protocols/http1.go
index 70ea60d..c07902e 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -41,10 +41,11 @@ type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics,
request *reader.Request, r
type HTTP1Protocol struct {
ctx *common.AccessLogContext
analyze HTTP1ProtocolAnalyze
+ reader *reader.Reader
}
func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze
HTTP1ProtocolAnalyze) *HTTP1Protocol {
- protocol := &HTTP1Protocol{ctx: ctx}
+ protocol := &HTTP1Protocol{ctx: ctx, reader: reader.NewReader()}
if analyze == nil {
protocol.analyze = protocol.HandleHTTPData
} else {
@@ -90,7 +91,7 @@ func (p *HTTP1Protocol) Analyze(connection
*PartitionConnection, _ *AnalyzeHelpe
headerTmp := make([]byte, 16)
buf.Peek(headerTmp)
- messageType, err := reader.IdentityMessageType(buf)
+ messageType, err := p.reader.IdentityMessageType(buf)
log.Infof("ready to reading message type, messageType: %v, buf:
%p, data id: %d, "+
"connection ID: %d, random ID: %d, error: %v, header:
%s", messageType, buf, buf.Position().DataID(),
metrics.ConnectionID, metrics.RandomID, err,
string(headerTmp))
@@ -138,7 +139,7 @@ func (p *HTTP1Protocol) ForProtocol()
enums.ConnectionProtocol {
}
func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf
*buffer.Buffer) (enums.ParseResult, error) {
- req, result, err := reader.ReadRequest(buf, true)
+ req, result, err := p.reader.ReadRequest(buf, true)
if err != nil {
return enums.ParseResultSkipPackage, err
}
@@ -157,7 +158,7 @@ func (p *HTTP1Protocol) handleResponse(metrics
*HTTP1Metrics, b *buffer.Buffer)
request := metrics.halfRequests.Remove(firstRequest).(*reader.Request)
// parsing response
- response, result, err := reader.ReadResponse(request, b, true)
+ response, result, err := p.reader.ReadResponse(request, b, true)
defer func() {
// if parsing response failed, then put the request back to the
list
if result != enums.ParseResultSuccess {
diff --git
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
index 245e6dd..6cec0ef 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
@@ -51,7 +51,8 @@ var DurationHistogramBuckets = []float64{
type Analyzer struct {
// cache connection metrics if the connect event not receive or process
- cache map[string]*ConnectionMetrics
+ cache map[string]*ConnectionMetrics
+ reader *reader.Reader
sampleConfig *SamplingConfig
}
@@ -67,7 +68,8 @@ type ConnectionMetrics struct {
func NewHTTP1Analyzer() protocol.Protocol {
return &Analyzer{
- cache: make(map[string]*ConnectionMetrics),
+ cache: make(map[string]*ConnectionMetrics),
+ reader: reader.NewReader(),
}
}
@@ -90,7 +92,7 @@ func (h *Analyzer) Init(config *profiling.TaskConfig) {
func (h *Analyzer) ParseProtocol(connectionID uint64, metrics
protocol.Metrics, buf *buffer.Buffer) enums.ParseResult {
connectionMetrics := metrics.(*ConnectionMetrics)
- messageType, err := reader.IdentityMessageType(buf)
+ messageType, err := h.reader.IdentityMessageType(buf)
if err != nil {
return enums.ParseResultSkipPackage
}
@@ -116,7 +118,7 @@ func (h *Analyzer) ParseProtocol(connectionID uint64,
metrics protocol.Metrics,
func (h *Analyzer) handleRequest(metrics *ConnectionMetrics, buf
*buffer.Buffer) (enums.ParseResult, error) {
// parsing request
- req, r, err := reader.ReadRequest(buf, true)
+ req, r, err := h.reader.ReadRequest(buf, true)
if err != nil {
return enums.ParseResultSkipPackage, err
}
@@ -137,7 +139,7 @@ func (h *Analyzer) handleResponse(connectionID uint64,
metrics *ConnectionMetric
request := metrics.halfData.Remove(firstElement).(*reader.Request)
// parsing request
- response, r, err := reader.ReadResponse(request, buf, true)
+ response, r, err := h.reader.ReadResponse(request, buf, true)
if err != nil {
return enums.ParseResultSkipPackage, err
} else if r != enums.ParseResultSuccess {
diff --git
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
index 8d299fd..8cc8c85 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -35,9 +35,6 @@ import (
)
var (
- headBuffer = make([]byte, 16)
- bodyBuffer = make([]byte, 4096)
-
requestMethods = []string{
"GET", "POST", "OPTIONS", "HEAD", "PUT", "DELETE", "CONNECT",
"TRACE", "PATCH",
}
@@ -53,15 +50,27 @@ const (
MessageTypeUnknown
)
-func IdentityMessageType(reader *buffer.Buffer) (MessageType, error) {
- n, err := reader.Peek(headBuffer)
+type Reader struct {
+ headBuffer []byte
+ bodyBuffer []byte
+}
+
+func NewReader() *Reader {
+ return &Reader{
+ headBuffer: make([]byte, 16),
+ bodyBuffer: make([]byte, 4096),
+ }
+}
+
+func (r *Reader) IdentityMessageType(reader *buffer.Buffer) (MessageType,
error) {
+ n, err := reader.Peek(r.headBuffer)
if err != nil {
return MessageTypeUnknown, err
- } else if n != len(headBuffer) {
+ } else if n != len(r.headBuffer) {
return MessageTypeUnknown, fmt.Errorf("need more content for
header")
}
- headerString := string(headBuffer)
+ headerString := string(r.headBuffer)
isRequest := false
for _, method := range requestMethods {
if strings.HasPrefix(headerString, method) {
@@ -83,6 +92,7 @@ type Message interface {
Headers() http.Header
HeaderBuffer() *buffer.Buffer
BodyBuffer() *buffer.Buffer
+ Reader() *Reader
}
type MessageOpt struct {
@@ -192,7 +202,7 @@ func (m *MessageOpt) isChunked() bool {
func (m *MessageOpt) readBodyUntilCurrentPackageFinished(buf *buffer.Buffer,
reader *bufio.Reader) (*buffer.Buffer, enums.ParseResult, error) {
startPosition := buf.OffsetPosition(-reader.Buffered())
for !buf.IsCurrentPacketReadFinished() {
- _, err := buf.Read(bodyBuffer)
+ _, err := buf.Read(m.Reader().bodyBuffer)
if err != nil {
return nil, enums.ParseResultSkipPackage, err
}
@@ -248,10 +258,10 @@ func (m *MessageOpt) checkBodyWithSize(buf
*buffer.Buffer, reader *bufio.Reader,
startPosition := buf.OffsetPosition(-reader.Buffered())
for reduceSize > 0 {
readSize = reduceSize
- if readSize > len(bodyBuffer) {
- readSize = len(bodyBuffer)
+ if readSize > len(m.Reader().bodyBuffer) {
+ readSize = len(m.Reader().bodyBuffer)
}
- lastReadSize, err = reader.Read(bodyBuffer[0:readSize])
+ lastReadSize, err =
reader.Read(m.Reader().bodyBuffer[0:readSize])
if err != nil {
if err == buffer.ErrNotComplete {
return nil, enums.ParseResultSkipPackage, nil
diff --git
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
index 8791611..7224727 100644
---
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
+++
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
@@ -34,6 +34,7 @@ type Request struct {
original *http.Request
headerBuffer *buffer.Buffer
bodyBuffer *buffer.Buffer
+ reader *Reader
}
func (r *Request) Headers() http.Header {
@@ -48,6 +49,10 @@ func (r *Request) BodyBuffer() *buffer.Buffer {
return r.bodyBuffer
}
+func (r *Request) Reader() *Reader {
+ return r.reader
+}
+
func (r *Request) MinDataID() int {
return int(r.headerBuffer.FirstSocketBuffer().DataID())
}
@@ -57,11 +62,11 @@ func (r *Request) Original() *http.Request {
}
// nolint
-func ReadRequest(buf *buffer.Buffer, readBody bool) (*Request,
enums.ParseResult, error) {
+func (r *Reader) ReadRequest(buf *buffer.Buffer, readBody bool) (*Request,
enums.ParseResult, error) {
bufReader := bufio.NewReader(buf)
tp := textproto.NewReader(bufReader)
req := &http.Request{}
- result := &Request{original: req}
+ result := &Request{original: req, reader: r}
result.MessageOpt = &MessageOpt{result}
headerStartPosition := buf.Position()
diff --git
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
index 966edb8..fae907b 100644
---
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
+++
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
@@ -35,6 +35,7 @@ type Response struct {
original *http.Response
headerBuffer *buffer.Buffer
bodyBuffer *buffer.Buffer
+ reader *Reader
}
func (r *Response) Headers() http.Header {
@@ -49,15 +50,19 @@ func (r *Response) BodyBuffer() *buffer.Buffer {
return r.bodyBuffer
}
+func (r *Response) Reader() *Reader {
+ return r.reader
+}
+
func (r *Response) Original() *http.Response {
return r.original
}
-func ReadResponse(req *Request, buf *buffer.Buffer, readBody bool) (*Response,
enums.ParseResult, error) {
+func (r *Reader) ReadResponse(req *Request, buf *buffer.Buffer, readBody bool)
(*Response, enums.ParseResult, error) {
bufReader := bufio.NewReader(buf)
tp := textproto.NewReader(bufReader)
resp := &http.Response{}
- result := &Response{original: resp, req: req}
+ result := &Response{original: resp, req: req, reader: r}
result.MessageOpt = &MessageOpt{result}
headerStartPosition := buf.Position()