This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch missing-details
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git

commit fa318553a53c0ff34bf703aae34e799a3451ba77
Author: mrproliu <[email protected]>
AuthorDate: Thu Dec 12 11:20:53 2024 +0800

    Reduce missing details issue in the access log module
---
 CHANGES.md                                         |   1 +
 pkg/accesslog/collector/connection.go              |  11 +-
 pkg/accesslog/collector/protocols/http1.go         | 118 ++++++++++----
 pkg/accesslog/collector/protocols/http2.go         |  53 ++++---
 pkg/accesslog/collector/protocols/protocol.go      |  22 ++-
 pkg/accesslog/collector/protocols/queue.go         |   4 +-
 pkg/accesslog/common/config.go                     |   3 +-
 pkg/accesslog/events/detail.go                     |   4 +
 pkg/profiling/task/network/analyze/events/data.go  |   4 +
 .../analyze/layer7/protocols/base/analyzer.go      |   2 +-
 .../analyze/layer7/protocols/http1/analyzer.go     |  12 +-
 .../analyze/layer7/protocols/http1/metrics.go      |   4 +-
 .../layer7/protocols/http1/reader/reader.go        |  34 ++--
 .../layer7/protocols/http1/reader/request.go       |   9 +-
 .../layer7/protocols/http1/reader/response.go      |   9 +-
 pkg/tools/buffer/buffer.go                         | 171 ++++++++++++++++++---
 16 files changed, 349 insertions(+), 112 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 3392d50..219e53e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -14,6 +14,7 @@ Release Notes.
 * Support for connecting to the backend server over TLS without requiring `ca.pem`.
 * Fix missing the first socket detail event in HTTPS protocol.
 * Support parallel parsing protocol data in the access log module.
+* Reduce missing details issue in the access log module.
 
 #### Bug Fixes
 * Fix the base image cannot run in the arm64.
diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go
index 8e8aba1..63b55b8 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -64,7 +64,10 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
        if int(perCPUBufferSize) < os.Getpagesize() {
                return fmt.Errorf("the cpu buffer must bigger than %dB", 
os.Getpagesize())
        }
