This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch missing-details
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git

commit b8addef6e9c255b0cc137a0cef77d66060561c2b
Author: mrproliu <[email protected]>
AuthorDate: Thu Dec 12 11:20:53 2024 +0800

    Fix missing details when high traffic
---
 pkg/accesslog/collector/protocols/http1.go         |  91 +++++++++----
 pkg/accesslog/collector/protocols/http2.go         |  53 ++++----
 pkg/accesslog/collector/protocols/protocol.go      |  22 +++-
 pkg/accesslog/collector/protocols/queue.go         |   4 +-
 pkg/accesslog/events/detail.go                     |   4 +
 pkg/profiling/task/network/analyze/events/data.go  |   4 +
 .../analyze/layer7/protocols/base/analyzer.go      |   2 +-
 .../analyze/layer7/protocols/http1/metrics.go      |   4 +-
 .../layer7/protocols/http1/reader/reader.go        |   2 +-
 pkg/tools/buffer/buffer.go                         | 144 +++++++++++++++++----
 10 files changed, 250 insertions(+), 80 deletions(-)

diff --git a/pkg/accesslog/collector/protocols/http1.go 
b/pkg/accesslog/collector/protocols/http1.go
index 92a8173..9f4a560 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -19,6 +19,7 @@ package protocols
 
 import (
        "container/list"
+       "fmt"
        "io"
 
        "github.com/apache/skywalking-rover/pkg/accesslog/common"
@@ -33,8 +34,9 @@ import (
 )
 
 var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1")
+var http1AnalyzeMaxRetryCount = 5
 
-type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, 
response *reader.Response)
+type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, 
response *reader.Response) error
 
 type HTTP1Protocol struct {
        ctx     *common.AccessLogContext
@@ -55,22 +57,31 @@ type HTTP1Metrics struct {
        ConnectionID uint64
        RandomID     uint64
 
-       halfRequests *list.List
+       halfRequests      *list.List
+       analyzeUnFinished *list.List
 }
 
 func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64) 
ProtocolMetrics {
        return &HTTP1Metrics{
-               ConnectionID: connectionID,
-               RandomID:     randomID,
-               halfRequests: list.New(),
+               ConnectionID:      connectionID,
+               RandomID:          randomID,
+               halfRequests:      list.New(),
+               analyzeUnFinished: list.New(),
        }
 }
 
+type HTTP1AnalyzeUnFinished struct {
+       request    *reader.Request
+       response   *reader.Response
+       retryCount int
+}
+
 func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ 
*AnalyzeHelper) error {
        metrics := 
connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics)
        buf := connection.Buffer(enums.ConnectionProtocolHTTP)
        http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: 
%d, random ID: %d, data len: %d",
                metrics.ConnectionID, metrics.RandomID, buf.DataLength())
