This is an automated email from the ASF dual-hosted git repository. liuhan pushed a commit to branch missing-details in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
commit fa318553a53c0ff34bf703aae34e799a3451ba77 Author: mrproliu <[email protected]> AuthorDate: Thu Dec 12 11:20:53 2024 +0800 Reduce missing details issue in the access log module --- CHANGES.md | 1 + pkg/accesslog/collector/connection.go | 11 +- pkg/accesslog/collector/protocols/http1.go | 118 ++++++++++---- pkg/accesslog/collector/protocols/http2.go | 53 ++++--- pkg/accesslog/collector/protocols/protocol.go | 22 ++- pkg/accesslog/collector/protocols/queue.go | 4 +- pkg/accesslog/common/config.go | 3 +- pkg/accesslog/events/detail.go | 4 + pkg/profiling/task/network/analyze/events/data.go | 4 + .../analyze/layer7/protocols/base/analyzer.go | 2 +- .../analyze/layer7/protocols/http1/analyzer.go | 12 +- .../analyze/layer7/protocols/http1/metrics.go | 4 +- .../layer7/protocols/http1/reader/reader.go | 34 ++-- .../layer7/protocols/http1/reader/request.go | 9 +- .../layer7/protocols/http1/reader/response.go | 9 +- pkg/tools/buffer/buffer.go | 171 ++++++++++++++++++--- 16 files changed, 349 insertions(+), 112 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 3392d50..219e53e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,7 @@ Release Notes. * Support for connecting to the backend server over TLS without requiring `ca.pem`. * Fix missing the first socket detail event in HTTPS protocol. * Support parallel parsing protocol data in the access log module. +* Reduce missing details issue in the access log module. #### Bug Fixes * Fix the base image cannot run in the arm64. diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go index 8e8aba1..63b55b8 100644 --- a/pkg/accesslog/collector/connection.go +++ b/pkg/accesslog/collector/connection.go @@ -64,7 +64,10 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext if int(perCPUBufferSize) < os.Getpagesize() { return fmt.Errorf("the cpu buffer must bigger than %dB", os.Getpagesize()) } - if ctx.Config.ConnectionAnalyze.Parallels < 1 { + if ctx.Config.ConnectionAnalyze.ParseParallels < 1 { + return fmt.Errorf("the parallels cannot be small than 1") + } + if ctx.Config.ConnectionAnalyze.AnalyzeParallels < 1 { return fmt.Errorf("the parallels cannot be small than 1") } if ctx.Config.ConnectionAnalyze.QueueSize < 1 { @@ -74,15 +77,15 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext if err != nil { connectionLogger.Warnf("cannot create the connection tracker, %v", err) } - c.eventQueue = btf.NewEventQueue(ctx.Config.ConnectionAnalyze.Parallels, ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext { + c.eventQueue = btf.NewEventQueue(ctx.Config.ConnectionAnalyze.AnalyzeParallels, ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext { return newConnectionPartitionContext(ctx, track) }) - c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize), 1, func() interface{} { + c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} { return &events.SocketConnectEvent{} }, func(data interface{}) string { return fmt.Sprintf("%d", data.(*events.SocketConnectEvent).ConID) }) - c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), 1, func() interface{} { + c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} { return &events.SocketCloseEvent{} }, func(data interface{}) string { return fmt.Sprintf("%d", data.(*events.SocketCloseEvent).ConnectionID) diff --git a/pkg/accesslog/collector/protocols/http1.go b/pkg/accesslog/collector/protocols/http1.go index 92a8173..9c17160 100644 --- a/pkg/accesslog/collector/protocols/http1.go +++ b/pkg/accesslog/collector/protocols/http1.go @@ -19,6 +19,7 @@ package protocols import ( "container/list" + "fmt" "io" "github.com/apache/skywalking-rover/pkg/accesslog/common" @@ -33,16 +34,18 @@ import ( ) var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1") +var http1AnalyzeMaxRetryCount = 3 -type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) +type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) error type HTTP1Protocol struct { ctx *common.AccessLogContext analyze HTTP1ProtocolAnalyze + reader *reader.Reader } func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze HTTP1ProtocolAnalyze) *HTTP1Protocol { - protocol := &HTTP1Protocol{ctx: ctx} + protocol := &HTTP1Protocol{ctx: ctx, reader: reader.NewReader()} if analyze == nil { protocol.analyze = protocol.HandleHTTPData } else { @@ -55,29 +58,41 @@ type HTTP1Metrics struct { ConnectionID uint64 RandomID uint64 - halfRequests *list.List + halfRequests *list.List + analyzeUnFinished *list.List } func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64) ProtocolMetrics { return &HTTP1Metrics{ - ConnectionID: connectionID, - RandomID: randomID, - halfRequests: list.New(), + ConnectionID: connectionID, + RandomID: randomID, + halfRequests: list.New(), + analyzeUnFinished: list.New(), } } +type HTTP1AnalyzeUnFinished struct { + request *reader.Request + response *reader.Response + retryCount int +} + func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelper) error { metrics := connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics) buf := connection.Buffer(enums.ConnectionProtocolHTTP) http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d, data len: %d", metrics.ConnectionID, metrics.RandomID, buf.DataLength()) + p.handleUnFinishedEvents(metrics) buf.ResetForLoopReading() for { if !buf.PrepareForReading() { return nil } - messageType, err := reader.IdentityMessageType(buf) + messageType, err := p.reader.IdentityMessageType(buf) + log.Debugf("ready to reading message type, messageType: %v, buf: %p, data id: %d, "+ + "connection ID: %d, random ID: %d, error: %v", messageType, buf, buf.Position().DataID(), + metrics.ConnectionID, metrics.RandomID, err) if err != nil { http1Log.Debugf("failed to identity message type, %v", err) if buf.SkipCurrentElement() { @@ -89,19 +104,25 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe var result enums.ParseResult switch messageType { case reader.MessageTypeRequest: - result, _ = p.handleRequest(metrics, buf) + result, err = p.handleRequest(metrics, buf) case reader.MessageTypeResponse: - result, _ = p.handleResponse(metrics, buf) + result, err = p.handleResponse(metrics, buf) case reader.MessageTypeUnknown: result = enums.ParseResultSkipPackage } + if err != nil { + http1Log.Warnf("failed to handle HTTP/1.x protocol, connection ID: %d, random ID: %d, data id: %d, error: %v", + metrics.ConnectionID, metrics.RandomID, buf.Position().DataID(), err) + } finishReading := false switch result { case enums.ParseResultSuccess: - finishReading = buf.RemoveReadElements() + finishReading = buf.RemoveReadElements(false) case enums.ParseResultSkipPackage: finishReading = buf.SkipCurrentElement() + log.Debugf("skip current element, data id: %d, buf: %p, connection ID: %d, random ID: %d", + buf.Position().DataID(), buf, metrics.ConnectionID, metrics.RandomID) } if finishReading { @@ -116,7 +137,7 @@ func (p *HTTP1Protocol) ForProtocol() enums.ConnectionProtocol { } func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf *buffer.Buffer) (enums.ParseResult, error) { - req, result, err := reader.ReadRequest(buf, true) + req, result, err := p.reader.ReadRequest(buf, true) if err != nil { return enums.ParseResultSkipPackage, err } @@ -130,12 +151,14 @@ func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf *buffer.Buffer) func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b *buffer.Buffer) (enums.ParseResult, error) { firstRequest := metrics.halfRequests.Front() if firstRequest == nil { + log.Debugf("cannot found request for response, skip response, connection ID: %d, random ID: %d", + metrics.ConnectionID, metrics.RandomID) return enums.ParseResultSkipPackage, nil } request := metrics.halfRequests.Remove(firstRequest).(*reader.Request) // parsing response - response, result, err := reader.ReadResponse(request, b, true) + response, result, err := p.reader.ReadResponse(request, b, true) defer func() { // if parsing response failed, then put the request back to the list if result != enums.ParseResultSuccess { @@ -149,37 +172,71 @@ func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b *buffer.Buffer) } // getting the request and response, then send to the forwarder - p.analyze(metrics, request, response) + if analyzeError := p.analyze(metrics, request, response); analyzeError != nil { + p.appendAnalyzeUnFinished(metrics, request, response) + } return enums.ParseResultSuccess, nil } -func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) { - detailEvents := make([]events.SocketDetail, 0) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, request.HeaderBuffer()) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, request.BodyBuffer()) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, response.HeaderBuffer()) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, response.BodyBuffer()) - - if len(detailEvents) == 0 { - http1Log.Warnf("cannot found any detail events for HTTP/1.x protocol, connection ID: %d, random ID: %d, data id: %d-%d", - metrics.ConnectionID, metrics.RandomID, - request.MinDataID(), response.BodyBuffer().LastSocketBuffer().DataID()) - return +func (p *HTTP1Protocol) appendAnalyzeUnFinished(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) { + metrics.analyzeUnFinished.PushBack(&HTTP1AnalyzeUnFinished{ + request: request, + response: response, + retryCount: 0, + }) +} + +func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics) { + for element := m.analyzeUnFinished.Front(); element != nil; { + unFinished := element.Value.(*HTTP1AnalyzeUnFinished) + err := p.analyze(m, unFinished.request, unFinished.response) + if err != nil { + unFinished.retryCount++ + if unFinished.retryCount < http1AnalyzeMaxRetryCount { + element = element.Next() + continue + } + http1Log.Warnf("failed to analyze HTTP1 request and response, connection ID: %d, random ID: %d, "+ + "retry count: %d, error: %v", m.ConnectionID, m.RandomID, unFinished.retryCount, err) + } + next := element.Next() + m.analyzeUnFinished.Remove(element) + element = next } - http1Log.Debugf("found fully HTTP1 request and response, contains %d detail events , connection ID: %d, random ID: %d", - len(detailEvents), metrics.ConnectionID, metrics.RandomID) +} + +func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) error { + details := make([]events.SocketDetail, 0) + var allInclude = true + var idRange *buffer.DataIDRange + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, request.HeaderBuffer(), idRange, allInclude) + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, request.BodyBuffer(), idRange, allInclude) + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, response.HeaderBuffer(), idRange, allInclude) + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, response.BodyBuffer(), idRange, allInclude) + + if !allInclude { + return fmt.Errorf("cannot found full detail events for HTTP/1.x protocol, "+ + "data id: %d-%d, current details count: %d", + request.MinDataID(), response.BodyBuffer().LastSocketBuffer().DataID(), len(details)) + } + + http1Log.Debugf("found fully HTTP1 request and response, contains %d detail events, "+ + "connection ID: %d, random ID: %d, data range: %d-%d(%t)", + len(details), metrics.ConnectionID, metrics.RandomID, idRange.From, idRange.To, idRange.IsToBufferReadFinished) originalRequest := request.Original() originalResponse := response.Original() + // delete details(each request or response is fine because it's will delete the original buffer) + idRange.DeleteDetails(request.HeaderBuffer()) defer func() { p.CloseStream(originalRequest.Body) p.CloseStream(originalResponse.Body) }() - forwarder.SendTransferProtocolEvent(p.ctx, detailEvents, &v3.AccessLogProtocolLogs{ + forwarder.SendTransferProtocolEvent(p.ctx, details, &v3.AccessLogProtocolLogs{ Protocol: &v3.AccessLogProtocolLogs_Http{ Http: &v3.AccessLogHTTPProtocol{ - StartTime: forwarder.BuildOffsetTimestamp(detailEvents[0].GetStartTime()), - EndTime: forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()), + StartTime: forwarder.BuildOffsetTimestamp(details[0].GetStartTime()), + EndTime: forwarder.BuildOffsetTimestamp(details[len(details)-1].GetEndTime()), Version: v3.AccessLogHTTPProtocolVersion_HTTP1, Request: &v3.AccessLogHTTPProtocolRequest{ Method: TransformHTTPMethod(originalRequest.Method), @@ -198,6 +255,7 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request *reader.Re }, }, }) + return nil } func (p *HTTP1Protocol) CloseStream(ioReader io.Closer) { diff --git a/pkg/accesslog/collector/protocols/http2.go b/pkg/accesslog/collector/protocols/http2.go index b18226a..266413a 100644 --- a/pkg/accesslog/collector/protocols/http2.go +++ b/pkg/accesslog/collector/protocols/http2.go @@ -19,6 +19,7 @@ package protocols import ( "errors" + "fmt" "strconv" "strings" "time" @@ -42,7 +43,7 @@ var maxHTTP2StreamingTime = time.Minute * 3 var http2Log = logger.GetLogger("accesslog", "collector", "protocols", "http2") -type HTTP2StreamAnalyze func(stream *HTTP2Streaming) +type HTTP2StreamAnalyze func(stream *HTTP2Streaming) error type HTTP2Protocol struct { ctx *common.AccessLogContext @@ -140,7 +141,7 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze finishReading := false switch result { case enums.ParseResultSuccess: - finishReading = buf.RemoveReadElements() + finishReading = buf.RemoveReadElements(false) case enums.ParseResultSkipPackage: finishReading = buf.SkipCurrentElement() } @@ -194,12 +195,12 @@ func (r *HTTP2Protocol) handleHeader(header *http2.FrameHeader, startPos *buffer if !streaming.IsInResponse { r.AppendHeaders(streaming.ReqHeader, headers) - streaming.ReqHeaderBuffer = buffer.CombineSlices(true, streaming.ReqHeaderBuffer, buf.Slice(true, startPos, buf.Position())) + streaming.ReqHeaderBuffer = buffer.CombineSlices(true, buf, streaming.ReqHeaderBuffer, buf.Slice(true, startPos, buf.Position())) return enums.ParseResultSuccess, false, nil } r.AppendHeaders(streaming.RespHeader, headers) - streaming.RespHeaderBuffer = buffer.CombineSlices(true, streaming.RespHeaderBuffer, buf.Slice(true, startPos, buf.Position())) + streaming.RespHeaderBuffer = buffer.CombineSlices(true, buf, streaming.RespHeaderBuffer, buf.Slice(true, startPos, buf.Position())) // is end of stream and in the response if header.Flags.Has(http2.FlagHeadersEndStream) { @@ -235,24 +236,27 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id ui } } -func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) { - detailEvents := make([]events.SocketDetail, 0) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, stream.ReqHeaderBuffer) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, stream.ReqBodyBuffer) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, stream.RespHeaderBuffer) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, stream.RespBodyBuffer) - - if len(detailEvents) == 0 { - http2Log.Warnf("cannot found any detail events for HTTP/2 protocol, data id: %d-%d", - stream.ReqHeaderBuffer.FirstSocketBuffer().DataID(), stream.RespBodyBuffer.LastSocketBuffer().DataID()) - return +func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) error { + details := make([]events.SocketDetail, 0) + var allInclude = true + var idRange *buffer.DataIDRange + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.ReqHeaderBuffer, idRange, allInclude) + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.ReqBodyBuffer, idRange, allInclude) + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.RespHeaderBuffer, idRange, allInclude) + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.RespBodyBuffer, idRange, allInclude) + + if !allInclude { + return fmt.Errorf("cannot found any detail events for HTTP/2 protocol, data id: %d-%d, current details count: %d", + stream.ReqHeaderBuffer.FirstSocketBuffer().DataID(), stream.RespBodyBuffer.LastSocketBuffer().DataID(), + len(details)) } + idRange.DeleteDetails(stream.ReqHeaderBuffer) - forwarder.SendTransferProtocolEvent(r.ctx, detailEvents, &v3.AccessLogProtocolLogs{ + forwarder.SendTransferProtocolEvent(r.ctx, details, &v3.AccessLogProtocolLogs{ Protocol: &v3.AccessLogProtocolLogs_Http{ Http: &v3.AccessLogHTTPProtocol{ - StartTime: forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer, detailEvents[0]).GetStartTime()), - EndTime: forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()), + StartTime: forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer, details[0]).GetStartTime()), + EndTime: forwarder.BuildOffsetTimestamp(details[len(details)-1].GetEndTime()), Version: v3.AccessLogHTTPProtocolVersion_HTTP2, Request: &v3.AccessLogHTTPProtocolRequest{ Method: r.ParseHTTPMethod(stream), @@ -272,6 +276,7 @@ func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) { }, }, }) + return nil } func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming) v3.AccessLogHTTPProtocolRequestMethod { @@ -284,10 +289,14 @@ func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming) v3.AccessLogH } func (r *HTTP2Protocol) FirstDetail(buf *buffer.Buffer, def events.SocketDetail) events.SocketDetail { - if buf == nil || buf.Details() == nil || buf.Details().Len() == 0 { + if buf == nil { + return def + } + details := buf.BuildDetails() + if details == nil || details.Len() == 0 { return def } - return buf.Details().Front().Value.(events.SocketDetail) + return details.Front().Value.(events.SocketDetail) } func (r *HTTP2Protocol) BufferSizeOfZero(buf *buffer.Buffer) uint64 { @@ -315,9 +324,9 @@ func (r *HTTP2Protocol) handleData(header *http2.FrameHeader, startPos *buffer.P return enums.ParseResultSkipPackage, false, err } if !streaming.IsInResponse { - streaming.ReqBodyBuffer = buffer.CombineSlices(true, streaming.ReqBodyBuffer, buf.Slice(true, startPos, buf.Position())) + streaming.ReqBodyBuffer = buffer.CombineSlices(true, buf, streaming.ReqBodyBuffer, buf.Slice(true, startPos, buf.Position())) } else { - streaming.RespBodyBuffer = buffer.CombineSlices(true, streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position())) + streaming.RespBodyBuffer = buffer.CombineSlices(true, buf, streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position())) } r.validateIsStreamOpenTooLong(metrics, header.StreamID, streaming) diff --git a/pkg/accesslog/collector/protocols/protocol.go b/pkg/accesslog/collector/protocols/protocol.go index 41ac8a2..862e80f 100644 --- a/pkg/accesslog/collector/protocols/protocol.go +++ b/pkg/accesslog/collector/protocols/protocol.go @@ -56,17 +56,29 @@ type Protocol interface { Analyze(connection *PartitionConnection, helper *AnalyzeHelper) error } -func AppendSocketDetailsFromBuffer(result []events.SocketDetail, buf *buffer.Buffer) []events.SocketDetail { - if buf == nil || buf.DetailLength() == 0 { - return result +func AppendSocketDetailsFromBuffer(result []events.SocketDetail, buf *buffer.Buffer, dataIdRange *buffer.DataIDRange, + allDetailInclude bool) ([]events.SocketDetail, *buffer.DataIDRange, bool) { + if buf == nil || !allDetailInclude { + return result, dataIdRange, false } - for e := buf.Details().Front(); e != nil; e = e.Next() { + details := buf.BuildDetails() + if details == nil || details.Len() == 0 { + return result, dataIdRange, false + } + currentDataIdRange := buf.BuildTotalDataIDRange() + if !currentDataIdRange.IsIncludeAllDetails(details) { + return result, dataIdRange, false + } + for e := details.Front(); e != nil; e = e.Next() { if len(result) > 0 && result[len(result)-1] == e.Value { continue } result = append(result, e.Value.(events.SocketDetail)) } - return result + if dataIdRange == nil { + return result, currentDataIdRange, true + } + return result, dataIdRange.Append(currentDataIdRange), true } func AnalyzeTraceInfo(fetcher func(key string) string, protocolLog *logger.Logger) *v3.AccessLogTraceInfo { diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go index 68708fc..833d323 100644 --- a/pkg/accesslog/collector/protocols/queue.go +++ b/pkg/accesslog/collector/protocols/queue.go @@ -215,9 +215,9 @@ func (p *PartitionContext) Consume(data interface{}) { case events.SocketDetail: pid, _ := events.ParseConnectionID(event.GetConnectionID()) log.Debugf("receive the socket detail event, connection ID: %d, random ID: %d, pid: %d, data id: %d, "+ - "function name: %s, package count: %d, package size: %d, ssl: %d", + "function name: %s, package count: %d, package size: %d, ssl: %d, protocol: %d", event.GetConnectionID(), event.GetRandomID(), pid, event.DataID(), event.GetFunctionName(), - event.GetL4PackageCount(), event.GetL4TotalPackageSize(), event.GetSSL()) + event.GetL4PackageCount(), event.GetL4TotalPackageSize(), event.GetSSL(), event.GetProtocol()) if event.GetProtocol() == enums.ConnectionProtocolUnknown { // if the connection protocol is unknown, we just needs to add this into the kernel log forwarder.SendTransferNoProtocolEvent(p.context, event) diff --git a/pkg/accesslog/common/config.go b/pkg/accesslog/common/config.go index 1ef0aeb..0298088 100644 --- a/pkg/accesslog/common/config.go +++ b/pkg/accesslog/common/config.go @@ -37,7 +37,8 @@ type FlushConfig struct { type ConnectionAnalyzeConfig struct { PerCPUBufferSize string `mapstructure:"per_cpu_buffer"` - Parallels int `mapstructure:"parallels"` + ParseParallels int `mapstructure:"parse_parallels"` + AnalyzeParallels int `mapstructure:"analyze_parallels"` QueueSize int `mapstructure:"queue_size"` } diff --git a/pkg/accesslog/events/detail.go b/pkg/accesslog/events/detail.go index 3917931..a5ac46c 100644 --- a/pkg/accesslog/events/detail.go +++ b/pkg/accesslog/events/detail.go @@ -103,6 +103,10 @@ func (d *SocketDetailEvent) ReadFrom(r *reader.Reader) { d.SSL = r.ReadUint8() } +func (d *SocketDetailEvent) Time() uint64 { + return d.StartTime +} + func (d *SocketDetailEvent) GetConnectionID() uint64 { return d.ConnectionID } diff --git a/pkg/profiling/task/network/analyze/events/data.go b/pkg/profiling/task/network/analyze/events/data.go index 804e96b..1f71fa8 100644 --- a/pkg/profiling/task/network/analyze/events/data.go +++ b/pkg/profiling/task/network/analyze/events/data.go @@ -108,6 +108,10 @@ type SocketDetailEvent struct { RTTTime uint32 } +func (s *SocketDetailEvent) Time() uint64 { + return 0 +} + func (s *SocketDetailEvent) DataID() uint64 { return s.DataID0 } diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go index 52b181d..1acc342 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go @@ -185,7 +185,7 @@ func (a *ProtocolAnalyzer) processConnectionEvents(connection *connectionInfo) { finishReading := false switch result { case enums.ParseResultSuccess: - finishReading = connection.buffer.RemoveReadElements() + finishReading = connection.buffer.RemoveReadElements(true) case enums.ParseResultSkipPackage: finishReading = connection.buffer.SkipCurrentElement() } diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go index 245e6dd..6cec0ef 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go @@ -51,7 +51,8 @@ var DurationHistogramBuckets = []float64{ type Analyzer struct { // cache connection metrics if the connect event not receive or process - cache map[string]*ConnectionMetrics + cache map[string]*ConnectionMetrics + reader *reader.Reader sampleConfig *SamplingConfig } @@ -67,7 +68,8 @@ type ConnectionMetrics struct { func NewHTTP1Analyzer() protocol.Protocol { return &Analyzer{ - cache: make(map[string]*ConnectionMetrics), + cache: make(map[string]*ConnectionMetrics), + reader: reader.NewReader(), } } @@ -90,7 +92,7 @@ func (h *Analyzer) Init(config *profiling.TaskConfig) { func (h *Analyzer) ParseProtocol(connectionID uint64, metrics protocol.Metrics, buf *buffer.Buffer) enums.ParseResult { connectionMetrics := metrics.(*ConnectionMetrics) - messageType, err := reader.IdentityMessageType(buf) + messageType, err := h.reader.IdentityMessageType(buf) if err != nil { return enums.ParseResultSkipPackage } @@ -116,7 +118,7 @@ func (h *Analyzer) ParseProtocol(connectionID uint64, metrics protocol.Metrics, func (h *Analyzer) handleRequest(metrics *ConnectionMetrics, buf *buffer.Buffer) (enums.ParseResult, error) { // parsing request - req, r, err := reader.ReadRequest(buf, true) + req, r, err := h.reader.ReadRequest(buf, true) if err != nil { return enums.ParseResultSkipPackage, err } @@ -137,7 +139,7 @@ func (h *Analyzer) handleResponse(connectionID uint64, metrics *ConnectionMetric request := metrics.halfData.Remove(firstElement).(*reader.Request) // parsing request - response, r, err := reader.ReadResponse(request, buf, true) + response, r, err := h.reader.ReadResponse(request, buf, true) if err != nil { return enums.ParseResultSkipPackage, err } else if r != enums.ParseResultSuccess { diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go index 4c73cf0..d041997 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go @@ -286,8 +286,8 @@ func (h *Trace) appendHTTPEvent(attaches []*v3.SpanAttachedEvent, process api.Pr func (h *Trace) appendSyscallEvents(attachEvents []*v3.SpanAttachedEvent, process api.ProcessInterface, traffic *base.ProcessTraffic, message *reader.MessageOpt) []*v3.SpanAttachedEvent { - headerDetails := message.HeaderBuffer().Details() - bodyDetails := message.BodyBuffer().Details() + headerDetails := message.HeaderBuffer().BuildDetails() + bodyDetails := message.BodyBuffer().BuildDetails() dataIDCache := make(map[uint64]bool) for e := headerDetails.Front(); e != nil; e = e.Next() { event := e.Value.(*events.SocketDetailEvent) diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go index e4aac3f..8cc8c85 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go @@ -35,9 +35,6 @@ import ( ) var ( - headBuffer = make([]byte, 16) - bodyBuffer = make([]byte, 4096) - requestMethods = []string{ "GET", "POST", "OPTIONS", "HEAD", "PUT", "DELETE", "CONNECT", "TRACE", "PATCH", } @@ -53,15 +50,27 @@ const ( MessageTypeUnknown ) -func IdentityMessageType(reader *buffer.Buffer) (MessageType, error) { - n, err := reader.Peek(headBuffer) +type Reader struct { + headBuffer []byte + bodyBuffer []byte +} + +func NewReader() *Reader { + return &Reader{ + headBuffer: make([]byte, 16), + bodyBuffer: make([]byte, 4096), + } +} + +func (r *Reader) IdentityMessageType(reader *buffer.Buffer) (MessageType, error) { + n, err := reader.Peek(r.headBuffer) if err != nil { return MessageTypeUnknown, err - } else if n != len(headBuffer) { + } else if n != len(r.headBuffer) { return MessageTypeUnknown, fmt.Errorf("need more content for header") } - headerString := string(headBuffer) + headerString := string(r.headBuffer) isRequest := false for _, method := range requestMethods { if strings.HasPrefix(headerString, method) { @@ -83,6 +92,7 @@ type Message interface { Headers() http.Header HeaderBuffer() *buffer.Buffer BodyBuffer() *buffer.Buffer + Reader() *Reader } type MessageOpt struct { @@ -192,7 +202,7 @@ func (m *MessageOpt) isChunked() bool { func (m *MessageOpt) readBodyUntilCurrentPackageFinished(buf *buffer.Buffer, reader *bufio.Reader) (*buffer.Buffer, enums.ParseResult, error) { startPosition := buf.OffsetPosition(-reader.Buffered()) for !buf.IsCurrentPacketReadFinished() { - _, err := buf.Read(bodyBuffer) + _, err := buf.Read(m.Reader().bodyBuffer) if err != nil { return nil, enums.ParseResultSkipPackage, err } @@ -237,7 +247,7 @@ func (m *MessageOpt) checkChunkedBody(buf *buffer.Buffer, bodyReader *bufio.Read return nil, enums.ParseResultSkipPackage, fmt.Errorf("the chunk data parding error, should be empty: %s", d) } } - return buffer.CombineSlices(true, buffers...), enums.ParseResultSuccess, nil + return buffer.CombineSlices(true, buf, buffers...), enums.ParseResultSuccess, nil } func (m *MessageOpt) checkBodyWithSize(buf *buffer.Buffer, reader *bufio.Reader, size int, @@ -248,10 +258,10 @@ func (m *MessageOpt) checkBodyWithSize(buf *buffer.Buffer, reader *bufio.Reader, startPosition := buf.OffsetPosition(-reader.Buffered()) for reduceSize > 0 { readSize = reduceSize - if readSize > len(bodyBuffer) { - readSize = len(bodyBuffer) + if readSize > len(m.Reader().bodyBuffer) { + readSize = len(m.Reader().bodyBuffer) } - lastReadSize, err = reader.Read(bodyBuffer[0:readSize]) + lastReadSize, err = reader.Read(m.Reader().bodyBuffer[0:readSize]) if err != nil { if err == buffer.ErrNotComplete { return nil, enums.ParseResultSkipPackage, nil diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go index 8791611..7224727 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go @@ -34,6 +34,7 @@ type Request struct { original *http.Request headerBuffer *buffer.Buffer bodyBuffer *buffer.Buffer + reader *Reader } func (r *Request) Headers() http.Header { @@ -48,6 +49,10 @@ func (r *Request) BodyBuffer() *buffer.Buffer { return r.bodyBuffer } +func (r *Request) Reader() *Reader { + return r.reader +} + func (r *Request) MinDataID() int { return int(r.headerBuffer.FirstSocketBuffer().DataID()) } @@ -57,11 +62,11 @@ func (r *Request) Original() *http.Request { } // nolint -func ReadRequest(buf *buffer.Buffer, readBody bool) (*Request, enums.ParseResult, error) { +func (r *Reader) ReadRequest(buf *buffer.Buffer, readBody bool) (*Request, enums.ParseResult, error) { bufReader := bufio.NewReader(buf) tp := textproto.NewReader(bufReader) req := &http.Request{} - result := &Request{original: req} + result := &Request{original: req, reader: r} result.MessageOpt = &MessageOpt{result} headerStartPosition := buf.Position() diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go index 966edb8..fae907b 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go @@ -35,6 +35,7 @@ type Response struct { original *http.Response headerBuffer *buffer.Buffer bodyBuffer *buffer.Buffer + reader *Reader } func (r *Response) Headers() http.Header { @@ -49,15 +50,19 @@ func (r *Response) BodyBuffer() *buffer.Buffer { return r.bodyBuffer } +func (r *Response) Reader() *Reader { + return r.reader +} + func (r *Response) Original() *http.Response { return r.original } -func ReadResponse(req *Request, buf *buffer.Buffer, readBody bool) (*Response, enums.ParseResult, error) { +func (r *Reader) ReadResponse(req *Request, buf *buffer.Buffer, readBody bool) (*Response, enums.ParseResult, error) { bufReader := bufio.NewReader(buf) tp := textproto.NewReader(bufReader) resp := &http.Response{} - result := &Response{original: resp, req: req} + result := &Response{original: resp, req: req, reader: r} result.MessageOpt = &MessageOpt{result} headerStartPosition := buf.Position() diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go index 0bfc5d9..c5ce2ca 100644 --- a/pkg/tools/buffer/buffer.go +++ b/pkg/tools/buffer/buffer.go @@ -24,12 +24,18 @@ import ( "sync" "time" + "github.com/sirupsen/logrus" + + "github.com/apache/skywalking-rover/pkg/logger" "github.com/apache/skywalking-rover/pkg/tools/enums" "github.com/apache/skywalking-rover/pkg/tools/host" ) var ( ErrNotComplete = errors.New("socket: not complete event") + emptyList = list.New() + + log = logger.GetLogger("tools", "buffer") ) type SocketDataBuffer interface { @@ -67,6 +73,14 @@ type SocketDataBuffer interface { type SocketDataDetail interface { // DataID data id of the buffer DataID() uint64 + // Time (BPF) of the detail event + Time() uint64 +} + +type DataIDRange struct { + From uint64 + To uint64 + IsToBufferReadFinished bool } type Buffer struct { @@ -82,6 +96,13 @@ type Buffer struct { // record the latest expired data id in connection for expire the older socket detail // because the older socket detail may not be received in buffer latestExpiredDataID uint64 + + // originalBuffer record this buffer is from Buffer.Slice or CombineSlices + // if it's not empty, then when getting the details should query from originalBuffer + // Because the BPF Queue is unsorted and can be delayed, so the details should query by real buffer + originalBuffer *Buffer + // endPosition record the end position of the originalBuffer + endPosition *Position } type SocketDataEventLimited struct { @@ -106,6 +127,48 @@ func (s *SocketDataEventLimited) BufferStartPosition() int { return s.From } +func (i *DataIDRange) IsIncludeAllDetails(list *list.List) bool { + if list.Len() == 0 { + return false + } + for e := list.Front(); e != nil; e = e.Next() { + if e.Value.(SocketDataDetail).DataID() < i.From || e.Value.(SocketDataDetail).DataID() > i.To { + return false + } + } + return true +} + +func (i *DataIDRange) Append(other *DataIDRange) *DataIDRange { + if other.From < i.From { + i.From = other.From + } + if other.To > i.To { + i.To = other.To + } + i.IsToBufferReadFinished = other.IsToBufferReadFinished + return i +} + +func (i *DataIDRange) DeleteDetails(buf *Buffer) { + if buf.originalBuffer != nil { + i.DeleteDetails(buf.originalBuffer) + } + for e := buf.detailEvents.Front(); e != nil; { + next := e.Next() + dataId := e.Value.(SocketDataDetail).DataID() + if dataId >= i.From && dataId <= i.To { + if !i.IsToBufferReadFinished && dataId == i.To { + break + } + buf.detailEvents.Remove(e) + log.Debugf("delete detail event from buffer, data id: %d, ref: %p, range: %d-%d(%t)", + dataId, buf, i.From, i.To, i.IsToBufferReadFinished) + } + e = next + } +} + type Position struct { // element of the event list element *list.Element @@ -143,6 +206,26 @@ func (r *Buffer) FindFirstDataBuffer(dataID uint64) SocketDataBuffer { return nil } +func (r *Buffer) BuildTotalDataIDRange() *DataIDRange { + if r.dataEvents.Len() == 0 { + return nil + } + var toIndex uint64 + var isToBufferReadFinished bool + if r.endPosition != nil { + toIndex = r.endPosition.DataID() + isToBufferReadFinished = r.endPosition.bufIndex == r.endPosition.element.Value.(SocketDataBuffer).BufferLen() + } else { + toIndex = r.current.DataID() + isToBufferReadFinished = r.current.bufIndex == r.current.element.Value.(SocketDataBuffer).BufferLen() + } + return &DataIDRange{ + From: r.head.DataID(), + To: toIndex, + IsToBufferReadFinished: isToBufferReadFinished, + } +} + func (r *Buffer) Position() *Position { return r.current.Clone() } @@ -155,6 +238,7 @@ func (r *Buffer) Clean() { r.detailEvents = list.New() r.head = nil r.current = nil + r.endPosition = nil } func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer { @@ -198,11 +282,13 @@ func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer { } return &Buffer{ - dataEvents: dataEvents, - detailEvents: detailEvents, - validated: validated, - head: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex}, - current: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex}, + dataEvents: dataEvents, + detailEvents: detailEvents, + validated: validated, + head: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex}, + current: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex}, + originalBuffer: r, + endPosition: end, } } @@ -219,7 +305,36 @@ func (r *Buffer) Len() int { return result } -func (r *Buffer) Details() *list.List { +func (r *Buffer) BuildDetails() *list.List { + // if the original buffer is not empty, then query the details from original buffer + if r.originalBuffer != nil { + events := list.New() + fromDataId := r.head.DataID() + var endDataId uint64 + if r.endPosition != nil { + endDataId = r.endPosition.DataID() + } else { + endDataId = r.current.DataID() + } + + for e := r.originalBuffer.detailEvents.Front(); e != nil; e = e.Next() { + if e.Value.(SocketDataDetail).DataID() >= fromDataId && e.Value.(SocketDataDetail).DataID() <= endDataId { + events.PushBack(e.Value) + } + } + if events.Len() == 0 && log.Enable(logrus.DebugLevel) { + dataIdList := make([]uint64, 0) + for e := r.originalBuffer.detailEvents.Front(); e != nil; e = e.Next() { + if e.Value != nil { + dataIdList = append(dataIdList, e.Value.(SocketDataDetail).DataID()) + } + } + log.Debugf("cannot found details from original buffer, from data id: %d, end data id: %d, "+ + "ref: %p, existing details data id list: %v", fromDataId, endDataId, r.originalBuffer, dataIdList) + } + + return events + } return r.detailEvents } @@ -281,7 +396,7 @@ func (r *Buffer) DetectNotSendingLastPosition() *Position { return nil } -func CombineSlices(validated bool, buffers ...*Buffer) *Buffer { +func CombineSlices(validated bool, originalBuffer *Buffer, buffers ...*Buffer) *Buffer { if len(buffers) == 0 { return nil } @@ -289,7 +404,6 @@ func CombineSlices(validated bool, buffers ...*Buffer) *Buffer { return buffers[0] } dataEvents := list.New() - detailEvents := list.New() for _, b := range buffers { if b == nil || b.head == nil { continue @@ -304,15 +418,20 @@ func CombineSlices(validated bool, buffers ...*Buffer) *Buffer { } else { dataEvents.PushBackList(b.dataEvents) } - detailEvents.PushBackList(b.detailEvents) } + var endPosition = buffers[len(buffers)-1].endPosition + if endPosition == nil { + endPosition = buffers[len(buffers)-1].Position() + } return &Buffer{ - dataEvents: dataEvents, - detailEvents: detailEvents, - validated: validated, - head: &Position{element: dataEvents.Front(), bufIndex: 0}, - current: &Position{element: dataEvents.Front(), bufIndex: 0}, + dataEvents: dataEvents, + detailEvents: emptyList, // for the combined buffer, the details list should be queried from original buffer + validated: validated, + head: &Position{element: dataEvents.Front(), bufIndex: 0}, + current: &Position{element: dataEvents.Front(), bufIndex: 0}, + originalBuffer: originalBuffer, + endPosition: endPosition, } } @@ -505,12 +624,12 @@ func (r *Buffer) PrepareForReading() bool { } // nolint -func (r *Buffer) RemoveReadElements() bool { +func (r *Buffer) RemoveReadElements(includeDetails bool) bool { r.eventLocker.Lock() defer r.eventLocker.Unlock() // delete until the last data id - if r.head.element != nil && r.current.element != nil { + if includeDetails && r.head.element != nil && r.current.element != nil { firstDataID := r.head.element.Value.(SocketDataBuffer).DataID() currentBuffer := r.current.element.Value.(SocketDataBuffer) lastDataID := currentBuffer.DataID() @@ -531,6 +650,7 @@ func (r *Buffer) RemoveReadElements() bool { if startDelete { tmp := e.Next() r.detailEvents.Remove(e) + log.Debugf("delete detail event from readed buffer, data id: %d, ref: %p", event.DataID(), r) e = tmp } else { e = e.Next() @@ -588,6 +708,10 @@ func (r *Buffer) AppendDetailEvent(event SocketDataDetail) { r.detailEvents.PushFront(event) return } + if r.detailEvents.Back().Value == nil { + r.detailEvents.PushFront(event) + return + } if r.detailEvents.Back().Value.(SocketDataDetail).DataID() < event.DataID() { r.detailEvents.PushBack(event) return @@ -660,7 +784,13 @@ func (r *Buffer) DeleteExpireEvents(expireDuration time.Duration) int { // detail event queue count += r.deleteEventsWithJudgement(r.detailEvents, func(element *list.Element) bool { - return r.latestExpiredDataID > 0 && element.Value.(SocketDataDetail).DataID() <= r.latestExpiredDataID + detail := element.Value.(SocketDataDetail) + isDelete := r.latestExpiredDataID > 0 && detail.DataID() <= r.latestExpiredDataID || + (detail.Time() > 0 && expireTime.After(host.Time(detail.Time()))) + if isDelete { + log.Debugf("delete expired detail event, data id: %d, buf: %p", detail.DataID(), r) + } + return isDelete }) return count } @@ -672,13 +802,6 @@ func (r *Buffer) DataLength() int { return r.dataEvents.Len() } -func (r *Buffer) DetailLength() int { - if r.detailEvents == nil { - return 0 - } - return r.detailEvents.Len() -} - func (r *Buffer) deleteEventsWithJudgement(l *list.List, checker func(element *list.Element) bool) int { count := 0 for e := l.Front(); e != nil; {