-       if ctx.Config.ConnectionAnalyze.Parallels < 1 {
+       if ctx.Config.ConnectionAnalyze.ParseParallels < 1 {
+               return fmt.Errorf("the parallels cannot be small than 1")
+       }
+       if ctx.Config.ConnectionAnalyze.AnalyzeParallels < 1 {
                return fmt.Errorf("the parallels cannot be small than 1")
        }
        if ctx.Config.ConnectionAnalyze.QueueSize < 1 {
@@ -74,15 +77,15 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
        if err != nil {
                connectionLogger.Warnf("cannot create the connection tracker, 
%v", err)
        }
-       c.eventQueue = 
btf.NewEventQueue(ctx.Config.ConnectionAnalyze.Parallels, 
ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
+       c.eventQueue = 
btf.NewEventQueue(ctx.Config.ConnectionAnalyze.AnalyzeParallels, 
ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
                return newConnectionPartitionContext(ctx, track)
        })
-       c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, 
int(perCPUBufferSize), 1, func() interface{} {
+       c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, 
int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() 
interface{} {
                return &events.SocketConnectEvent{}
        }, func(data interface{}) string {
                return fmt.Sprintf("%d", 
data.(*events.SocketConnectEvent).ConID)
        })
-       c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, 
int(perCPUBufferSize), 1, func() interface{} {
+       c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, 
int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() 
interface{} {
                return &events.SocketCloseEvent{}
        }, func(data interface{}) string {
                return fmt.Sprintf("%d", 
data.(*events.SocketCloseEvent).ConnectionID)
diff --git a/pkg/accesslog/collector/protocols/http1.go b/pkg/accesslog/collector/protocols/http1.go
index 92a8173..9c17160 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -19,6 +19,7 @@ package protocols
 
 import (
        "container/list"
+       "fmt"
        "io"
 
        "github.com/apache/skywalking-rover/pkg/accesslog/common"
@@ -33,16 +34,18 @@ import (
 )
 
 var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1")
+var http1AnalyzeMaxRetryCount = 3
 
-type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, 
response *reader.Response)
+type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, 
response *reader.Response) error
 
 type HTTP1Protocol struct {
        ctx     *common.AccessLogContext
        analyze HTTP1ProtocolAnalyze
+       reader  *reader.Reader
 }
 
 func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze 
HTTP1ProtocolAnalyze) *HTTP1Protocol {
-       protocol := &HTTP1Protocol{ctx: ctx}
+       protocol := &HTTP1Protocol{ctx: ctx, reader: reader.NewReader()}
        if analyze == nil {
                protocol.analyze = protocol.HandleHTTPData
        } else {
@@ -55,29 +58,41 @@ type HTTP1Metrics struct {
        ConnectionID uint64
        RandomID     uint64
 
-       halfRequests *list.List
+       halfRequests      *list.List
+       analyzeUnFinished *list.List
 }
 
 func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64) 
ProtocolMetrics {
        return &HTTP1Metrics{
-               ConnectionID: connectionID,
-               RandomID:     randomID,
-               halfRequests: list.New(),
+               ConnectionID:      connectionID,
+               RandomID:          randomID,
+               halfRequests:      list.New(),
+               analyzeUnFinished: list.New(),
        }
 }
 
+type HTTP1AnalyzeUnFinished struct {
+       request    *reader.Request
+       response   *reader.Response
+       retryCount int
+}
+
 func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ 
*AnalyzeHelper) error {
        metrics := 
connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics)
        buf := connection.Buffer(enums.ConnectionProtocolHTTP)
        http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: 
%d, random ID: %d, data len: %d",
                metrics.ConnectionID, metrics.RandomID, buf.DataLength())
+       p.handleUnFinishedEvents(metrics)
        buf.ResetForLoopReading()
        for {
                if !buf.PrepareForReading() {
                        return nil
                }
 
-               messageType, err := reader.IdentityMessageType(buf)
+               messageType, err := p.reader.IdentityMessageType(buf)
+               log.Debugf("ready to reading message type, messageType: %v, 
buf: %p, data id: %d, "+
+                       "connection ID: %d, random ID: %d, error: %v", 
messageType, buf, buf.Position().DataID(),
+                       metrics.ConnectionID, metrics.RandomID, err)
                if err != nil {
                        http1Log.Debugf("failed to identity message type, %v", 
err)
                        if buf.SkipCurrentElement() {
@@ -89,19 +104,25 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe
                var result enums.ParseResult
                switch messageType {
                case reader.MessageTypeRequest:
-                       result, _ = p.handleRequest(metrics, buf)
+                       result, err = p.handleRequest(metrics, buf)
                case reader.MessageTypeResponse:
-                       result, _ = p.handleResponse(metrics, buf)
+                       result, err = p.handleResponse(metrics, buf)
                case reader.MessageTypeUnknown:
                        result = enums.ParseResultSkipPackage
                }
+               if err != nil {
+                       http1Log.Warnf("failed to handle HTTP/1.x protocol, 
connection ID: %d, random ID: %d, data id: %d, error: %v",
+                               metrics.ConnectionID, metrics.RandomID, 
buf.Position().DataID(), err)
+               }
 
                finishReading := false
                switch result {
                case enums.ParseResultSuccess:
-                       finishReading = buf.RemoveReadElements()
+                       finishReading = buf.RemoveReadElements(false)
                case enums.ParseResultSkipPackage:
                        finishReading = buf.SkipCurrentElement()
+                       log.Debugf("skip current element, data id: %d, buf: %p, 
connection ID: %d, random ID: %d",
+                               buf.Position().DataID(), buf, 
metrics.ConnectionID, metrics.RandomID)
                }
 
                if finishReading {
@@ -116,7 +137,7 @@ func (p *HTTP1Protocol) ForProtocol() enums.ConnectionProtocol {
 }
 
 func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf 
*buffer.Buffer) (enums.ParseResult, error) {
-       req, result, err := reader.ReadRequest(buf, true)
+       req, result, err := p.reader.ReadRequest(buf, true)
        if err != nil {
                return enums.ParseResultSkipPackage, err
        }
@@ -130,12 +151,14 @@ func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf *buffer.Buffer)
 func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b 
*buffer.Buffer) (enums.ParseResult, error) {
        firstRequest := metrics.halfRequests.Front()
        if firstRequest == nil {
+               log.Debugf("cannot found request for response, skip response, 
connection ID: %d, random ID: %d",
+                       metrics.ConnectionID, metrics.RandomID)
                return enums.ParseResultSkipPackage, nil
        }
        request := metrics.halfRequests.Remove(firstRequest).(*reader.Request)
 
        // parsing response
-       response, result, err := reader.ReadResponse(request, b, true)
+       response, result, err := p.reader.ReadResponse(request, b, true)
        defer func() {
                // if parsing response failed, then put the request back to the 
list
                if result != enums.ParseResultSuccess {
@@ -149,37 +172,71 @@ func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b *buffer.Buffer)
        }
 
        // getting the request and response, then send to the forwarder
-       p.analyze(metrics, request, response)
+       if analyzeError := p.analyze(metrics, request, response); analyzeError 
!= nil {
+               p.appendAnalyzeUnFinished(metrics, request, response)
+       }
        return enums.ParseResultSuccess, nil
 }
 
-func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request 
*reader.Request, response *reader.Response) {
-       detailEvents := make([]events.SocketDetail, 0)
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
request.HeaderBuffer())
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
request.BodyBuffer())
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
response.HeaderBuffer())
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
response.BodyBuffer())
-
-       if len(detailEvents) == 0 {
-               http1Log.Warnf("cannot found any detail events for HTTP/1.x 
protocol, connection ID: %d, random ID: %d, data id: %d-%d",
-                       metrics.ConnectionID, metrics.RandomID,
-                       request.MinDataID(), 
response.BodyBuffer().LastSocketBuffer().DataID())
-               return
+func (p *HTTP1Protocol) appendAnalyzeUnFinished(metrics *HTTP1Metrics, request 
*reader.Request, response *reader.Response) {
+       metrics.analyzeUnFinished.PushBack(&HTTP1AnalyzeUnFinished{
+               request:    request,
+               response:   response,
+               retryCount: 0,
+       })
+}
+
+func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics) {
+       for element := m.analyzeUnFinished.Front(); element != nil; {
+               unFinished := element.Value.(*HTTP1AnalyzeUnFinished)
+               err := p.analyze(m, unFinished.request, unFinished.response)
+               if err != nil {
+                       unFinished.retryCount++
+                       if unFinished.retryCount < http1AnalyzeMaxRetryCount {
+                               element = element.Next()
+                               continue
+                       }
+                       http1Log.Warnf("failed to analyze HTTP1 request and 
response, connection ID: %d, random ID: %d, "+
+                               "retry count: %d, error: %v", m.ConnectionID, 
m.RandomID, unFinished.retryCount, err)
+               }
+               next := element.Next()
+               m.analyzeUnFinished.Remove(element)
+               element = next
        }
-       http1Log.Debugf("found fully HTTP1 request and response, contains %d 
detail events , connection ID: %d, random ID: %d",
-               len(detailEvents), metrics.ConnectionID, metrics.RandomID)
+}
+
+func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request 
*reader.Request, response *reader.Response) error {
+       details := make([]events.SocketDetail, 0)
+       var allInclude = true
+       var idRange *buffer.DataIDRange
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
request.HeaderBuffer(), idRange, allInclude)
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
request.BodyBuffer(), idRange, allInclude)
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
response.HeaderBuffer(), idRange, allInclude)
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
response.BodyBuffer(), idRange, allInclude)
+
+       if !allInclude {
+               return fmt.Errorf("cannot found full detail events for HTTP/1.x 
protocol, "+
+                       "data id: %d-%d, current details count: %d",
+                       request.MinDataID(), 
response.BodyBuffer().LastSocketBuffer().DataID(), len(details))
+       }
+
+       http1Log.Debugf("found fully HTTP1 request and response, contains %d 
detail events, "+
+               "connection ID: %d, random ID: %d, data range: %d-%d(%t)",
+               len(details), metrics.ConnectionID, metrics.RandomID, 
idRange.From, idRange.To, idRange.IsToBufferReadFinished)
        originalRequest := request.Original()
        originalResponse := response.Original()