+       p.handleUnFinishedEvents(metrics)
        buf.ResetForLoopReading()
        for {
                if !buf.PrepareForReading() {
@@ -99,7 +110,7 @@ func (p *HTTP1Protocol) Analyze(connection 
*PartitionConnection, _ *AnalyzeHelpe
                finishReading := false
                switch result {
                case enums.ParseResultSuccess:
-                       finishReading = buf.RemoveReadElements()
+                       finishReading = buf.RemoveReadElements(false)
                case enums.ParseResultSkipPackage:
                        finishReading = buf.SkipCurrentElement()
                }
@@ -149,37 +160,70 @@ func (p *HTTP1Protocol) handleResponse(metrics 
*HTTP1Metrics, b *buffer.Buffer)
        }
 
        // getting the request and response, then send to the forwarder
-       p.analyze(metrics, request, response)
+       if analyzeError := p.analyze(metrics, request, response); analyzeError 
!= nil {
+               p.appendAnalyzeUnFinished(metrics, request, response)
+       }
        return enums.ParseResultSuccess, nil
 }
 
-func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request 
*reader.Request, response *reader.Response) {
-       detailEvents := make([]events.SocketDetail, 0)
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
request.HeaderBuffer())
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
request.BodyBuffer())
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
response.HeaderBuffer())
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
response.BodyBuffer())
-
-       if len(detailEvents) == 0 {
-               http1Log.Warnf("cannot found any detail events for HTTP/1.x 
protocol, connection ID: %d, random ID: %d, data id: %d-%d",
-                       metrics.ConnectionID, metrics.RandomID,
-                       request.MinDataID(), 
response.BodyBuffer().LastSocketBuffer().DataID())
-               return
+func (p *HTTP1Protocol) appendAnalyzeUnFinished(metrics *HTTP1Metrics, request 
*reader.Request, response *reader.Response) {
+       metrics.analyzeUnFinished.PushBack(&HTTP1AnalyzeUnFinished{
+               request:    request,
+               response:   response,
+               retryCount: 0,
+       })
+}
+
+func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics) {
+       for element := m.analyzeUnFinished.Front(); element != nil; {
+               unFinished := element.Value.(*HTTP1AnalyzeUnFinished)
+               err := p.analyze(m, unFinished.request, unFinished.response)
+               if err != nil {
+                       unFinished.retryCount++
+                       if unFinished.retryCount <= http1AnalyzeMaxRetryCount {
+                               element = element.Next()
+                               continue
+                       }
+                       http1Log.Warnf("failed to analyze HTTP1 request and 
response, connection ID: %d, random ID: %d, "+
+                               "error: %v", m.ConnectionID, m.RandomID, err)
+               }
+               next := element.Next()
+               m.analyzeUnFinished.Remove(element)
+               element = next
+       }
+}
+
+func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request 
*reader.Request, response *reader.Response) error {
+       details := make([]events.SocketDetail, 0)
+       var allInclude = true
+       var idRange *buffer.DataIDRange
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
request.HeaderBuffer(), idRange, allInclude)
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
request.BodyBuffer(), idRange, allInclude)
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
response.HeaderBuffer(), idRange, allInclude)
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
response.BodyBuffer(), idRange, allInclude)
+
+       if !allInclude {
+               return fmt.Errorf("cannot found full detail events for HTTP/1.x 
protocol, "+
+                       "data id: %d-%d, current details count: %d",
+                       request.MinDataID(), 
response.BodyBuffer().LastSocketBuffer().DataID(), len(details))
        }
+
        http1Log.Debugf("found fully HTTP1 request and response, contains %d 
detail events , connection ID: %d, random ID: %d",
-               len(detailEvents), metrics.ConnectionID, metrics.RandomID)
+               len(details), metrics.ConnectionID, metrics.RandomID)
        originalRequest := request.Original()
        originalResponse := response.Original()
+       // delete details(each request or response is fine because it's will 
delete the original buffer)
+       idRange.DeleteDetails(request.HeaderBuffer())
 
        defer func() {
                p.CloseStream(originalRequest.Body)
                p.CloseStream(originalResponse.Body)
        }()
