This is an automated email from the ASF dual-hosted git repository. liuhan pushed a commit to branch missing-details in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
commit b8addef6e9c255b0cc137a0cef77d66060561c2b Author: mrproliu <[email protected]> AuthorDate: Thu Dec 12 11:20:53 2024 +0800 Fix missing details when high traffic --- pkg/accesslog/collector/protocols/http1.go | 91 +++++++++---- pkg/accesslog/collector/protocols/http2.go | 53 ++++---- pkg/accesslog/collector/protocols/protocol.go | 22 +++- pkg/accesslog/collector/protocols/queue.go | 4 +- pkg/accesslog/events/detail.go | 4 + pkg/profiling/task/network/analyze/events/data.go | 4 + .../analyze/layer7/protocols/base/analyzer.go | 2 +- .../analyze/layer7/protocols/http1/metrics.go | 4 +- .../layer7/protocols/http1/reader/reader.go | 2 +- pkg/tools/buffer/buffer.go | 144 +++++++++++++++++---- 10 files changed, 250 insertions(+), 80 deletions(-) diff --git a/pkg/accesslog/collector/protocols/http1.go b/pkg/accesslog/collector/protocols/http1.go index 92a8173..9f4a560 100644 --- a/pkg/accesslog/collector/protocols/http1.go +++ b/pkg/accesslog/collector/protocols/http1.go @@ -19,6 +19,7 @@ package protocols import ( "container/list" + "fmt" "io" "github.com/apache/skywalking-rover/pkg/accesslog/common" @@ -33,8 +34,9 @@ import ( ) var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1") +var http1AnalyzeMaxRetryCount = 5 -type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) +type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) error type HTTP1Protocol struct { ctx *common.AccessLogContext @@ -55,22 +57,31 @@ type HTTP1Metrics struct { ConnectionID uint64 RandomID uint64 - halfRequests *list.List + halfRequests *list.List + analyzeUnFinished *list.List } func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64) ProtocolMetrics { return &HTTP1Metrics{ - ConnectionID: connectionID, - RandomID: randomID, - halfRequests: list.New(), + ConnectionID: connectionID, + RandomID: randomID, + halfRequests: list.New(), + analyzeUnFinished: list.New(), } } +type HTTP1AnalyzeUnFinished struct { + request *reader.Request + response *reader.Response + retryCount int +} + func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelper) error { metrics := connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics) buf := connection.Buffer(enums.ConnectionProtocolHTTP) http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d, data len: %d", metrics.ConnectionID, metrics.RandomID, buf.DataLength()) + p.handleUnFinishedEvents(metrics) buf.ResetForLoopReading() for { if !buf.PrepareForReading() { @@ -99,7 +110,7 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe finishReading := false switch result { case enums.ParseResultSuccess: - finishReading = buf.RemoveReadElements() + finishReading = buf.RemoveReadElements(false) case enums.ParseResultSkipPackage: finishReading = buf.SkipCurrentElement() } @@ -149,37 +160,70 @@ func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b *buffer.Buffer) } // getting the request and response, then send to the forwarder - p.analyze(metrics, request, response) + if analyzeError := p.analyze(metrics, request, response); analyzeError != nil { + p.appendAnalyzeUnFinished(metrics, request, response) + } return enums.ParseResultSuccess, nil } -func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) { - detailEvents := make([]events.SocketDetail, 0) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, request.HeaderBuffer()) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, request.BodyBuffer()) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, response.HeaderBuffer()) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, response.BodyBuffer()) - - if len(detailEvents) == 0 { - http1Log.Warnf("cannot found any detail events for HTTP/1.x protocol, connection ID: %d, random ID: %d, data id: %d-%d", - metrics.ConnectionID, metrics.RandomID, - request.MinDataID(), response.BodyBuffer().LastSocketBuffer().DataID()) - return +func (p *HTTP1Protocol) appendAnalyzeUnFinished(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) { + metrics.analyzeUnFinished.PushBack(&HTTP1AnalyzeUnFinished{ + request: request, + response: response, + retryCount: 0, + }) +} + +func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics) { + for element := m.analyzeUnFinished.Front(); element != nil; { + unFinished := element.Value.(*HTTP1AnalyzeUnFinished) + err := p.analyze(m, unFinished.request, unFinished.response) + if err != nil { + unFinished.retryCount++ + if unFinished.retryCount <= http1AnalyzeMaxRetryCount { + element = element.Next() + continue + } + http1Log.Warnf("failed to analyze HTTP1 request and response, connection ID: %d, random ID: %d, "+ + "error: %v", m.ConnectionID, m.RandomID, err) + } + next := element.Next() + m.analyzeUnFinished.Remove(element) + element = next + } +} + +func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) error { + details := make([]events.SocketDetail, 0) + var allInclude = true + var idRange *buffer.DataIDRange + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, request.HeaderBuffer(), idRange, allInclude) + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, request.BodyBuffer(), idRange, allInclude) + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, response.HeaderBuffer(), idRange, allInclude) + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, response.BodyBuffer(), idRange, allInclude) + + if !allInclude { + return fmt.Errorf("cannot found full detail events for HTTP/1.x protocol, "+ + "data id: %d-%d, current details count: %d", + request.MinDataID(), response.BodyBuffer().LastSocketBuffer().DataID(), len(details)) } + http1Log.Debugf("found fully HTTP1 request and response, contains %d detail events , connection ID: %d, random ID: %d", - len(detailEvents), metrics.ConnectionID, metrics.RandomID) + len(details), metrics.ConnectionID, metrics.RandomID) originalRequest := request.Original() originalResponse := response.Original() + // delete details(each request or response is fine because it's will delete the original buffer) + idRange.DeleteDetails(request.HeaderBuffer()) defer func() { p.CloseStream(originalRequest.Body) p.CloseStream(originalResponse.Body) }() - forwarder.SendTransferProtocolEvent(p.ctx, detailEvents, &v3.AccessLogProtocolLogs{ + forwarder.SendTransferProtocolEvent(p.ctx, details, &v3.AccessLogProtocolLogs{ Protocol: &v3.AccessLogProtocolLogs_Http{ Http: &v3.AccessLogHTTPProtocol{ - StartTime: forwarder.BuildOffsetTimestamp(detailEvents[0].GetStartTime()), - EndTime: forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()), + StartTime: forwarder.BuildOffsetTimestamp(details[0].GetStartTime()), + EndTime: forwarder.BuildOffsetTimestamp(details[len(details)-1].GetEndTime()), Version: v3.AccessLogHTTPProtocolVersion_HTTP1, Request: &v3.AccessLogHTTPProtocolRequest{ Method: TransformHTTPMethod(originalRequest.Method), @@ -198,6 +242,7 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request *reader.Re }, }, }) + return nil } func (p *HTTP1Protocol) CloseStream(ioReader io.Closer) { diff --git a/pkg/accesslog/collector/protocols/http2.go b/pkg/accesslog/collector/protocols/http2.go index b18226a..266413a 100644 --- a/pkg/accesslog/collector/protocols/http2.go +++ b/pkg/accesslog/collector/protocols/http2.go @@ -19,6 +19,7 @@ package protocols import ( "errors" + "fmt" "strconv" "strings" "time" @@ -42,7 +43,7 @@ var maxHTTP2StreamingTime = time.Minute * 3 var http2Log = logger.GetLogger("accesslog", "collector", "protocols", "http2") -type HTTP2StreamAnalyze func(stream *HTTP2Streaming) +type HTTP2StreamAnalyze func(stream *HTTP2Streaming) error type HTTP2Protocol struct { ctx *common.AccessLogContext @@ -140,7 +141,7 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze finishReading := false switch result { case enums.ParseResultSuccess: - finishReading = buf.RemoveReadElements() + finishReading = buf.RemoveReadElements(false) case enums.ParseResultSkipPackage: finishReading = buf.SkipCurrentElement() } @@ -194,12 +195,12 @@ func (r *HTTP2Protocol) handleHeader(header *http2.FrameHeader, startPos *buffer if !streaming.IsInResponse { r.AppendHeaders(streaming.ReqHeader, headers) - streaming.ReqHeaderBuffer = buffer.CombineSlices(true, streaming.ReqHeaderBuffer, buf.Slice(true, startPos, buf.Position())) + streaming.ReqHeaderBuffer = buffer.CombineSlices(true, buf, streaming.ReqHeaderBuffer, buf.Slice(true, startPos, buf.Position())) return enums.ParseResultSuccess, false, nil } r.AppendHeaders(streaming.RespHeader, headers) - streaming.RespHeaderBuffer = buffer.CombineSlices(true, streaming.RespHeaderBuffer, buf.Slice(true, startPos, buf.Position())) + streaming.RespHeaderBuffer = buffer.CombineSlices(true, buf, streaming.RespHeaderBuffer, buf.Slice(true, startPos, buf.Position())) // is end of stream and in the response if header.Flags.Has(http2.FlagHeadersEndStream) { @@ -235,24 +236,27 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id ui } } -func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) { - detailEvents := make([]events.SocketDetail, 0) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, stream.ReqHeaderBuffer) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, stream.ReqBodyBuffer) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, stream.RespHeaderBuffer) - detailEvents = AppendSocketDetailsFromBuffer(detailEvents, stream.RespBodyBuffer) - - if len(detailEvents) == 0 { - http2Log.Warnf("cannot found any detail events for HTTP/2 protocol, data id: %d-%d", - stream.ReqHeaderBuffer.FirstSocketBuffer().DataID(), stream.RespBodyBuffer.LastSocketBuffer().DataID()) - return +func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) error { + details := make([]events.SocketDetail, 0) + var allInclude = true + var idRange *buffer.DataIDRange + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.ReqHeaderBuffer, idRange, allInclude) + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.ReqBodyBuffer, idRange, allInclude) + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.RespHeaderBuffer, idRange, allInclude) + details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.RespBodyBuffer, idRange, allInclude) + + if !allInclude { + return fmt.Errorf("cannot found any detail events for HTTP/2 protocol, data id: %d-%d, current details count: %d", + stream.ReqHeaderBuffer.FirstSocketBuffer().DataID(), stream.RespBodyBuffer.LastSocketBuffer().DataID(), + len(details)) } + idRange.DeleteDetails(stream.ReqHeaderBuffer) - forwarder.SendTransferProtocolEvent(r.ctx, detailEvents, &v3.AccessLogProtocolLogs{ + forwarder.SendTransferProtocolEvent(r.ctx, details, &v3.AccessLogProtocolLogs{ Protocol: &v3.AccessLogProtocolLogs_Http{ Http: &v3.AccessLogHTTPProtocol{ - StartTime: forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer, detailEvents[0]).GetStartTime()), - EndTime: forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()), + StartTime: forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer, details[0]).GetStartTime()), + EndTime: forwarder.BuildOffsetTimestamp(details[len(details)-1].GetEndTime()), Version: v3.AccessLogHTTPProtocolVersion_HTTP2, Request: &v3.AccessLogHTTPProtocolRequest{ Method: r.ParseHTTPMethod(stream), @@ -272,6 +276,7 @@ func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) { }, }, }) + return nil } func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming) v3.AccessLogHTTPProtocolRequestMethod { @@ -284,10 +289,14 @@ func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming) v3.AccessLogH } func (r *HTTP2Protocol) FirstDetail(buf *buffer.Buffer, def events.SocketDetail) events.SocketDetail { - if buf == nil || buf.Details() == nil || buf.Details().Len() == 0 { + if buf == nil { + return def + } + details := buf.BuildDetails() + if details == nil || details.Len() == 0 { return def } - return buf.Details().Front().Value.(events.SocketDetail) + return details.Front().Value.(events.SocketDetail) } func (r *HTTP2Protocol) BufferSizeOfZero(buf *buffer.Buffer) uint64 { @@ -315,9 +324,9 @@ func (r *HTTP2Protocol) handleData(header *http2.FrameHeader, startPos *buffer.P return enums.ParseResultSkipPackage, false, err } if !streaming.IsInResponse { - streaming.ReqBodyBuffer = buffer.CombineSlices(true, streaming.ReqBodyBuffer, buf.Slice(true, startPos, buf.Position())) + streaming.ReqBodyBuffer = buffer.CombineSlices(true, buf, streaming.ReqBodyBuffer, buf.Slice(true, startPos, buf.Position())) } else { - streaming.RespBodyBuffer = buffer.CombineSlices(true, streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position())) + streaming.RespBodyBuffer = buffer.CombineSlices(true, buf, streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position())) } r.validateIsStreamOpenTooLong(metrics, header.StreamID, streaming) diff --git a/pkg/accesslog/collector/protocols/protocol.go b/pkg/accesslog/collector/protocols/protocol.go index 41ac8a2..862e80f 100644 --- a/pkg/accesslog/collector/protocols/protocol.go +++ b/pkg/accesslog/collector/protocols/protocol.go @@ -56,17 +56,29 @@ type Protocol interface { Analyze(connection *PartitionConnection, helper *AnalyzeHelper) error } -func AppendSocketDetailsFromBuffer(result []events.SocketDetail, buf *buffer.Buffer) []events.SocketDetail { - if buf == nil || buf.DetailLength() == 0 { - return result +func AppendSocketDetailsFromBuffer(result []events.SocketDetail, buf *buffer.Buffer, dataIdRange *buffer.DataIDRange, + allDetailInclude bool) ([]events.SocketDetail, *buffer.DataIDRange, bool) { + if buf == nil || !allDetailInclude { + return result, dataIdRange, false } - for e := buf.Details().Front(); e != nil; e = e.Next() { + details := buf.BuildDetails() + if details == nil || details.Len() == 0 { + return result, dataIdRange, false + } + currentDataIdRange := buf.BuildTotalDataIDRange() + if !currentDataIdRange.IsIncludeAllDetails(details) { + return result, dataIdRange, false + } + for e := details.Front(); e != nil; e = e.Next() { if len(result) > 0 && result[len(result)-1] == e.Value { continue } result = append(result, e.Value.(events.SocketDetail)) } - return result + if dataIdRange == nil { + return result, currentDataIdRange, true + } + return result, dataIdRange.Append(currentDataIdRange), true } func AnalyzeTraceInfo(fetcher func(key string) string, protocolLog *logger.Logger) *v3.AccessLogTraceInfo { diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go index 68708fc..833d323 100644 --- a/pkg/accesslog/collector/protocols/queue.go +++ b/pkg/accesslog/collector/protocols/queue.go @@ -215,9 +215,9 @@ func (p *PartitionContext) Consume(data interface{}) { case events.SocketDetail: pid, _ := events.ParseConnectionID(event.GetConnectionID()) log.Debugf("receive the socket detail event, connection ID: %d, random ID: %d, pid: %d, data id: %d, "+ - "function name: %s, package count: %d, package size: %d, ssl: %d", + "function name: %s, package count: %d, package size: %d, ssl: %d, protocol: %d", event.GetConnectionID(), event.GetRandomID(), pid, event.DataID(), event.GetFunctionName(), - event.GetL4PackageCount(), event.GetL4TotalPackageSize(), event.GetSSL()) + event.GetL4PackageCount(), event.GetL4TotalPackageSize(), event.GetSSL(), event.GetProtocol()) if event.GetProtocol() == enums.ConnectionProtocolUnknown { // if the connection protocol is unknown, we just needs to add this into the kernel log forwarder.SendTransferNoProtocolEvent(p.context, event) diff --git a/pkg/accesslog/events/detail.go b/pkg/accesslog/events/detail.go index 3917931..a5ac46c 100644 --- a/pkg/accesslog/events/detail.go +++ b/pkg/accesslog/events/detail.go @@ -103,6 +103,10 @@ func (d *SocketDetailEvent) ReadFrom(r *reader.Reader) { d.SSL = r.ReadUint8() } +func (d *SocketDetailEvent) Time() uint64 { + return d.StartTime +} + func (d *SocketDetailEvent) GetConnectionID() uint64 { return d.ConnectionID } diff --git a/pkg/profiling/task/network/analyze/events/data.go b/pkg/profiling/task/network/analyze/events/data.go index 804e96b..1f71fa8 100644 --- a/pkg/profiling/task/network/analyze/events/data.go +++ b/pkg/profiling/task/network/analyze/events/data.go @@ -108,6 +108,10 @@ type SocketDetailEvent struct { RTTTime uint32 } +func (s *SocketDetailEvent) Time() uint64 { + return 0 +} + func (s *SocketDetailEvent) DataID() uint64 { return s.DataID0 } diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go index 52b181d..1acc342 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go @@ -185,7 +185,7 @@ func (a *ProtocolAnalyzer) processConnectionEvents(connection *connectionInfo) { finishReading := false switch result { case enums.ParseResultSuccess: - finishReading = connection.buffer.RemoveReadElements() + finishReading = connection.buffer.RemoveReadElements(true) case enums.ParseResultSkipPackage: finishReading = connection.buffer.SkipCurrentElement() } diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go index 4c73cf0..d041997 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go @@ -286,8 +286,8 @@ func (h *Trace) appendHTTPEvent(attaches []*v3.SpanAttachedEvent, process api.Pr func (h *Trace) appendSyscallEvents(attachEvents []*v3.SpanAttachedEvent, process api.ProcessInterface, traffic *base.ProcessTraffic, message *reader.MessageOpt) []*v3.SpanAttachedEvent { - headerDetails := message.HeaderBuffer().Details() - bodyDetails := message.BodyBuffer().Details() + headerDetails := message.HeaderBuffer().BuildDetails() + bodyDetails := message.BodyBuffer().BuildDetails() dataIDCache := make(map[uint64]bool) for e := headerDetails.Front(); e != nil; e = e.Next() { event := e.Value.(*events.SocketDetailEvent) diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go index e4aac3f..8d299fd 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go @@ -237,7 +237,7 @@ func (m *MessageOpt) checkChunkedBody(buf *buffer.Buffer, bodyReader *bufio.Read return nil, enums.ParseResultSkipPackage, fmt.Errorf("the chunk data parding error, should be empty: %s", d) } } - return buffer.CombineSlices(true, buffers...), enums.ParseResultSuccess, nil + return buffer.CombineSlices(true, buf, buffers...), enums.ParseResultSuccess, nil } func (m *MessageOpt) checkBodyWithSize(buf *buffer.Buffer, reader *bufio.Reader, size int, diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go index 0bfc5d9..5cd9f16 100644 --- a/pkg/tools/buffer/buffer.go +++ b/pkg/tools/buffer/buffer.go @@ -30,6 +30,7 @@ import ( var ( ErrNotComplete = errors.New("socket: not complete event") + emptyList = list.New() ) type SocketDataBuffer interface { @@ -67,6 +68,14 @@ type SocketDataBuffer interface { type SocketDataDetail interface { // DataID data id of the buffer DataID() uint64 + // Time (BPF) of the detail event + Time() uint64 +} + +type DataIDRange struct { + From uint64 + To uint64 + IsToBufferReadFinished bool } type Buffer struct { @@ -82,6 +91,13 @@ type Buffer struct { // record the latest expired data id in connection for expire the older socket detail // because the older socket detail may not be received in buffer latestExpiredDataID uint64 + + // originalBuffer record this buffer is from Buffer.Slice or CombineSlices + // if it's not empty, then when getting the details should query from originalBuffer + // Because the BPF Queue is unsorted and can be delayed, so the details should query by real buffer + originalBuffer *Buffer + // endPosition record the end position of the originalBuffer + endPosition *Position } type SocketDataEventLimited struct { @@ -106,6 +122,46 @@ func (s *SocketDataEventLimited) BufferStartPosition() int { return s.From } +func (i *DataIDRange) IsIncludeAllDetails(list *list.List) bool { + if list.Len() == 0 { + return false + } + for e := list.Front(); e != nil; e = e.Next() { + if e.Value.(SocketDataDetail).DataID() < i.From || e.Value.(SocketDataDetail).DataID() > i.To { + return false + } + } + return true +} + +func (i *DataIDRange) Append(other *DataIDRange) *DataIDRange { + if other.From < i.From { + i.From = other.From + } + if other.To > i.To { + i.To = other.To + } + i.IsToBufferReadFinished = other.IsToBufferReadFinished + return i +} + +func (i *DataIDRange) DeleteDetails(buf *Buffer) { + if buf.originalBuffer != nil { + i.DeleteDetails(buf.originalBuffer) + } + for e := buf.detailEvents.Front(); e != nil; { + next := e.Next() + dataId := e.Value.(SocketDataDetail).DataID() + if dataId >= i.From && dataId <= i.To { + if !i.IsToBufferReadFinished && dataId == i.To { + break + } + buf.detailEvents.Remove(e) + } + e = next + } +} + type Position struct { // element of the event list element *list.Element @@ -143,6 +199,26 @@ func (r *Buffer) FindFirstDataBuffer(dataID uint64) SocketDataBuffer { return nil } +func (r *Buffer) BuildTotalDataIDRange() *DataIDRange { + if r.dataEvents.Len() == 0 { + return nil + } + var toIndex uint64 + var isToBufferReadFinished bool + if r.endPosition != nil { + toIndex = r.endPosition.DataID() + isToBufferReadFinished = r.endPosition.bufIndex == r.endPosition.element.Value.(SocketDataBuffer).BufferLen() + } else { + toIndex = r.current.DataID() + isToBufferReadFinished = r.current.bufIndex == r.current.element.Value.(SocketDataBuffer).BufferLen() + } + return &DataIDRange{ + From: r.head.DataID(), + To: toIndex, + IsToBufferReadFinished: isToBufferReadFinished, + } +} + func (r *Buffer) Position() *Position { return r.current.Clone() } @@ -155,6 +231,7 @@ func (r *Buffer) Clean() { r.detailEvents = list.New() r.head = nil r.current = nil + r.endPosition = nil } func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer { @@ -198,11 +275,13 @@ func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer { } return &Buffer{ - dataEvents: dataEvents, - detailEvents: detailEvents, - validated: validated, - head: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex}, - current: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex}, + dataEvents: dataEvents, + detailEvents: detailEvents, + validated: validated, + head: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex}, + current: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex}, + originalBuffer: r, + endPosition: end, } } @@ -219,7 +298,25 @@ func (r *Buffer) Len() int { return result } -func (r *Buffer) Details() *list.List { +func (r *Buffer) BuildDetails() *list.List { + // if the original buffer is not empty, then query the details from original buffer + if r.originalBuffer != nil { + events := list.New() + fromDataId := r.head.DataID() + var endDataId uint64 + if r.endPosition != nil { + endDataId = r.endPosition.DataID() + } else { + endDataId = r.current.DataID() + } + + for e := r.originalBuffer.detailEvents.Front(); e != nil; e = e.Next() { + if e.Value.(SocketDataDetail).DataID() >= fromDataId && e.Value.(SocketDataDetail).DataID() <= endDataId { + events.PushBack(e.Value) + } + } + return events + } return r.detailEvents } @@ -281,7 +378,7 @@ func (r *Buffer) DetectNotSendingLastPosition() *Position { return nil } -func CombineSlices(validated bool, buffers ...*Buffer) *Buffer { +func CombineSlices(validated bool, originalBuffer *Buffer, buffers ...*Buffer) *Buffer { if len(buffers) == 0 { return nil } @@ -289,7 +386,6 @@ func CombineSlices(validated bool, buffers ...*Buffer) *Buffer { return buffers[0] } dataEvents := list.New() - detailEvents := list.New() for _, b := range buffers { if b == nil || b.head == nil { continue @@ -304,15 +400,20 @@ func CombineSlices(validated bool, buffers ...*Buffer) *Buffer { } else { dataEvents.PushBackList(b.dataEvents) } - detailEvents.PushBackList(b.detailEvents) } + var endPosition = buffers[len(buffers)-1].endPosition + if endPosition == nil { + endPosition = buffers[len(buffers)-1].Position() + } return &Buffer{ - dataEvents: dataEvents, - detailEvents: detailEvents, - validated: validated, - head: &Position{element: dataEvents.Front(), bufIndex: 0}, - current: &Position{element: dataEvents.Front(), bufIndex: 0}, + dataEvents: dataEvents, + detailEvents: emptyList, // for the combined buffer, the details list should be queried from original buffer + validated: validated, + head: &Position{element: dataEvents.Front(), bufIndex: 0}, + current: &Position{element: dataEvents.Front(), bufIndex: 0}, + originalBuffer: originalBuffer, + endPosition: endPosition, } } @@ -505,12 +606,12 @@ func (r *Buffer) PrepareForReading() bool { } // nolint -func (r *Buffer) RemoveReadElements() bool { +func (r *Buffer) RemoveReadElements(includeDetails bool) bool { r.eventLocker.Lock() defer r.eventLocker.Unlock() // delete until the last data id - if r.head.element != nil && r.current.element != nil { + if includeDetails && r.head.element != nil && r.current.element != nil { firstDataID := r.head.element.Value.(SocketDataBuffer).DataID() currentBuffer := r.current.element.Value.(SocketDataBuffer) lastDataID := currentBuffer.DataID() @@ -660,7 +761,9 @@ func (r *Buffer) DeleteExpireEvents(expireDuration time.Duration) int { // detail event queue count += r.deleteEventsWithJudgement(r.detailEvents, func(element *list.Element) bool { - return r.latestExpiredDataID > 0 && element.Value.(SocketDataDetail).DataID() <= r.latestExpiredDataID + detail := element.Value.(SocketDataDetail) + return r.latestExpiredDataID > 0 && detail.DataID() <= r.latestExpiredDataID || + (detail.Time() > 0 && expireTime.After(host.Time(detail.Time()))) }) return count } @@ -672,13 +775,6 @@ func (r *Buffer) DataLength() int { return r.dataEvents.Len() } -func (r *Buffer) DetailLength() int { - if r.detailEvents == nil { - return 0 - } - return r.detailEvents.Len() -} - func (r *Buffer) deleteEventsWithJudgement(l *list.List, checker func(element *list.Element) bool) int { count := 0 for e := l.Front(); e != nil; {