+       // delete details(each request or response is fine because it's will 
delete the original buffer)
+       idRange.DeleteDetails(request.HeaderBuffer())
 
        defer func() {
                p.CloseStream(originalRequest.Body)
                p.CloseStream(originalResponse.Body)
        }()
-       forwarder.SendTransferProtocolEvent(p.ctx, detailEvents, 
&v3.AccessLogProtocolLogs{
+       forwarder.SendTransferProtocolEvent(p.ctx, details, 
&v3.AccessLogProtocolLogs{
                Protocol: &v3.AccessLogProtocolLogs_Http{
                        Http: &v3.AccessLogHTTPProtocol{
-                               StartTime: 
forwarder.BuildOffsetTimestamp(detailEvents[0].GetStartTime()),
-                               EndTime:   
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
+                               StartTime: 
forwarder.BuildOffsetTimestamp(details[0].GetStartTime()),
+                               EndTime:   
forwarder.BuildOffsetTimestamp(details[len(details)-1].GetEndTime()),
                                Version:   
v3.AccessLogHTTPProtocolVersion_HTTP1,
                                Request: &v3.AccessLogHTTPProtocolRequest{
                                        Method:             
TransformHTTPMethod(originalRequest.Method),
@@ -198,6 +255,7 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request *reader.Re
                        },
                },
        })
+       return nil
 }
 
 func (p *HTTP1Protocol) CloseStream(ioReader io.Closer) {
diff --git a/pkg/accesslog/collector/protocols/http2.go b/pkg/accesslog/collector/protocols/http2.go
index b18226a..266413a 100644
--- a/pkg/accesslog/collector/protocols/http2.go
+++ b/pkg/accesslog/collector/protocols/http2.go
@@ -19,6 +19,7 @@ package protocols
 
 import (
        "errors"
+       "fmt"
        "strconv"
        "strings"
        "time"
@@ -42,7 +43,7 @@ var maxHTTP2StreamingTime = time.Minute * 3
 
 var http2Log = logger.GetLogger("accesslog", "collector", "protocols", "http2")
 
-type HTTP2StreamAnalyze func(stream *HTTP2Streaming)
+type HTTP2StreamAnalyze func(stream *HTTP2Streaming) error
 
 type HTTP2Protocol struct {
        ctx     *common.AccessLogContext
@@ -140,7 +141,7 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze
                finishReading := false
                switch result {
                case enums.ParseResultSuccess:
-                       finishReading = buf.RemoveReadElements()
+                       finishReading = buf.RemoveReadElements(false)
                case enums.ParseResultSkipPackage:
                        finishReading = buf.SkipCurrentElement()
                }
@@ -194,12 +195,12 @@ func (r *HTTP2Protocol) handleHeader(header *http2.FrameHeader, startPos *buffer
 
        if !streaming.IsInResponse {
                r.AppendHeaders(streaming.ReqHeader, headers)
-               streaming.ReqHeaderBuffer = buffer.CombineSlices(true, 
streaming.ReqHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
+               streaming.ReqHeaderBuffer = buffer.CombineSlices(true, buf, 
streaming.ReqHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
                return enums.ParseResultSuccess, false, nil
        }
 
        r.AppendHeaders(streaming.RespHeader, headers)
-       streaming.RespHeaderBuffer = buffer.CombineSlices(true, 
streaming.RespHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
+       streaming.RespHeaderBuffer = buffer.CombineSlices(true, buf, 
streaming.RespHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
 
        // is end of stream and in the response
        if header.Flags.Has(http2.FlagHeadersEndStream) {
@@ -235,24 +236,27 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id ui
        }
 }
 
-func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) {
-       detailEvents := make([]events.SocketDetail, 0)
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
stream.ReqHeaderBuffer)
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
stream.ReqBodyBuffer)
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
stream.RespHeaderBuffer)
-       detailEvents = AppendSocketDetailsFromBuffer(detailEvents, 
stream.RespBodyBuffer)
-
-       if len(detailEvents) == 0 {
-               http2Log.Warnf("cannot found any detail events for HTTP/2 
protocol, data id: %d-%d",
-                       stream.ReqHeaderBuffer.FirstSocketBuffer().DataID(), 
stream.RespBodyBuffer.LastSocketBuffer().DataID())
-               return
+func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) error {
+       details := make([]events.SocketDetail, 0)
+       var allInclude = true
+       var idRange *buffer.DataIDRange
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
stream.ReqHeaderBuffer, idRange, allInclude)
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
stream.ReqBodyBuffer, idRange, allInclude)
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
stream.RespHeaderBuffer, idRange, allInclude)
+       details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, 
stream.RespBodyBuffer, idRange, allInclude)
+
+       if !allInclude {
+               return fmt.Errorf("cannot found any detail events for HTTP/2 
protocol, data id: %d-%d, current details count: %d",
+                       stream.ReqHeaderBuffer.FirstSocketBuffer().DataID(), 
stream.RespBodyBuffer.LastSocketBuffer().DataID(),
+                       len(details))
        }
+       idRange.DeleteDetails(stream.ReqHeaderBuffer)
 
-       forwarder.SendTransferProtocolEvent(r.ctx, detailEvents, 
&v3.AccessLogProtocolLogs{
+       forwarder.SendTransferProtocolEvent(r.ctx, details, 
&v3.AccessLogProtocolLogs{
                Protocol: &v3.AccessLogProtocolLogs_Http{
                        Http: &v3.AccessLogHTTPProtocol{
-                               StartTime: 
forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer, 
detailEvents[0]).GetStartTime()),
-                               EndTime:   
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
+                               StartTime: 
forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer, 
details[0]).GetStartTime()),
+                               EndTime:   
forwarder.BuildOffsetTimestamp(details[len(details)-1].GetEndTime()),
                                Version:   
v3.AccessLogHTTPProtocolVersion_HTTP2,
                                Request: &v3.AccessLogHTTPProtocolRequest{
                                        Method:             
r.ParseHTTPMethod(stream),
@@ -272,6 +276,7 @@ func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) {
                        },
                },
        })
+       return nil
 }
 
 func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming) 