-       forwarder.SendTransferProtocolEvent(p.ctx, detailEvents, 
&v3.AccessLogProtocolLogs{
+       forwarder.SendTransferProtocolEvent(p.ctx, details, 
&v3.AccessLogProtocolLogs{
                Protocol: &v3.AccessLogProtocolLogs_Http{
                        Http: &v3.AccessLogHTTPProtocol{
-                               StartTime: 
forwarder.BuildOffsetTimestamp(detailEvents[0].GetStartTime()),
-                               EndTime:   
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
+                               StartTime: 
forwarder.BuildOffsetTimestamp(details[0].GetStartTime()),
+                               EndTime:   
forwarder.BuildOffsetTimestamp(details[len(details)-1].GetEndTime()),
                                Version:   
v3.AccessLogHTTPProtocolVersion_HTTP1,
                                Request: &v3.AccessLogHTTPProtocolRequest{
                                        Method:             
TransformHTTPMethod(originalRequest.Method),
@@ -198,6 +242,7 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics 
*HTTP1Metrics, request *reader.Re
                        },
                },
        })
+       return nil
 }
 
 func (p *HTTP1Protocol) CloseStream(ioReader io.Closer) {
diff --git a/pkg/accesslog/collector/protocols/http2.go 
b/pkg/accesslog/collector/protocols/http2.go
index b18226a..266413a 100644
--- a/pkg/accesslog/collector/protocols/http2.go
+++ b/pkg/accesslog/collector/protocols/http2.go
@@ -19,6 +19,7 @@ package protocols
 
 import (
        "errors"
+       "fmt"
        "strconv"
        "strings"
        "time"
@@ -42,7 +43,7 @@ var maxHTTP2StreamingTime = time.Minute * 3
 
 var http2Log = logger.GetLogger("accesslog", "collector", "protocols", "http2")
 
-type HTTP2StreamAnalyze func(stream *HTTP2Streaming)
+type HTTP2StreamAnalyze func(stream *HTTP2Streaming) error
 
 type HTTP2Protocol struct {
        ctx     *common.AccessLogContext
@@ -140,7 +141,7 @@ func (r *HTTP2Protocol) Analyze(connection 
*PartitionConnection, helper *Analyze
                finishReading := false
                switch result {
                case enums.ParseResultSuccess:
-                       finishReading = buf.RemoveReadElements()
+                       finishReading = buf.RemoveReadElements(false)
                case enums.ParseResultSkipPackage:
                        finishReading = buf.SkipCurrentElement()
                }
@@ -194,12 +195,12 @@ func (r *HTTP2Protocol) handleHeader(header 
*http2.FrameHeader, startPos *buffer
 
        if !streaming.IsInResponse {
                r.AppendHeaders(streaming.ReqHeader, headers)
-               streaming.ReqHeaderBuffer = buffer.CombineSlices(true, 
streaming.ReqHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
+               streaming.ReqHeaderBuffer = buffer.CombineSlices(true, buf, 
streaming.ReqHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
                return enums.ParseResultSuccess, false, nil
        }
 
        r.AppendHeaders(streaming.RespHeader, headers)
-       streaming.RespHeaderBuffer = buffer.CombineSlices(true, 
streaming.RespHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
+       streaming.RespHeaderBuffer = buffer.CombineSlices(true, buf, 
streaming.RespHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
 
        // is end of stream and in the response
        if header.Flags.Has(http2.FlagHeadersEndStream) {
@@ -235,24 +236,27 @@ func (r *HTTP2Protocol) 
validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id ui
        }
 }
 
-func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) {
-       detailEvents := make([]events.SocketDetail, 0)
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
stream.ReqHeaderBuffer)
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
stream.ReqBodyBuffer)
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
stream.RespHeaderBuffer)
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
stream.RespBodyBuffer)
-
-       if len(detailEvents) == 0 {
-               http2Log.Warnf("cannot found any detail events for HTTP/2 
protocol, data id: %d-%d",
-                       stream.ReqHeaderBuffer.FirstSocketBuffer().DataID(), 
stream.RespBodyBuffer.LastSocketBuffer().DataID())
-               return
+func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) error {
+       details := make([]events.SocketDetail, 0)
+       var allInclude = true
+       var idRange *buffer.DataIDRange
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
stream.ReqHeaderBuffer, idRange, allInclude)
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
stream.ReqBodyBuffer, idRange, allInclude)
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
stream.RespHeaderBuffer, idRange, allInclude)
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
stream.RespBodyBuffer, idRange, allInclude)
+
+       if !allInclude {
+               return fmt.Errorf("cannot found any detail events for HTTP/2 
protocol, data id: %d-%d, current details count: %d",
+                       stream.ReqHeaderBuffer.FirstSocketBuffer().DataID(), 
stream.RespBodyBuffer.LastSocketBuffer().DataID(),
+                       len(details))
        }
+       idRange.DeleteDetails(stream.ReqHeaderBuffer)
 
-       forwarder.SendTransferProtocolEvent(r.ctx, detailEvents, 
&v3.AccessLogProtocolLogs{
+       forwarder.SendTransferProtocolEvent(r.ctx, details, 
&v3.AccessLogProtocolLogs{
                Protocol: &v3.AccessLogProtocolLogs_Http{
                        Http: &v3.AccessLogHTTPProtocol{
-                               StartTime: 
forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer, 
detailEvents[0]).GetStartTime()),
-                               EndTime:   
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
+                               StartTime: 
forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer, 
details[0]).GetStartTime()),
+                               EndTime:   
forwarder.BuildOffsetTimestamp(details[len(details)-1].GetEndTime()),
                                Version:   
v3.AccessLogHTTPProtocolVersion_HTTP2,
                                Request: &v3.AccessLogHTTPProtocolRequest{
                                        Method:             
r.ParseHTTPMethod(stream),
@@ -272,6 +276,7 @@ func (r *HTTP2Protocol) handleWholeStream(stream 
*HTTP2Streaming) {
                        },
                },
        })
+       return nil
 }
 
 func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming) 
v3.AccessLogHTTPProtocolRequestMethod {
@@ -284,10 +289,14 @@ func (r *HTTP2Protocol) ParseHTTPMethod(streaming 
*HTTP2Streaming) v3.AccessLogH
 }
 
 func (r *HTTP2Protocol) FirstDetail(buf *buffer.Buffer, def 
events.SocketDetail) events.SocketDetail {
-       if buf == nil || buf.Details() == nil || buf.Details().Len() == 0 {
+       if buf == nil {
+               return def
+       }
+       details := buf.BuildDetails()
+       if details == nil || details.Len() == 0 {
                return def
        }
-       return buf.Details().Front().Value.(events.SocketDetail)
+       return details.Front().Value.(events.SocketDetail)
 }
 
 func (r *HTTP2Protocol) BufferSizeOfZero(buf *buffer.Buffer) uint64 {
@@ -315,9 +324,9 @@ func (r *HTTP2Protocol) handleData(header 
*http2.FrameHeader, startPos *buffer.P
                return enums.ParseResultSkipPackage, false, err
        }
        if !streaming.IsInResponse {
-               streaming.ReqBodyBuffer = buffer.CombineSlices(true, 
streaming.ReqBodyBuffer, buf.Slice(true, startPos, buf.Position()))
+               streaming.ReqBodyBuffer = buffer.CombineSlices(true, buf, 
streaming.ReqBodyBuffer, buf.Slice(true, startPos, buf.Position()))
        } else {
-               streaming.RespBodyBuffer = buffer.CombineSlices(true, 
streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position()))
+               streaming.RespBodyBuffer = buffer.CombineSlices(true, buf, 
streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position()))
        }
 
        r.validateIsStreamOpenTooLong(metrics, header.StreamID, streaming)
diff --git a/pkg/accesslog/collector/protocols/protocol.go 
b/pkg/accesslog/collector/protocols/protocol.go
index 41ac8a2..862e80f 100644
--- a/pkg/accesslog/collector/protocols/protocol.go
+++ b/pkg/accesslog/collector/protocols/protocol.go
@@ -56,17 +56,29 @@ type Protocol interface {
        Analyze(connection *PartitionConnection, helper *AnalyzeHelper) error
 }
 
-func AppendSocketDetailsFromBuffer(result []events.SocketDetail, buf 
*buffer.Buffer) []events.SocketDetail {
-       if buf == nil || buf.DetailLength() == 0 {
-               return result
+func AppendSocketDetailsFromBuffer(result []events.SocketDetail, buf 
*buffer.Buffer, dataIdRange *buffer.DataIDRange,
+       allDetailInclude bool) ([]events.SocketDetail, *buffer.DataIDRange, 
bool) {
+       if buf == nil || !allDetailInclude {
+               return result, dataIdRange, false
        }
-       for e := buf.Details().Front(); e != nil; e = e.Next() {
+       details := buf.BuildDetails()
+       if details == nil || details.Len() == 0 {
+               return result, dataIdRange, false
+       }
+       currentDataIdRange := buf.BuildTotalDataIDRange()
+       if !currentDataIdRange.IsIncludeAllDetails(details) {
+               return result, dataIdRange, false
+       }
+       for e := details.Front(); e != nil; e = e.Next() {
                if len(result) > 0 && result[len(result)-1] == e.Value {
                        continue
                }
                result = append(result, e.Value.(events.SocketDetail))
        }
-       return result
+       if dataIdRange == nil {
+               return result, currentDataIdRange, true
+       }
+       return result, dataIdRange.Append(currentDataIdRange), true
 }
 
 func AnalyzeTraceInfo(fetcher func(key string) string, protocolLog 
*logger.Logger) *v3.AccessLogTraceInfo {
diff --git a/pkg/accesslog/collector/protocols/queue.go 
b/pkg/accesslog/collector/protocols/queue.go
index 68708fc..833d323 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -215,9 +215,9 @@ func (p *PartitionContext) Consume(data interface{}) {
        case events.SocketDetail:
                pid, _ := events.ParseConnectionID(event.GetConnectionID())
                log.Debugf("receive the socket detail event, connection ID: %d, 
random ID: %d, pid: %d, data id: %d, "+
-                       "function name: %s, package count: %d, package size: 
%d, ssl: %d",
+                       "function name: %s, package count: %d, package size: 
%d, ssl: %d, protocol: %d",
                        event.GetConnectionID(), event.GetRandomID(), pid, 
event.DataID(), event.GetFunctionName(),
-                       event.GetL4PackageCount(), 
event.GetL4TotalPackageSize(), event.GetSSL())
+                       event.GetL4PackageCount(), 
event.GetL4TotalPackageSize(), event.GetSSL(), event.GetProtocol())
                if event.GetProtocol() == enums.ConnectionProtocolUnknown {
                        // if the connection protocol is unknown, we just needs 
to add this into the kernel log
                        forwarder.SendTransferNoProtocolEvent(p.context, event)
diff --git a/pkg/accesslog/events/detail.go b/pkg/accesslog/events/detail.go
index 3917931..a5ac46c 100644
--- a/pkg/accesslog/events/detail.go
+++ b/pkg/accesslog/events/detail.go
@@ -103,6 +103,10 @@ func (d *SocketDetailEvent) ReadFrom(r *reader.Reader) {
        d.SSL = r.ReadUint8()
 }
 
+func (d *SocketDetailEvent) Time() uint64 {
+       return d.StartTime
+}
+
 func (d *SocketDetailEvent) GetConnectionID() uint64 {
        return d.ConnectionID
 }
diff --git a/pkg/profiling/task/network/analyze/events/data.go 
b/pkg/profiling/task/network/analyze/events/data.go
index 804e96b..1f71fa8 100644
--- a/pkg/profiling/task/network/analyze/events/data.go
+++ b/pkg/profiling/task/network/analyze/events/data.go
@@ -108,6 +108,10 @@ type SocketDetailEvent struct {
        RTTTime          uint32
 }
 
+func (s *SocketDetailEvent) Time() uint64 {
+       return 0
+}
+
 func (s *SocketDetailEvent) DataID() uint64 {
        return s.DataID0
 }
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
index 52b181d..1acc342 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
@@ -185,7 +185,7 @@ func (a *ProtocolAnalyzer) 
processConnectionEvents(connection *connectionInfo) {
                finishReading := false
                switch result {
                case enums.ParseResultSuccess:
-                       finishReading = connection.buffer.RemoveReadElements()
+                       finishReading = 
connection.buffer.RemoveReadElements(true)
                case enums.ParseResultSkipPackage:
                        finishReading = connection.buffer.SkipCurrentElement()
                }
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
index 4c73cf0..d041997 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
@@ -286,8 +286,8 @@ func (h *Trace) appendHTTPEvent(attaches 
[]*v3.SpanAttachedEvent, process api.Pr
 
 func (h *Trace) appendSyscallEvents(attachEvents []*v3.SpanAttachedEvent, 
process api.ProcessInterface, traffic *base.ProcessTraffic,
        message *reader.MessageOpt) []*v3.SpanAttachedEvent {
-       headerDetails := message.HeaderBuffer().Details()
-       bodyDetails := message.BodyBuffer().Details()
+       headerDetails := message.HeaderBuffer().BuildDetails()
+       bodyDetails := message.BodyBuffer().BuildDetails()
        dataIDCache := make(map[uint64]bool)
        for e := headerDetails.Front(); e != nil; e = e.Next() {
                event := e.Value.(*events.SocketDetailEvent)
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
index e4aac3f..8d299fd 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -237,7 +237,7 @@ func (m *MessageOpt) checkChunkedBody(buf *buffer.Buffer, 
bodyReader *bufio.Read
                        return nil, enums.ParseResultSkipPackage, 
fmt.Errorf("the chunk data parding error, should be empty: %s", d)
                }
        }
-       return buffer.CombineSlices(true, buffers...), 
enums.ParseResultSuccess, nil
+       return buffer.CombineSlices(true, buf, buffers...), 
enums.ParseResultSuccess, nil
 }
 
 func (m *MessageOpt) checkBodyWithSize(buf *buffer.Buffer, reader 
*bufio.Reader, size int,
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index 0bfc5d9..5cd9f16 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -30,6 +30,7 @@ import (
 
 var (
        ErrNotComplete = errors.New("socket: not complete event")
+       emptyList      = list.New()
 )
 
 type SocketDataBuffer interface {
@@ -67,6 +68,14 @@ type SocketDataBuffer interface {
 type SocketDataDetail interface {
        // DataID data id of the buffer
        DataID() uint64
+       // Time (BPF) of the detail event
+       Time() uint64
+}
+
+type DataIDRange struct {
+       From                   uint64
+       To                     uint64
+       IsToBufferReadFinished bool
 }
 
 type Buffer struct {
@@ -82,6 +91,13 @@ type Buffer struct {
        // record the latest expired data id in connection for expire the older 
socket detail
        // because the older socket detail may not be received in buffer
        latestExpiredDataID uint64
+
+       // originalBuffer record this buffer is from Buffer.Slice or 
CombineSlices
+       // if it's not empty, then when getting the details should query from 
originalBuffer
+       // Because the BPF Queue is unsorted and can be delayed, so the details 
should query by real buffer
+       originalBuffer *Buffer
+       // endPosition record the end position of the originalBuffer
+       endPosition *Position
 }
 
 type SocketDataEventLimited struct {
@@ -106,6 +122,46 @@ func (s *SocketDataEventLimited) BufferStartPosition() int 
{
        return s.From
 }
 
+func (i *DataIDRange) IsIncludeAllDetails(list *list.List) bool {
+       if list.Len() == 0 {
+               return false
+       }
+       for e := list.Front(); e != nil; e = e.Next() {
+               if e.Value.(SocketDataDetail).DataID() < i.From || 
e.Value.(SocketDataDetail).DataID() > i.To {
+                       return false
+               }
+       }
+       return true
+}
+
+func (i *DataIDRange) Append(other *DataIDRange) *DataIDRange {
+       if other.From < i.From {
+               i.From = other.From
+       }
+       if other.To > i.To {
+               i.To = other.To
+       }
+       i.IsToBufferReadFinished = other.IsToBufferReadFinished
+       return i
+}
+
+func (i *DataIDRange) DeleteDetails(buf *Buffer) {
+       if buf.originalBuffer != nil {
+               i.DeleteDetails(buf.originalBuffer)
+       }
+       for e := buf.detailEvents.Front(); e != nil; {
+               next := e.Next()
+               dataId := e.Value.(SocketDataDetail).DataID()
+               if dataId >= i.From && dataId <= i.To {
+                       if !i.IsToBufferReadFinished && dataId == i.To {
+                               break
+                       }
+                       buf.detailEvents.Remove(e)
+               }
+               e = next
+       }
+}
+
 type Position struct {
        // element of the event list
        element *list.Element
@@ -143,6 +199,26 @@ func (r *Buffer) FindFirstDataBuffer(dataID uint64) 
SocketDataBuffer {
        return nil
 }
 
+func (r *Buffer) BuildTotalDataIDRange() *DataIDRange {
+       if r.dataEvents.Len() == 0 {
+               return nil
+       }
+       var toIndex uint64
+       var isToBufferReadFinished bool
+       if r.endPosition != nil {
+               toIndex = r.endPosition.DataID()
+               isToBufferReadFinished = r.endPosition.bufIndex == 
r.endPosition.element.Value.(SocketDataBuffer).BufferLen()
+       } else {
+               toIndex = r.current.DataID()
+               isToBufferReadFinished = r.current.bufIndex == 
r.current.element.Value.(SocketDataBuffer).BufferLen()
+       }
+       return &DataIDRange{
+               From:                   r.head.DataID(),
+               To:                     toIndex,
+               IsToBufferReadFinished: isToBufferReadFinished,
+       }
+}
+
 func (r *Buffer) Position() *Position {
        return r.current.Clone()
 }
@@ -155,6 +231,7 @@ func (r *Buffer) Clean() {
        r.detailEvents = list.New()
        r.head = nil
        r.current = nil
+       r.endPosition = nil
 }
 
 func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer {
@@ -198,11 +275,13 @@ func (r *Buffer) Slice(validated bool, start, end 
*Position) *Buffer {
        }
 
        return &Buffer{
-               dataEvents:   dataEvents,
-               detailEvents: detailEvents,
-               validated:    validated,
-               head:         &Position{element: dataEvents.Front(), bufIndex: 
start.bufIndex},
-               current:      &Position{element: dataEvents.Front(), bufIndex: 
start.bufIndex},
+               dataEvents:     dataEvents,
+               detailEvents:   detailEvents,
+               validated:      validated,
+               head:           &Position{element: dataEvents.Front(), 
bufIndex: start.bufIndex},
+               current:        &Position{element: dataEvents.Front(), 
bufIndex: start.bufIndex},
+               originalBuffer: r,
+               endPosition:    end,
        }
 }
 
@@ -219,7 +298,25 @@ func (r *Buffer) Len() int {
        return result
 }
 
-func (r *Buffer) Details() *list.List {
+func (r *Buffer) BuildDetails() *list.List {
+       // if the original buffer is not empty, then query the details from 
original buffer
+       if r.originalBuffer != nil {
+               events := list.New()
+               fromDataId := r.head.DataID()
+               var endDataId uint64
+               if r.endPosition != nil {
+                       endDataId = r.endPosition.DataID()
+               } else {
+                       endDataId = r.current.DataID()
+               }
+
+               for e := r.originalBuffer.detailEvents.Front(); e != nil; e = 
e.Next() {
+                       if e.Value.(SocketDataDetail).DataID() >= fromDataId && 
e.Value.(SocketDataDetail).DataID() <= endDataId {
+                               events.PushBack(e.Value)
+                       }
+               }
+               return events
+       }
        return r.detailEvents
 }
 
@@ -281,7 +378,7 @@ func (r *Buffer) DetectNotSendingLastPosition() *Position {
        return nil
 }
 
-func CombineSlices(validated bool, buffers ...*Buffer) *Buffer {
+func CombineSlices(validated bool, originalBuffer *Buffer, buffers ...*Buffer) 
*Buffer {
        if len(buffers) == 0 {
                return nil
        }
@@ -289,7 +386,6 @@ func CombineSlices(validated bool, buffers ...*Buffer) 
*Buffer {
                return buffers[0]
        }
        dataEvents := list.New()
-       detailEvents := list.New()
        for _, b := range buffers {
                if b == nil || b.head == nil {
                        continue
@@ -304,15 +400,20 @@ func CombineSlices(validated bool, buffers ...*Buffer) 
*Buffer {
                } else {
                        dataEvents.PushBackList(b.dataEvents)
                }
-               detailEvents.PushBackList(b.detailEvents)
        }
 
+       var endPosition = buffers[len(buffers)-1].endPosition
+       if endPosition == nil {
+               endPosition = buffers[len(buffers)-1].Position()
+       }
        return &Buffer{
-               dataEvents:   dataEvents,
-               detailEvents: detailEvents,
-               validated:    validated,
-               head:         &Position{element: dataEvents.Front(), bufIndex: 
0},
-               current:      &Position{element: dataEvents.Front(), bufIndex: 
0},
+               dataEvents:     dataEvents,
+               detailEvents:   emptyList, // for the combined buffer, the 
details list should be queried from original buffer
+               validated:      validated,
+               head:           &Position{element: dataEvents.Front(), 
bufIndex: 0},
+               current:        &Position{element: dataEvents.Front(), 
bufIndex: 0},
+               originalBuffer: originalBuffer,
+               endPosition:    endPosition,
        }
 }
 
@@ -505,12 +606,12 @@ func (r *Buffer) PrepareForReading() bool {
 }
 
 // nolint
-func (r *Buffer) RemoveReadElements() bool {
+func (r *Buffer) RemoveReadElements(includeDetails bool) bool {
        r.eventLocker.Lock()
        defer r.eventLocker.Unlock()
 
        // delete until the last data id
-       if r.head.element != nil && r.current.element != nil {
+       if includeDetails && r.head.element != nil && r.current.element != nil {
                firstDataID := r.head.element.Value.(SocketDataBuffer).DataID()
                currentBuffer := r.current.element.Value.(SocketDataBuffer)
                lastDataID := currentBuffer.DataID()
@@ -660,7 +761,9 @@ func (r *Buffer) DeleteExpireEvents(expireDuration 
time.Duration) int {
 
        // detail event queue
        count += r.deleteEventsWithJudgement(r.detailEvents, func(element 
*list.Element) bool {
-               return r.latestExpiredDataID > 0 && 
element.Value.(SocketDataDetail).DataID() <= r.latestExpiredDataID
+               detail := element.Value.(SocketDataDetail)
+               return r.latestExpiredDataID > 0 && detail.DataID() <= 
r.latestExpiredDataID ||
+                       (detail.Time() > 0 && 
expireTime.After(host.Time(detail.Time())))
        })
        return count
 }
@@ -672,13 +775,6 @@ func (r *Buffer) DataLength() int {
        return r.dataEvents.Len()
 }
 
-func (r *Buffer) DetailLength() int {
-       if r.detailEvents == nil {
-               return 0
-       }
-       return r.detailEvents.Len()
-}
-
 func (r *Buffer) deleteEventsWithJudgement(l *list.List, checker func(element 
*list.Element) bool) int {
        count := 0
        for e := l.Front(); e != nil; {

Reply via email to