v3.AccessLogHTTPProtocolRequestMethod {
@@ -284,10 +289,14 @@ func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming) v3.AccessLogH
 }
 
 func (r *HTTP2Protocol) FirstDetail(buf *buffer.Buffer, def 
events.SocketDetail) events.SocketDetail {
-       if buf == nil || buf.Details() == nil || buf.Details().Len() == 0 {
+       if buf == nil {
+               return def
+       }
+       details := buf.BuildDetails()
+       if details == nil || details.Len() == 0 {
                return def
        }
-       return buf.Details().Front().Value.(events.SocketDetail)
+       return details.Front().Value.(events.SocketDetail)
 }
 
 func (r *HTTP2Protocol) BufferSizeOfZero(buf *buffer.Buffer) uint64 {
@@ -315,9 +324,9 @@ func (r *HTTP2Protocol) handleData(header *http2.FrameHeader, startPos *buffer.P
                return enums.ParseResultSkipPackage, false, err
        }
        if !streaming.IsInResponse {
-               streaming.ReqBodyBuffer = buffer.CombineSlices(true, 
streaming.ReqBodyBuffer, buf.Slice(true, startPos, buf.Position()))
+               streaming.ReqBodyBuffer = buffer.CombineSlices(true, buf, 
streaming.ReqBodyBuffer, buf.Slice(true, startPos, buf.Position()))
        } else {
-               streaming.RespBodyBuffer = buffer.CombineSlices(true, 
streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position()))
+               streaming.RespBodyBuffer = buffer.CombineSlices(true, buf, 
streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position()))
        }
 
        r.validateIsStreamOpenTooLong(metrics, header.StreamID, streaming)
diff --git a/pkg/accesslog/collector/protocols/protocol.go b/pkg/accesslog/collector/protocols/protocol.go
index 41ac8a2..862e80f 100644
--- a/pkg/accesslog/collector/protocols/protocol.go
+++ b/pkg/accesslog/collector/protocols/protocol.go
@@ -56,17 +56,29 @@ type Protocol interface {
        Analyze(connection *PartitionConnection, helper *AnalyzeHelper) error
 }
 
-func AppendSocketDetailsFromBuffer(result []events.SocketDetail, buf 
*buffer.Buffer) []events.SocketDetail {
-       if buf == nil || buf.DetailLength() == 0 {
-               return result
+func AppendSocketDetailsFromBuffer(result []events.SocketDetail, buf 
*buffer.Buffer, dataIdRange *buffer.DataIDRange,
+       allDetailInclude bool) ([]events.SocketDetail, *buffer.DataIDRange, 
bool) {
+       if buf == nil || !allDetailInclude {
+               return result, dataIdRange, false
        }
-       for e := buf.Details().Front(); e != nil; e = e.Next() {
+       details := buf.BuildDetails()
+       if details == nil || details.Len() == 0 {
+               return result, dataIdRange, false
+       }
+       currentDataIdRange := buf.BuildTotalDataIDRange()
+       if !currentDataIdRange.IsIncludeAllDetails(details) {
+               return result, dataIdRange, false
+       }
+       for e := details.Front(); e != nil; e = e.Next() {
                if len(result) > 0 && result[len(result)-1] == e.Value {
                        continue
                }
                result = append(result, e.Value.(events.SocketDetail))
        }
-       return result
+       if dataIdRange == nil {
+               return result, currentDataIdRange, true
+       }
+       return result, dataIdRange.Append(currentDataIdRange), true
 }
 
 func AnalyzeTraceInfo(fetcher func(key string) string, protocolLog 
*logger.Logger) *v3.AccessLogTraceInfo {
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index 68708fc..833d323 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -215,9 +215,9 @@ func (p *PartitionContext) Consume(data interface{}) {
        case events.SocketDetail:
                pid, _ := events.ParseConnectionID(event.GetConnectionID())
                log.Debugf("receive the socket detail event, connection ID: %d, 
random ID: %d, pid: %d, data id: %d, "+
-                       "function name: %s, package count: %d, package size: 
%d, ssl: %d",
+                       "function name: %s, package count: %d, package size: 
%d, ssl: %d, protocol: %d",
                        event.GetConnectionID(), event.GetRandomID(), pid, 
event.DataID(), event.GetFunctionName(),
-                       event.GetL4PackageCount(), 
event.GetL4TotalPackageSize(), event.GetSSL())
+                       event.GetL4PackageCount(), 
event.GetL4TotalPackageSize(), event.GetSSL(), event.GetProtocol())
                if event.GetProtocol() == enums.ConnectionProtocolUnknown {
                        // if the connection protocol is unknown, we just needs 
to add this into the kernel log
                        forwarder.SendTransferNoProtocolEvent(p.context, event)
diff --git a/pkg/accesslog/common/config.go b/pkg/accesslog/common/config.go
index 1ef0aeb..0298088 100644
--- a/pkg/accesslog/common/config.go
+++ b/pkg/accesslog/common/config.go
@@ -37,7 +37,8 @@ type FlushConfig struct {
 
 type ConnectionAnalyzeConfig struct {
        PerCPUBufferSize string `mapstructure:"per_cpu_buffer"`
-       Parallels        int    `mapstructure:"parallels"`
+       ParseParallels   int    `mapstructure:"parse_parallels"`
+       AnalyzeParallels int    `mapstructure:"analyze_parallels"`
        QueueSize        int    `mapstructure:"queue_size"`
 }
 
diff --git a/pkg/accesslog/events/detail.go b/pkg/accesslog/events/detail.go
index 3917931..a5ac46c 100644
--- a/pkg/accesslog/events/detail.go
+++ b/pkg/accesslog/events/detail.go
@@ -103,6 +103,10 @@ func (d *SocketDetailEvent) ReadFrom(r *reader.Reader) {
        d.SSL = r.ReadUint8()
 }
 
+func (d *SocketDetailEvent) Time() uint64 {
+       return d.StartTime
+}
+
 func (d *SocketDetailEvent) GetConnectionID() uint64 {
        return d.ConnectionID
 }
diff --git a/pkg/profiling/task/network/analyze/events/data.go b/pkg/profiling/task/network/analyze/events/data.go
index 804e96b..1f71fa8 100644
--- a/pkg/profiling/task/network/analyze/events/data.go
+++ b/pkg/profiling/task/network/analyze/events/data.go
@@ -108,6 +108,10 @@ type SocketDetailEvent struct {
        RTTTime          uint32
 }
 
+func (s *SocketDetailEvent) Time() uint64 {
+       return 0
+}
+
 func (s *SocketDetailEvent) DataID() uint64 {
        return s.DataID0
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
index 52b181d..1acc342 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
@@ -185,7 +185,7 @@ func (a *ProtocolAnalyzer) processConnectionEvents(connection *connectionInfo) {
                finishReading := false
                switch result {
                case enums.ParseResultSuccess:
-                       finishReading = connection.buffer.RemoveReadElements()
+                       finishReading = 
connection.buffer.RemoveReadElements(true)
                case enums.ParseResultSkipPackage:
                        finishReading = connection.buffer.SkipCurrentElement()
                }
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
index 245e6dd..6cec0ef 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
@@ -51,7 +51,8 @@ var DurationHistogramBuckets = []float64{
 
 type Analyzer struct {
        // cache connection metrics if the connect event not receive or process
-       cache map[string]*ConnectionMetrics
+       cache  map[string]*ConnectionMetrics
+       reader *reader.Reader
 
        sampleConfig *SamplingConfig
 }
@@ -67,7 +68,8 @@ type ConnectionMetrics struct {
 
 func NewHTTP1Analyzer() protocol.Protocol {
        return &Analyzer{
-               cache: make(map[string]*ConnectionMetrics),
+               cache:  make(map[string]*ConnectionMetrics),
+               reader: reader.NewReader(),
        }
 }
 
@@ -90,7 +92,7 @@ func (h *Analyzer) Init(config *profiling.TaskConfig) {
 
 func (h *Analyzer) ParseProtocol(connectionID uint64, metrics 
protocol.Metrics, buf *buffer.Buffer) enums.ParseResult {
        connectionMetrics := metrics.(*ConnectionMetrics)
-       messageType, err := reader.IdentityMessageType(buf)
+       messageType, err := h.reader.IdentityMessageType(buf)
        if err != nil {
                return enums.ParseResultSkipPackage
        }
@@ -116,7 +118,7 @@ func (h *Analyzer) ParseProtocol(connectionID uint64, metrics protocol.Metrics,
 
 func (h *Analyzer) handleRequest(metrics *ConnectionMetrics, buf 
*buffer.Buffer) (enums.ParseResult, error) {
        // parsing request
-       req, r, err := reader.ReadRequest(buf, true)
+       req, r, err := h.reader.ReadRequest(buf, true)
        if err != nil {
                return enums.ParseResultSkipPackage, err
        }
@@ -137,7 +139,7 @@ func (h *Analyzer) handleResponse(connectionID uint64, metrics *ConnectionMetric
        request := metrics.halfData.Remove(firstElement).(*reader.Request)
 
        // parsing request
-       response, r, err := reader.ReadResponse(request, buf, true)
+       response, r, err := h.reader.ReadResponse(request, buf, true)
        if err != nil {
                return enums.ParseResultSkipPackage, err
        } else if r != enums.ParseResultSuccess {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
index 4c73cf0..d041997 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
@@ -286,8 +286,8 @@ func (h *Trace) appendHTTPEvent(attaches 
[]*v3.SpanAttachedEvent, process api.Pr
 
 func (h *Trace) appendSyscallEvents(attachEvents []*v3.SpanAttachedEvent, 
process api.ProcessInterface, traffic *base.ProcessTraffic,
        message *reader.MessageOpt) []*v3.SpanAttachedEvent {
-       headerDetails := message.HeaderBuffer().Details()
-       bodyDetails := message.BodyBuffer().Details()
+       headerDetails := message.HeaderBuffer().BuildDetails()
+       bodyDetails := message.BodyBuffer().BuildDetails()
        dataIDCache := make(map[uint64]bool)
        for e := headerDetails.Front(); e != nil; e = e.Next() {
                event := e.Value.(*events.SocketDetailEvent)
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
index e4aac3f..8cc8c85 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -35,9 +35,6 @@ import (
 )
 
 var (
-       headBuffer = make([]byte, 16)
-       bodyBuffer = make([]byte, 4096)
-
        requestMethods = []string{
                "GET", "POST", "OPTIONS", "HEAD", "PUT", "DELETE", "CONNECT", 
"TRACE", "PATCH",
        }
@@ -53,15 +50,27 @@ const (
        MessageTypeUnknown
 )
 
-func IdentityMessageType(reader *buffer.Buffer) (MessageType, error) {
-       n, err := reader.Peek(headBuffer)
+// Reader parses HTTP/1.x messages out of socket buffers and owns the scratch
+// byte slices used while doing so. The slices are overwritten on every call,
+// so sharing one Reader between parsers that run concurrently would presumably
+// make them clobber each other's data (NOTE(review): confirm one Reader per analyzer).
+type Reader struct {
+       // headBuffer receives the leading bytes peeked to identify the message type
+       headBuffer []byte
+       // bodyBuffer is the scratch space that body bytes are read into
+       bodyBuffer []byte
+}
+
+// NewReader creates a Reader with a 16-byte head buffer and a 4096-byte body buffer.
+func NewReader() *Reader {
+       return &Reader{
+               headBuffer: make([]byte, 16),
+               bodyBuffer: make([]byte, 4096),
+       }
+}
+
+func (r *Reader) IdentityMessageType(reader *buffer.Buffer) (MessageType, 
error) {
+       n, err := reader.Peek(r.headBuffer)
        if err != nil {
                return MessageTypeUnknown, err
-       } else if n != len(headBuffer) {
+       } else if n != len(r.headBuffer) {
                return MessageTypeUnknown, fmt.Errorf("need more content for 
header")
        }
 
-       headerString := string(headBuffer)
+       headerString := string(r.headBuffer)
        isRequest := false
        for _, method := range requestMethods {
                if strings.HasPrefix(headerString, method) {
@@ -83,6 +92,7 @@ type Message interface {
        Headers() http.Header
        HeaderBuffer() *buffer.Buffer
        BodyBuffer() *buffer.Buffer
+       Reader() *Reader
 }
 
 type MessageOpt struct {
@@ -192,7 +202,7 @@ func (m *MessageOpt) isChunked() bool {
 func (m *MessageOpt) readBodyUntilCurrentPackageFinished(buf *buffer.Buffer, 
reader *bufio.Reader) (*buffer.Buffer, enums.ParseResult, error) {
        startPosition := buf.OffsetPosition(-reader.Buffered())
        for !buf.IsCurrentPacketReadFinished() {
-               _, err := buf.Read(bodyBuffer)
+               _, err := buf.Read(m.Reader().bodyBuffer)
                if err != nil {
                        return nil, enums.ParseResultSkipPackage, err
                }
@@ -237,7 +247,7 @@ func (m *MessageOpt) checkChunkedBody(buf *buffer.Buffer, 
bodyReader *bufio.Read
                        return nil, enums.ParseResultSkipPackage, 
fmt.Errorf("the chunk data parding error, should be empty: %s", d)
                }
        }
-       return buffer.CombineSlices(true, buffers...), 
enums.ParseResultSuccess, nil
+       return buffer.CombineSlices(true, buf, buffers...), 
enums.ParseResultSuccess, nil
 }
 
 func (m *MessageOpt) checkBodyWithSize(buf *buffer.Buffer, reader 
*bufio.Reader, size int,
@@ -248,10 +258,10 @@ func (m *MessageOpt) checkBodyWithSize(buf 
*buffer.Buffer, reader *bufio.Reader,
        startPosition := buf.OffsetPosition(-reader.Buffered())
        for reduceSize > 0 {
                readSize = reduceSize
-               if readSize > len(bodyBuffer) {
-                       readSize = len(bodyBuffer)
+               if readSize > len(m.Reader().bodyBuffer) {
+                       readSize = len(m.Reader().bodyBuffer)
                }
-               lastReadSize, err = reader.Read(bodyBuffer[0:readSize])
+               lastReadSize, err = 
reader.Read(m.Reader().bodyBuffer[0:readSize])
                if err != nil {
                        if err == buffer.ErrNotComplete {
                                return nil, enums.ParseResultSkipPackage, nil
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
index 8791611..7224727 100644
--- 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
+++ 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
@@ -34,6 +34,7 @@ type Request struct {
        original     *http.Request
        headerBuffer *buffer.Buffer
        bodyBuffer   *buffer.Buffer
+       reader       *Reader
 }
 
 func (r *Request) Headers() http.Header {
@@ -48,6 +49,10 @@ func (r *Request) BodyBuffer() *buffer.Buffer {
        return r.bodyBuffer
 }
 
+// Reader returns the Reader stored on this request (set by ReadRequest when it builds the request).
+func (r *Request) Reader() *Reader {
+       return r.reader
+}
+
 func (r *Request) MinDataID() int {
        return int(r.headerBuffer.FirstSocketBuffer().DataID())
 }
@@ -57,11 +62,11 @@ func (r *Request) Original() *http.Request {
 }
 
 // nolint
-func ReadRequest(buf *buffer.Buffer, readBody bool) (*Request, 
enums.ParseResult, error) {
+func (r *Reader) ReadRequest(buf *buffer.Buffer, readBody bool) (*Request, 
enums.ParseResult, error) {
        bufReader := bufio.NewReader(buf)
        tp := textproto.NewReader(bufReader)
        req := &http.Request{}
-       result := &Request{original: req}
+       result := &Request{original: req, reader: r}
        result.MessageOpt = &MessageOpt{result}
 
        headerStartPosition := buf.Position()
diff --git 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
index 966edb8..fae907b 100644
--- 
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
+++ 
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
@@ -35,6 +35,7 @@ type Response struct {
        original     *http.Response
        headerBuffer *buffer.Buffer
        bodyBuffer   *buffer.Buffer
+       reader       *Reader
 }
 
 func (r *Response) Headers() http.Header {
@@ -49,15 +50,19 @@ func (r *Response) BodyBuffer() *buffer.Buffer {
        return r.bodyBuffer
 }
 
+// Reader returns the Reader stored on this response (set by ReadResponse when it builds the response).
+func (r *Response) Reader() *Reader {
+       return r.reader
+}
+
 func (r *Response) Original() *http.Response {
        return r.original
 }
 
-func ReadResponse(req *Request, buf *buffer.Buffer, readBody bool) (*Response, 
enums.ParseResult, error) {
+func (r *Reader) ReadResponse(req *Request, buf *buffer.Buffer, readBody bool) 
(*Response, enums.ParseResult, error) {
        bufReader := bufio.NewReader(buf)
        tp := textproto.NewReader(bufReader)
        resp := &http.Response{}
-       result := &Response{original: resp, req: req}
+       result := &Response{original: resp, req: req, reader: r}
        result.MessageOpt = &MessageOpt{result}
 
        headerStartPosition := buf.Position()
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index 0bfc5d9..c5ce2ca 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -24,12 +24,18 @@ import (
        "sync"
        "time"
 
+       "github.com/sirupsen/logrus"
+
+       "github.com/apache/skywalking-rover/pkg/logger"
        "github.com/apache/skywalking-rover/pkg/tools/enums"
        "github.com/apache/skywalking-rover/pkg/tools/host"
 )
 
 var (
        ErrNotComplete = errors.New("socket: not complete event")
+       emptyList      = list.New()
+
+       log = logger.GetLogger("tools", "buffer")
 )
 
 type SocketDataBuffer interface {
@@ -67,6 +73,14 @@ type SocketDataBuffer interface {
 type SocketDataDetail interface {
        // DataID data id of the buffer
        DataID() uint64
+       // Time (BPF) of the detail event
+       Time() uint64
+}
+
+// DataIDRange describes a span of socket data IDs; From and To are both inclusive.
+type DataIDRange struct {
+       // From is the lowest data ID of the range
+       From                   uint64
+       // To is the highest data ID of the range
+       To                     uint64
+       // IsToBufferReadFinished tells whether the data buffer with ID To was read up to its end;
+       // while it is false, DeleteDetails leaves the detail event of To in place
+       IsToBufferReadFinished bool
+}
 
 type Buffer struct {
@@ -82,6 +96,13 @@ type Buffer struct {
        // record the latest expired data id in connection for expire the older 
socket detail
        // because the older socket detail may not be received in buffer
        latestExpiredDataID uint64
+
+       // originalBuffer is the buffer this one was derived from through Buffer.Slice or CombineSlices.
+       // When it is set, the details are looked up in originalBuffer instead of in this buffer,
+       // because the BPF queue is unordered and detail events can be delayed, so only the real buffer has them all.
+       originalBuffer *Buffer
+       // endPosition record the end position of the originalBuffer
+       endPosition *Position
 }
 
 type SocketDataEventLimited struct {
@@ -106,6 +127,48 @@ func (s *SocketDataEventLimited) BufferStartPosition() int 
{
        return s.From
 }
 
+// IsIncludeAllDetails reports whether details is non-empty and every detail event in it
+// has a data ID inside [From, To]. A nil or empty list yields false.
+func (i *DataIDRange) IsIncludeAllDetails(details *list.List) bool {
+       // the parameter is deliberately not named "list": that would shadow the container/list package
+       if details == nil || details.Len() == 0 {
+               return false
+       }
+       for e := details.Front(); e != nil; e = e.Next() {
+               // assert once per element instead of once per comparison
+               dataID := e.Value.(SocketDataDetail).DataID()
+               if dataID < i.From || dataID > i.To {
+                       return false
+               }
+       }
+       return true
+}
+
+// Append widens this range in place so that it also covers other, and returns the receiver.
+// IsToBufferReadFinished is only taken from other when other ends at or after the current To,
+// so the flag always describes the buffer of the resulting To.
+// A nil other leaves the range unchanged.
+func (i *DataIDRange) Append(other *DataIDRange) *DataIDRange {
+       // BuildTotalDataIDRange returns nil for a buffer without data events
+       if other == nil {
+               return i
+       }
+       if other.From < i.From {
+               i.From = other.From
+       }
+       // a range that ends earlier must not overwrite the state of the later end buffer
+       if other.To >= i.To {
+               i.To = other.To
+               i.IsToBufferReadFinished = other.IsToBufferReadFinished
+       }
+       return i
+}
+
+// DeleteDetails removes the detail events whose data ID falls inside [From, To] from buf and,
+// recursively, from every buffer it was derived from (originalBuffer).
+// When IsToBufferReadFinished is false, the scan stops at the first detail whose data ID is To,
+// and that detail stays in the list. A nil buf, or a buf without a detail list, is a no-op.
+func (i *DataIDRange) DeleteDetails(buf *Buffer) {
+       if buf == nil {
+               return
+       }
+       if buf.originalBuffer != nil {
+               i.DeleteDetails(buf.originalBuffer)
+       }
+       // nothing to scan when this buffer carries no detail list
+       if buf.detailEvents == nil {
+               return
+       }
+       for e := buf.detailEvents.Front(); e != nil; {
+               // fetch the successor first: Remove detaches e from the list
+               next := e.Next()
+               dataID := e.Value.(SocketDataDetail).DataID()
+               if dataID >= i.From && dataID <= i.To {
+                       // the To buffer is not fully read yet, so keep its detail and stop here
+                       if !i.IsToBufferReadFinished && dataID == i.To {
+                               break
+                       }
+                       buf.detailEvents.Remove(e)
+                       log.Debugf("delete detail event from buffer, data id: %d, ref: %p, range: %d-%d(%t)",
+                               dataID, buf, i.From, i.To, i.IsToBufferReadFinished)
+               }
+               e = next
+       }
+}
+
 type Position struct {
        // element of the event list
        element *list.Element
@@ -143,6 +206,26 @@ func (r *Buffer) FindFirstDataBuffer(dataID uint64) 
SocketDataBuffer {
        return nil
 }
 
+// BuildTotalDataIDRange builds the data ID range covered by this buffer: From is the data ID
+// of head, To is the data ID of endPosition, or of the current read position when no
+// endPosition is recorded. IsToBufferReadFinished is true when the index of that end position
+// equals the length of the data buffer it points into.
+// It returns nil when the buffer holds no data events or its positions are not set.
+func (r *Buffer) BuildTotalDataIDRange() *DataIDRange {
+       if r.dataEvents == nil || r.dataEvents.Len() == 0 {
+               return nil
+       }
+       // prefer the recorded end position, otherwise fall back to the read cursor
+       end := r.endPosition
+       if end == nil {
+               end = r.current
+       }
+       // guard the dereferences below: a buffer without positions yields no range instead of a panic
+       if r.head == nil || end == nil || end.element == nil {
+               return nil
+       }
+       return &DataIDRange{
+               From:                   r.head.DataID(),
+               To:                     end.DataID(),
+               IsToBufferReadFinished: end.bufIndex == end.element.Value.(SocketDataBuffer).BufferLen(),
+       }
+}
+
 func (r *Buffer) Position() *Position {
        return r.current.Clone()
 }
@@ -155,6 +238,7 @@ func (r *Buffer) Clean() {
        r.detailEvents = list.New()
        r.head = nil
        r.current = nil
+       r.endPosition = nil
 }
 
 func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer {
@@ -198,11 +282,13 @@ func (r *Buffer) Slice(validated bool, start, end 
*Position) *Buffer {
        }
 
        return &Buffer{
-               dataEvents:   dataEvents,
-               detailEvents: detailEvents,
-               validated:    validated,
-               head:         &Position{element: dataEvents.Front(), bufIndex: 
start.bufIndex},
-               current:      &Position{element: dataEvents.Front(), bufIndex: 
start.bufIndex},
+               dataEvents:     dataEvents,
+               detailEvents:   detailEvents,
+               validated:      validated,
+               head:           &Position{element: dataEvents.Front(), 
bufIndex: start.bufIndex},
+               current:        &Position{element: dataEvents.Front(), 
bufIndex: start.bufIndex},
+               originalBuffer: r,
+               endPosition:    end,
        }
 }
 
@@ -219,7 +305,36 @@ func (r *Buffer) Len() int {
        return result
 }
 
-func (r *Buffer) Details() *list.List {
+// BuildDetails returns the socket detail events that belong to this buffer.
+//
+// When the buffer was derived from another one (originalBuffer is set), a new list is built on
+// each call from the original buffer's detail events whose data ID lies between the data ID of
+// head and the data ID of endPosition (or of current when endPosition is nil), both inclusive.
+// Otherwise the buffer's own detail list is returned directly, not a copy.
+func (r *Buffer) BuildDetails() *list.List {
+       // if the original buffer is not empty, then query the details from original buffer
+       if r.originalBuffer != nil {
+               events := list.New()
+               fromDataID := r.head.DataID()
+               var endDataID uint64
+               if r.endPosition != nil {
+                       endDataID = r.endPosition.DataID()
+               } else {
+                       endDataID = r.current.DataID()
+               }
+
+               // the original buffer may not carry a detail list at all
+               originalDetails := r.originalBuffer.detailEvents
+               if originalDetails == nil {
+                       return events
+               }
+               for e := originalDetails.Front(); e != nil; e = e.Next() {
+                       // skip elements that hold no detail instead of panicking on the type assertion,
+                       // in line with the nil check of the debug listing below
+                       detail, ok := e.Value.(SocketDataDetail)
+                       if !ok {
+                               continue
+                       }
+                       if dataID := detail.DataID(); dataID >= fromDataID && dataID <= endDataID {
+                               events.PushBack(e.Value)
+                       }
+               }
+               // listing the known data IDs costs an extra pass, so only do it when debug logging is on
+               if events.Len() == 0 && log.Enable(logrus.DebugLevel) {
+                       dataIDList := make([]uint64, 0, originalDetails.Len())
+                       for e := originalDetails.Front(); e != nil; e = e.Next() {
+                               if detail, ok := e.Value.(SocketDataDetail); ok {
+                                       dataIDList = append(dataIDList, detail.DataID())
+                               }
+                       }
+                       log.Debugf("cannot found details from original buffer, from data id: %d, end data id: %d, "+
+                               "ref: %p, existing details data id list: %v", fromDataID, endDataID, r.originalBuffer, dataIDList)
+               }
+
+               return events
+       }
        return r.detailEvents
 }
 
@@ -281,7 +396,7 @@ func (r *Buffer) DetectNotSendingLastPosition() *Position {
        return nil
 }
 
-func CombineSlices(validated bool, buffers ...*Buffer) *Buffer {
+func CombineSlices(validated bool, originalBuffer *Buffer, buffers ...*Buffer) 
*Buffer {
        if len(buffers) == 0 {
                return nil
        }
@@ -289,7 +404,6 @@ func CombineSlices(validated bool, buffers ...*Buffer) 
*Buffer {
                return buffers[0]
        }
        dataEvents := list.New()
-       detailEvents := list.New()
        for _, b := range buffers {
                if b == nil || b.head == nil {
                        continue
@@ -304,15 +418,20 @@ func CombineSlices(validated bool, buffers ...*Buffer) 
*Buffer {
                } else {
                        dataEvents.PushBackList(b.dataEvents)
                }
-               detailEvents.PushBackList(b.detailEvents)
        }
 
+       var endPosition = buffers[len(buffers)-1].endPosition
+       if endPosition == nil {
+               endPosition = buffers[len(buffers)-1].Position()
+       }
        return &Buffer{
-               dataEvents:   dataEvents,
-               detailEvents: detailEvents,
-               validated:    validated,
-               head:         &Position{element: dataEvents.Front(), bufIndex: 
0},
-               current:      &Position{element: dataEvents.Front(), bufIndex: 
0},
+               dataEvents:     dataEvents,
+               detailEvents:   emptyList, // for the combined buffer, the 
details list should be queried from original buffer
+               validated:      validated,
+               head:           &Position{element: dataEvents.Front(), 
bufIndex: 0},
+               current:        &Position{element: dataEvents.Front(), 
bufIndex: 0},
+               originalBuffer: originalBuffer,
+               endPosition:    endPosition,
        }
 }
 
@@ -505,12 +624,12 @@ func (r *Buffer) PrepareForReading() bool {
 }
 
 // nolint
-func (r *Buffer) RemoveReadElements() bool {
+func (r *Buffer) RemoveReadElements(includeDetails bool) bool {
        r.eventLocker.Lock()
        defer r.eventLocker.Unlock()
 
        // delete until the last data id
-       if r.head.element != nil && r.current.element != nil {
+       if includeDetails && r.head.element != nil && r.current.element != nil {
                firstDataID := r.head.element.Value.(SocketDataBuffer).DataID()
                currentBuffer := r.current.element.Value.(SocketDataBuffer)
                lastDataID := currentBuffer.DataID()
@@ -531,6 +650,7 @@ func (r *Buffer) RemoveReadElements() bool {
                        if startDelete {
                                tmp := e.Next()
                                r.detailEvents.Remove(e)
+                               log.Debugf("delete detail event from readed 
buffer, data id: %d, ref: %p", event.DataID(), r)
                                e = tmp
                        } else {
                                e = e.Next()
@@ -588,6 +708,10 @@ func (r *Buffer) AppendDetailEvent(event SocketDataDetail) 
{
                r.detailEvents.PushFront(event)
                return
        }
+       if r.detailEvents.Back().Value == nil {
+               r.detailEvents.PushFront(event)
+               return
+       }
        if r.detailEvents.Back().Value.(SocketDataDetail).DataID() < 
event.DataID() {
                r.detailEvents.PushBack(event)
                return
@@ -660,7 +784,13 @@ func (r *Buffer) DeleteExpireEvents(expireDuration 
time.Duration) int {
 
        // detail event queue
        count += r.deleteEventsWithJudgement(r.detailEvents, func(element 
*list.Element) bool {
-               return r.latestExpiredDataID > 0 && 
element.Value.(SocketDataDetail).DataID() <= r.latestExpiredDataID
+               detail := element.Value.(SocketDataDetail)
+               isDelete := r.latestExpiredDataID > 0 && detail.DataID() <= 
r.latestExpiredDataID ||
+                       (detail.Time() > 0 && 
expireTime.After(host.Time(detail.Time())))
+               if isDelete {
+                       log.Debugf("delete expired detail event, data id: %d, 
buf: %p", detail.DataID(), r)
+               }
+               return isDelete
        })
        return count
 }
@@ -672,13 +802,6 @@ func (r *Buffer) DataLength() int {
        return r.dataEvents.Len()
 }
 
-func (r *Buffer) DetailLength() int {
-       if r.detailEvents == nil {
-               return 0
-       }
-       return r.detailEvents.Len()
-}
-
 func (r *Buffer) deleteEventsWithJudgement(l *list.List, checker func(element 
*list.Element) bool) int {
        count := 0
        for e := l.Front(); e != nil; {


Reply via email to