This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 089d9c9 Reduce missing details issue in the access log module (#168)
089d9c9 is described below
commit 089d9c9670c54fcb026e38c91396760d4b193a40
Author: mrproliu <[email protected]>
AuthorDate: Mon Dec 16 11:03:49 2024 +0800
Reduce missing details issue in the access log module (#168)
---
CHANGES.md | 1 +
configs/rover_configs.yaml | 4 +-
pkg/accesslog/collector/connection.go | 25 +--
pkg/accesslog/collector/protocols/http1.go | 118 ++++++++++----
pkg/accesslog/collector/protocols/http2.go | 57 ++++---
pkg/accesslog/collector/protocols/protocol.go | 22 ++-
pkg/accesslog/collector/protocols/queue.go | 4 +-
pkg/accesslog/common/config.go | 3 +-
pkg/accesslog/events/detail.go | 4 +
pkg/profiling/task/network/analyze/events/data.go | 4 +
.../analyze/layer7/protocols/base/analyzer.go | 2 +-
.../analyze/layer7/protocols/http1/analyzer.go | 12 +-
.../analyze/layer7/protocols/http1/metrics.go | 4 +-
.../layer7/protocols/http1/reader/reader.go | 34 ++--
.../layer7/protocols/http1/reader/request.go | 9 +-
.../layer7/protocols/http1/reader/response.go | 9 +-
pkg/tools/buffer/buffer.go | 181 ++++++++++++++++++---
17 files changed, 370 insertions(+), 123 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 6907c20..ec2109d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,7 @@ Release Notes.
* Fix missing the first socket detail event in HTTPS protocol.
* Support parallel parsing protocol data in the access log module.
* Upgrade Go library to `1.22`, eBPF library to `0.16.0`.
+* Reduce missing details issue in the access log module.
#### Bug Fixes
* Fix the base image cannot run in the arm64.
diff --git a/configs/rover_configs.yaml b/configs/rover_configs.yaml
index 8f7ba6e..c497c16 100644
--- a/configs/rover_configs.yaml
+++ b/configs/rover_configs.yaml
@@ -143,8 +143,10 @@ access_log:
connection_analyze:
# The size of connection buffer on each CPU
per_cpu_buffer: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_PER_CPU_BUFFER:200KB}
+ # The count of parallel connection event parse
+ parse_parallels: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_PARSE_PARALLELS:1}
# The count of parallel connection analyzer
- parallels: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_PARALLELS:1}
+ analyze_parallels: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_PARALLELS:1}
# The size of per paralleled analyzer queue
queue_size: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_QUEUE_SIZE:2000}
protocol_analyze:
diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go
index 8e8aba1..a5565a1 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -64,7 +64,10 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
    if int(perCPUBufferSize) < os.Getpagesize() {
        return fmt.Errorf("the cpu buffer must bigger than %dB", os.Getpagesize())
    }
- if ctx.Config.ConnectionAnalyze.Parallels < 1 {
+ if ctx.Config.ConnectionAnalyze.ParseParallels < 1 {
+ return fmt.Errorf("the parallels cannot be small than 1")
+ }
+ if ctx.Config.ConnectionAnalyze.AnalyzeParallels < 1 {
return fmt.Errorf("the parallels cannot be small than 1")
}
if ctx.Config.ConnectionAnalyze.QueueSize < 1 {
@@ -74,15 +77,17 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
    if err != nil {
        connectionLogger.Warnf("cannot create the connection tracker, %v", err)
    }
-   c.eventQueue = btf.NewEventQueue(ctx.Config.ConnectionAnalyze.Parallels, ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
-       return newConnectionPartitionContext(ctx, track)
-   })
-   c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize), 1, func() interface{} {
-       return &events.SocketConnectEvent{}
-   }, func(data interface{}) string {
-       return fmt.Sprintf("%d", data.(*events.SocketConnectEvent).ConID)
-   })
-   c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), 1, func() interface{} {
+   c.eventQueue = btf.NewEventQueue(ctx.Config.ConnectionAnalyze.AnalyzeParallels,
+       ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
+           return newConnectionPartitionContext(ctx, track)
+       })
+   c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize),
+       ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} {
+           return &events.SocketConnectEvent{}
+       }, func(data interface{}) string {
+           return fmt.Sprintf("%d", data.(*events.SocketConnectEvent).ConID)
+       })
+   c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} {
        return &events.SocketCloseEvent{}
    }, func(data interface{}) string {
        return fmt.Sprintf("%d", data.(*events.SocketCloseEvent).ConnectionID)
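
The split of the old `parallels` option shows up here: `parse_parallels` sets how many readers deserialize raw connect/close events from the per-CPU BPF buffers, while `analyze_parallels` sets how many partitioned contexts analyze them, with events routed by connection ID so one connection always lands on the same partition. Below is a minimal, self-contained sketch of that partition-by-key pattern; the `eventQueue` type and its methods are hypothetical stand-ins, not the real btf package API.

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// event is a stand-in for a parsed BPF event; routing by connection ID
// keeps all events of one connection on the same partition, preserving order.
type event struct {
	connectionID uint64
	payload      string
}

// eventQueue is a hypothetical sketch of a partitioned queue: parse-parallel
// goroutines would feed Push, analyze-parallel partitions consume.
type eventQueue struct {
	partitions []chan event
	wg         sync.WaitGroup
}

func newEventQueue(analyzeParallels, queueSize int) *eventQueue {
	q := &eventQueue{partitions: make([]chan event, analyzeParallels)}
	for i := range q.partitions {
		q.partitions[i] = make(chan event, queueSize)
	}
	return q
}

// Push routes an event to a partition by hashing its routing key.
func (q *eventQueue) Push(e event) {
	h := fnv.New64a()
	fmt.Fprintf(h, "%d", e.connectionID)
	q.partitions[h.Sum64()%uint64(len(q.partitions))] <- e
}

func (q *eventQueue) Start() {
	for i, p := range q.partitions {
		q.wg.Add(1)
		go func(idx int, ch chan event) {
			defer q.wg.Done()
			for e := range ch {
				fmt.Printf("partition %d analyzes connection %d: %s\n", idx, e.connectionID, e.payload)
			}
		}(i, p)
	}
}

func (q *eventQueue) Close() {
	for _, p := range q.partitions {
		close(p)
	}
	q.wg.Wait()
}

func main() {
	q := newEventQueue(2, 100) // analyze_parallels=2, queue_size=100
	q.Start()
	q.Push(event{connectionID: 1, payload: "connect"})
	q.Push(event{connectionID: 1, payload: "close"})
	q.Push(event{connectionID: 2, payload: "connect"})
	q.Close()
}
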
diff --git a/pkg/accesslog/collector/protocols/http1.go b/pkg/accesslog/collector/protocols/http1.go
index 92a8173..9c17160 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -19,6 +19,7 @@ package protocols
import (
"container/list"
+ "fmt"
"io"
"github.com/apache/skywalking-rover/pkg/accesslog/common"
@@ -33,16 +34,18 @@ import (
)
var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1")
+var http1AnalyzeMaxRetryCount = 3
-type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response)
+type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) error
type HTTP1Protocol struct {
ctx *common.AccessLogContext
analyze HTTP1ProtocolAnalyze
+ reader *reader.Reader
}
func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze HTTP1ProtocolAnalyze) *HTTP1Protocol {
- protocol := &HTTP1Protocol{ctx: ctx}
+ protocol := &HTTP1Protocol{ctx: ctx, reader: reader.NewReader()}
if analyze == nil {
protocol.analyze = protocol.HandleHTTPData
} else {
@@ -55,29 +58,41 @@ type HTTP1Metrics struct {
ConnectionID uint64
RandomID uint64
- halfRequests *list.List
+ halfRequests *list.List
+ analyzeUnFinished *list.List
}
func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64) ProtocolMetrics {
return &HTTP1Metrics{
- ConnectionID: connectionID,
- RandomID: randomID,
- halfRequests: list.New(),
+ ConnectionID: connectionID,
+ RandomID: randomID,
+ halfRequests: list.New(),
+ analyzeUnFinished: list.New(),
}
}
+type HTTP1AnalyzeUnFinished struct {
+ request *reader.Request
+ response *reader.Response
+ retryCount int
+}
+
func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelper) error {
    metrics := connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics)
    buf := connection.Buffer(enums.ConnectionProtocolHTTP)
    http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d, data len: %d",
        metrics.ConnectionID, metrics.RandomID, buf.DataLength())
+ p.handleUnFinishedEvents(metrics)
buf.ResetForLoopReading()
for {
if !buf.PrepareForReading() {
return nil
}
- messageType, err := reader.IdentityMessageType(buf)
+ messageType, err := p.reader.IdentityMessageType(buf)
+       log.Debugf("ready to reading message type, messageType: %v, buf: %p, data id: %d, "+
+           "connection ID: %d, random ID: %d, error: %v", messageType, buf, buf.Position().DataID(),
+           metrics.ConnectionID, metrics.RandomID, err)
        if err != nil {
            http1Log.Debugf("failed to identity message type, %v", err)
            if buf.SkipCurrentElement() {
@@ -89,19 +104,25 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe
var result enums.ParseResult
switch messageType {
case reader.MessageTypeRequest:
- result, _ = p.handleRequest(metrics, buf)
+ result, err = p.handleRequest(metrics, buf)
case reader.MessageTypeResponse:
- result, _ = p.handleResponse(metrics, buf)
+ result, err = p.handleResponse(metrics, buf)
case reader.MessageTypeUnknown:
result = enums.ParseResultSkipPackage
}
+ if err != nil {
+           http1Log.Warnf("failed to handle HTTP/1.x protocol, connection ID: %d, random ID: %d, data id: %d, error: %v",
+               metrics.ConnectionID, metrics.RandomID, buf.Position().DataID(), err)
+ }
finishReading := false
switch result {
case enums.ParseResultSuccess:
- finishReading = buf.RemoveReadElements()
+ finishReading = buf.RemoveReadElements(false)
case enums.ParseResultSkipPackage:
finishReading = buf.SkipCurrentElement()
+           log.Debugf("skip current element, data id: %d, buf: %p, connection ID: %d, random ID: %d",
+               buf.Position().DataID(), buf, metrics.ConnectionID, metrics.RandomID)
}
if finishReading {
@@ -116,7 +137,7 @@ func (p *HTTP1Protocol) ForProtocol() enums.ConnectionProtocol {
}
func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf *buffer.Buffer) (enums.ParseResult, error) {
- req, result, err := reader.ReadRequest(buf, true)
+ req, result, err := p.reader.ReadRequest(buf, true)
if err != nil {
return enums.ParseResultSkipPackage, err
}
@@ -130,12 +151,14 @@ func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf *buffer.Buffer)
func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b *buffer.Buffer) (enums.ParseResult, error) {
firstRequest := metrics.halfRequests.Front()
if firstRequest == nil {
+       log.Debugf("cannot found request for response, skip response, connection ID: %d, random ID: %d",
+           metrics.ConnectionID, metrics.RandomID)
return enums.ParseResultSkipPackage, nil
}
request := metrics.halfRequests.Remove(firstRequest).(*reader.Request)
// parsing response
- response, result, err := reader.ReadResponse(request, b, true)
+ response, result, err := p.reader.ReadResponse(request, b, true)
defer func() {
        // if parsing response failed, then put the request back to the list
if result != enums.ParseResultSuccess {
@@ -149,37 +172,71 @@ func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b *buffer.Buffer)
}
// getting the request and response, then send to the forwarder
- p.analyze(metrics, request, response)
+   if analyzeError := p.analyze(metrics, request, response); analyzeError != nil {
+ p.appendAnalyzeUnFinished(metrics, request, response)
+ }
return enums.ParseResultSuccess, nil
}
-func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) {
-   detailEvents := make([]events.SocketDetail, 0)
-   detailEvents = AppendSocketDetailsFromBuffer(detailEvents, request.HeaderBuffer())
-   detailEvents = AppendSocketDetailsFromBuffer(detailEvents, request.BodyBuffer())
-   detailEvents = AppendSocketDetailsFromBuffer(detailEvents, response.HeaderBuffer())
-   detailEvents = AppendSocketDetailsFromBuffer(detailEvents, response.BodyBuffer())
-
-   if len(detailEvents) == 0 {
-       http1Log.Warnf("cannot found any detail events for HTTP/1.x protocol, connection ID: %d, random ID: %d, data id: %d-%d",
-           metrics.ConnectionID, metrics.RandomID,
-           request.MinDataID(), response.BodyBuffer().LastSocketBuffer().DataID())
-       return
+func (p *HTTP1Protocol) appendAnalyzeUnFinished(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) {
+ metrics.analyzeUnFinished.PushBack(&HTTP1AnalyzeUnFinished{
+ request: request,
+ response: response,
+ retryCount: 0,
+ })
+}
+
+func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics) {
+ for element := m.analyzeUnFinished.Front(); element != nil; {
+ unFinished := element.Value.(*HTTP1AnalyzeUnFinished)
+ err := p.analyze(m, unFinished.request, unFinished.response)
+ if err != nil {
+ unFinished.retryCount++
+ if unFinished.retryCount < http1AnalyzeMaxRetryCount {
+ element = element.Next()
+ continue
+ }
+           http1Log.Warnf("failed to analyze HTTP1 request and response, connection ID: %d, random ID: %d, "+
+               "retry count: %d, error: %v", m.ConnectionID, m.RandomID, unFinished.retryCount, err)
+ }
+ next := element.Next()
+ m.analyzeUnFinished.Remove(element)
+ element = next
}
-   http1Log.Debugf("found fully HTTP1 request and response, contains %d detail events , connection ID: %d, random ID: %d",
-       len(detailEvents), metrics.ConnectionID, metrics.RandomID)
+}
+
+func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) error {
+   details := make([]events.SocketDetail, 0)
+   var allInclude = true
+   var idRange *buffer.DataIDRange
+   details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, request.HeaderBuffer(), idRange, allInclude)
+   details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, request.BodyBuffer(), idRange, allInclude)
+   details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, response.HeaderBuffer(), idRange, allInclude)
+   details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, response.BodyBuffer(), idRange, allInclude)
+
+ if !allInclude {
+       return fmt.Errorf("cannot found full detail events for HTTP/1.x protocol, "+
+           "data id: %d-%d, current details count: %d",
+           request.MinDataID(), response.BodyBuffer().LastSocketBuffer().DataID(), len(details))
+ }
+
+   http1Log.Debugf("found fully HTTP1 request and response, contains %d detail events, "+
+       "connection ID: %d, random ID: %d, data range: %d-%d(%t)",
+       len(details), metrics.ConnectionID, metrics.RandomID, idRange.From, idRange.To, idRange.IsToBufferReadFinished)
originalRequest := request.Original()
originalResponse := response.Original()
+   // delete details(each request or response is fine because it's will delete the original buffer)
+ idRange.DeleteDetails(request.HeaderBuffer())
defer func() {
p.CloseStream(originalRequest.Body)
p.CloseStream(originalResponse.Body)
}()
-   forwarder.SendTransferProtocolEvent(p.ctx, detailEvents, &v3.AccessLogProtocolLogs{
+   forwarder.SendTransferProtocolEvent(p.ctx, details, &v3.AccessLogProtocolLogs{
        Protocol: &v3.AccessLogProtocolLogs_Http{
            Http: &v3.AccessLogHTTPProtocol{
-               StartTime: forwarder.BuildOffsetTimestamp(detailEvents[0].GetStartTime()),
-               EndTime:   forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
+               StartTime: forwarder.BuildOffsetTimestamp(details[0].GetStartTime()),
+               EndTime:   forwarder.BuildOffsetTimestamp(details[len(details)-1].GetEndTime()),
                Version:   v3.AccessLogHTTPProtocolVersion_HTTP1,
                Request: &v3.AccessLogHTTPProtocolRequest{
                    Method: TransformHTTPMethod(originalRequest.Method),
@@ -198,6 +255,7 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request *reader.Re
},
},
})
+ return nil
}
func (p *HTTP1Protocol) CloseStream(ioReader io.Closer) {
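
The new analyzeUnFinished list addresses the case where a request/response pair parses correctly but its socket detail events have not all arrived yet: instead of logging a warning and dropping the access log entry, the pair is parked and retried on the next Analyze pass, up to http1AnalyzeMaxRetryCount (3) attempts. A simplified sketch of that bounded retry loop, with hypothetical names standing in for the rover types:

package main

import (
	"container/list"
	"errors"
	"fmt"
)

const maxRetry = 3 // mirrors http1AnalyzeMaxRetryCount in the commit

type pending struct {
	name       string
	retryCount int
}

// drainPending retries every parked item once per pass and drops it after
// it either succeeds or exhausts its retries, mirroring handleUnFinishedEvents.
func drainPending(l *list.List, analyze func(string) error) {
	for e := l.Front(); e != nil; {
		p := e.Value.(*pending)
		if err := analyze(p.name); err != nil {
			p.retryCount++
			if p.retryCount < maxRetry {
				e = e.Next() // keep it for the next pass
				continue
			}
			fmt.Printf("dropping %s after %d retries: %v\n", p.name, p.retryCount, err)
		}
		next := e.Next()
		l.Remove(e)
		e = next
	}
}

func main() {
	l := list.New()
	l.PushBack(&pending{name: "req-1"})
	failing := func(string) error { return errors.New("details not arrived yet") }
	for i := 0; i < maxRetry; i++ {
		drainPending(l, failing)
		fmt.Println("pending items:", l.Len())
	}
}
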
diff --git a/pkg/accesslog/collector/protocols/http2.go b/pkg/accesslog/collector/protocols/http2.go
index b18226a..f048c14 100644
--- a/pkg/accesslog/collector/protocols/http2.go
+++ b/pkg/accesslog/collector/protocols/http2.go
@@ -19,6 +19,7 @@ package protocols
import (
"errors"
+ "fmt"
"strconv"
"strings"
"time"
@@ -42,7 +43,7 @@ var maxHTTP2StreamingTime = time.Minute * 3
var http2Log = logger.GetLogger("accesslog", "collector", "protocols", "http2")
-type HTTP2StreamAnalyze func(stream *HTTP2Streaming)
+type HTTP2StreamAnalyze func(stream *HTTP2Streaming) error
type HTTP2Protocol struct {
ctx *common.AccessLogContext
@@ -140,7 +141,7 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze
finishReading := false
switch result {
case enums.ParseResultSuccess:
- finishReading = buf.RemoveReadElements()
+ finishReading = buf.RemoveReadElements(false)
case enums.ParseResultSkipPackage:
finishReading = buf.SkipCurrentElement()
}
@@ -194,17 +195,17 @@ func (r *HTTP2Protocol) handleHeader(header *http2.FrameHeader, startPos *buffer
    if !streaming.IsInResponse {
        r.AppendHeaders(streaming.ReqHeader, headers)
-       streaming.ReqHeaderBuffer = buffer.CombineSlices(true, streaming.ReqHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
+       streaming.ReqHeaderBuffer = buffer.CombineSlices(true, buf, streaming.ReqHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
        return enums.ParseResultSuccess, false, nil
    }
    r.AppendHeaders(streaming.RespHeader, headers)
-   streaming.RespHeaderBuffer = buffer.CombineSlices(true, streaming.RespHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
+   streaming.RespHeaderBuffer = buffer.CombineSlices(true, buf, streaming.RespHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
// is end of stream and in the response
if header.Flags.Has(http2.FlagHeadersEndStream) {
// should be end of the stream and send to the protocol
- r.analyze(streaming)
+ _ = r.analyze(streaming)
// delete streaming
delete(metrics.streams, header.StreamID)
}
@@ -226,7 +227,7 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id ui
    http2Log.Infof("detect the HTTP/2 stream is too long, split the stream, connection ID: %d, stream ID: %d, headers: %v",
        metrics.connectionID, id, streaming.ReqHeader)
- r.analyze(streaming)
+ _ = r.analyze(streaming)
// clean sent buffers
if streaming.ReqBodyBuffer != nil {
@@ -235,24 +236,27 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id ui
}
}
-func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) {
- detailEvents := make([]events.SocketDetail, 0)
-   detailEvents = AppendSocketDetailsFromBuffer(detailEvents, stream.ReqHeaderBuffer)
-   detailEvents = AppendSocketDetailsFromBuffer(detailEvents, stream.ReqBodyBuffer)
-   detailEvents = AppendSocketDetailsFromBuffer(detailEvents, stream.RespHeaderBuffer)
-   detailEvents = AppendSocketDetailsFromBuffer(detailEvents, stream.RespBodyBuffer)
-
-   if len(detailEvents) == 0 {
-       http2Log.Warnf("cannot found any detail events for HTTP/2 protocol, data id: %d-%d",
-           stream.ReqHeaderBuffer.FirstSocketBuffer().DataID(), stream.RespBodyBuffer.LastSocketBuffer().DataID())
- return
+func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) error {
+ details := make([]events.SocketDetail, 0)
+ var allInclude = true
+ var idRange *buffer.DataIDRange
+   details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.ReqHeaderBuffer, idRange, allInclude)
+   details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.ReqBodyBuffer, idRange, allInclude)
+   details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.RespHeaderBuffer, idRange, allInclude)
+   details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.RespBodyBuffer, idRange, allInclude)
+
+ if !allInclude {
+       return fmt.Errorf("cannot found any detail events for HTTP/2 protocol, data id: %d-%d, current details count: %d",
+           stream.ReqHeaderBuffer.FirstSocketBuffer().DataID(), stream.RespBodyBuffer.LastSocketBuffer().DataID(),
+ len(details))
}
+ idRange.DeleteDetails(stream.ReqHeaderBuffer)
-   forwarder.SendTransferProtocolEvent(r.ctx, detailEvents, &v3.AccessLogProtocolLogs{
+   forwarder.SendTransferProtocolEvent(r.ctx, details, &v3.AccessLogProtocolLogs{
        Protocol: &v3.AccessLogProtocolLogs_Http{
            Http: &v3.AccessLogHTTPProtocol{
-               StartTime: forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer, detailEvents[0]).GetStartTime()),
-               EndTime:   forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
+               StartTime: forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer, details[0]).GetStartTime()),
+               EndTime:   forwarder.BuildOffsetTimestamp(details[len(details)-1].GetEndTime()),
                Version:   v3.AccessLogHTTPProtocolVersion_HTTP2,
                Request: &v3.AccessLogHTTPProtocolRequest{
                    Method: r.ParseHTTPMethod(stream),
@@ -272,6 +276,7 @@ func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) {
},
},
})
+ return nil
}
func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming) v3.AccessLogHTTPProtocolRequestMethod {
@@ -284,10 +289,14 @@ func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming) v3.AccessLogH
}
func (r *HTTP2Protocol) FirstDetail(buf *buffer.Buffer, def events.SocketDetail) events.SocketDetail {
- if buf == nil || buf.Details() == nil || buf.Details().Len() == 0 {
+ if buf == nil {
+ return def
+ }
+ details := buf.BuildDetails()
+ if details == nil || details.Len() == 0 {
return def
}
- return buf.Details().Front().Value.(events.SocketDetail)
+ return details.Front().Value.(events.SocketDetail)
}
func (r *HTTP2Protocol) BufferSizeOfZero(buf *buffer.Buffer) uint64 {
@@ -315,9 +324,9 @@ func (r *HTTP2Protocol) handleData(header *http2.FrameHeader, startPos *buffer.P
return enums.ParseResultSkipPackage, false, err
}
if !streaming.IsInResponse {
-       streaming.ReqBodyBuffer = buffer.CombineSlices(true, streaming.ReqBodyBuffer, buf.Slice(true, startPos, buf.Position()))
+       streaming.ReqBodyBuffer = buffer.CombineSlices(true, buf, streaming.ReqBodyBuffer, buf.Slice(true, startPos, buf.Position()))
    } else {
-       streaming.RespBodyBuffer = buffer.CombineSlices(true, streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position()))
+       streaming.RespBodyBuffer = buffer.CombineSlices(true, buf, streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position()))
}
r.validateIsStreamOpenTooLong(metrics, header.StreamID, streaming)
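
For HTTP/2 the same idea is applied per stream: CombineSlices now receives the connection buffer (buf) as the originalBuffer, so the header and body slices only remember their data-ID window and resolve detail events lazily through the parent buffer, which keeps details visible even when they arrive after the slice was cut. A simplified, hypothetical illustration of that back-pointer lookup, not the real buffer package:

package main

import "fmt"

// parent owns the authoritative detail list; it keeps receiving late events.
type parent struct {
	details map[uint64]string // keyed by data id
}

// slice only remembers its data-id window and its parent; it never copies details.
type slice struct {
	from, to uint64
	parent   *parent
}

// buildDetails resolves the details lazily, so events appended to the parent
// after the slice was created are still returned.
func (s *slice) buildDetails() []string {
	var out []string
	for id := s.from; id <= s.to; id++ {
		if d, ok := s.parent.details[id]; ok {
			out = append(out, d)
		}
	}
	return out
}

func main() {
	p := &parent{details: map[uint64]string{1: "header frame"}}
	sl := &slice{from: 1, to: 2, parent: p}
	fmt.Println(sl.buildDetails()) // [header frame]

	// a late detail event arrives after the slice was taken
	p.details[2] = "data frame"
	fmt.Println(sl.buildDetails()) // [header frame data frame]
}
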
diff --git a/pkg/accesslog/collector/protocols/protocol.go b/pkg/accesslog/collector/protocols/protocol.go
index 41ac8a2..673796c 100644
--- a/pkg/accesslog/collector/protocols/protocol.go
+++ b/pkg/accesslog/collector/protocols/protocol.go
@@ -56,17 +56,29 @@ type Protocol interface {
Analyze(connection *PartitionConnection, helper *AnalyzeHelper) error
}
-func AppendSocketDetailsFromBuffer(result []events.SocketDetail, buf *buffer.Buffer) []events.SocketDetail {
-   if buf == nil || buf.DetailLength() == 0 {
-       return result
+func AppendSocketDetailsFromBuffer(result []events.SocketDetail, buf *buffer.Buffer, dataIDRange *buffer.DataIDRange,
+   allDetailInclude bool) ([]events.SocketDetail, *buffer.DataIDRange, bool) {
+ if buf == nil || !allDetailInclude {
+ return result, dataIDRange, false
}
- for e := buf.Details().Front(); e != nil; e = e.Next() {
+ details := buf.BuildDetails()
+ if details == nil || details.Len() == 0 {
+ return result, dataIDRange, false
+ }
+ currentDataIDRange := buf.BuildTotalDataIDRange()
+ if !currentDataIDRange.IsIncludeAllDetails(details) {
+ return result, dataIDRange, false
+ }
+ for e := details.Front(); e != nil; e = e.Next() {
if len(result) > 0 && result[len(result)-1] == e.Value {
continue
}
result = append(result, e.Value.(events.SocketDetail))
}
- return result
+ if dataIDRange == nil {
+ return result, currentDataIDRange, true
+ }
+ return result, dataIDRange.Append(currentDataIDRange), true
}
func AnalyzeTraceInfo(fetcher func(key string) string, protocolLog *logger.Logger) *v3.AccessLogTraceInfo {
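
AppendSocketDetailsFromBuffer now threads two accumulators through the four buffers of a message pair: a DataIDRange covering every data ID seen so far, and an allDetailInclude flag that latches to false as soon as one buffer has no details or misses part of its range, so the caller can report an error and retry instead of emitting a partial access log. A minimal sketch of that accumulator threading, with simplified stand-in types:

package main

import "fmt"

// idRange is a simplified stand-in for buffer.DataIDRange.
type idRange struct{ from, to uint64 }

func (r *idRange) extend(o idRange) *idRange {
	if o.from < r.from {
		r.from = o.from
	}
	if o.to > r.to {
		r.to = o.to
	}
	return r
}

// appendDetails mimics the new signature: it keeps extending the accumulated
// range while every buffer contributed details, and latches allInclude to false otherwise.
func appendDetails(acc *idRange, allInclude bool, buf *idRange, details []string) (*idRange, bool) {
	if buf == nil || !allInclude || len(details) == 0 {
		return acc, false
	}
	if acc == nil {
		return &idRange{from: buf.from, to: buf.to}, true
	}
	return acc.extend(*buf), true
}

func main() {
	var acc *idRange
	all := true
	acc, all = appendDetails(acc, all, &idRange{from: 1, to: 2}, []string{"req header"})
	acc, all = appendDetails(acc, all, &idRange{from: 3, to: 4}, []string{"resp header"})
	fmt.Println(acc, all) // &{1 4} true

	// one buffer without details invalidates the whole log entry
	acc, all = appendDetails(acc, all, &idRange{from: 5, to: 6}, nil)
	fmt.Println(acc, all) // &{1 4} false
}
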
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index 68708fc..833d323 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -215,9 +215,9 @@ func (p *PartitionContext) Consume(data interface{}) {
case events.SocketDetail:
pid, _ := events.ParseConnectionID(event.GetConnectionID())
        log.Debugf("receive the socket detail event, connection ID: %d, random ID: %d, pid: %d, data id: %d, "+
-           "function name: %s, package count: %d, package size: %d, ssl: %d",
+           "function name: %s, package count: %d, package size: %d, ssl: %d, protocol: %d",
            event.GetConnectionID(), event.GetRandomID(), pid, event.DataID(), event.GetFunctionName(),
-           event.GetL4PackageCount(), event.GetL4TotalPackageSize(), event.GetSSL())
+           event.GetL4PackageCount(), event.GetL4TotalPackageSize(), event.GetSSL(), event.GetProtocol())
        if event.GetProtocol() == enums.ConnectionProtocolUnknown {
            // if the connection protocol is unknown, we just needs to add this into the kernel log
forwarder.SendTransferNoProtocolEvent(p.context, event)
diff --git a/pkg/accesslog/common/config.go b/pkg/accesslog/common/config.go
index 1ef0aeb..0298088 100644
--- a/pkg/accesslog/common/config.go
+++ b/pkg/accesslog/common/config.go
@@ -37,7 +37,8 @@ type FlushConfig struct {
type ConnectionAnalyzeConfig struct {
PerCPUBufferSize string `mapstructure:"per_cpu_buffer"`
- Parallels int `mapstructure:"parallels"`
+ ParseParallels int `mapstructure:"parse_parallels"`
+ AnalyzeParallels int `mapstructure:"analyze_parallels"`
QueueSize int `mapstructure:"queue_size"`
}
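
The renamed struct fields are what bind the two new YAML keys (and their ROVER_* environment overrides) to the module configuration. A minimal sketch of that mapping, assuming the github.com/mitchellh/mapstructure module that the struct tags refer to; the actual rover config loader is not shown here:

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// ConnectionAnalyzeConfig mirrors the struct in pkg/accesslog/common/config.go.
type ConnectionAnalyzeConfig struct {
	PerCPUBufferSize string `mapstructure:"per_cpu_buffer"`
	ParseParallels   int    `mapstructure:"parse_parallels"`
	AnalyzeParallels int    `mapstructure:"analyze_parallels"`
	QueueSize        int    `mapstructure:"queue_size"`
}

func main() {
	// values as they would appear after the YAML (and env expansion) is parsed
	raw := map[string]interface{}{
		"per_cpu_buffer":    "200KB",
		"parse_parallels":   2,
		"analyze_parallels": 2,
		"queue_size":        2000,
	}
	var cfg ConnectionAnalyzeConfig
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
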
diff --git a/pkg/accesslog/events/detail.go b/pkg/accesslog/events/detail.go
index 3917931..a5ac46c 100644
--- a/pkg/accesslog/events/detail.go
+++ b/pkg/accesslog/events/detail.go
@@ -103,6 +103,10 @@ func (d *SocketDetailEvent) ReadFrom(r *reader.Reader) {
d.SSL = r.ReadUint8()
}
+func (d *SocketDetailEvent) Time() uint64 {
+ return d.StartTime
+}
+
func (d *SocketDetailEvent) GetConnectionID() uint64 {
return d.ConnectionID
}
diff --git a/pkg/profiling/task/network/analyze/events/data.go b/pkg/profiling/task/network/analyze/events/data.go
index 804e96b..1f71fa8 100644
--- a/pkg/profiling/task/network/analyze/events/data.go
+++ b/pkg/profiling/task/network/analyze/events/data.go
@@ -108,6 +108,10 @@ type SocketDetailEvent struct {
RTTTime uint32
}
+func (s *SocketDetailEvent) Time() uint64 {
+ return 0
+}
+
func (s *SocketDetailEvent) DataID() uint64 {
return s.DataID0
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
index 52b181d..1acc342 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
@@ -185,7 +185,7 @@ func (a *ProtocolAnalyzer) processConnectionEvents(connection *connectionInfo) {
finishReading := false
switch result {
case enums.ParseResultSuccess:
- finishReading = connection.buffer.RemoveReadElements()
+           finishReading = connection.buffer.RemoveReadElements(true)
case enums.ParseResultSkipPackage:
finishReading = connection.buffer.SkipCurrentElement()
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
index 245e6dd..6cec0ef 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
@@ -51,7 +51,8 @@ var DurationHistogramBuckets = []float64{
type Analyzer struct {
// cache connection metrics if the connect event not receive or process
- cache map[string]*ConnectionMetrics
+ cache map[string]*ConnectionMetrics
+ reader *reader.Reader
sampleConfig *SamplingConfig
}
@@ -67,7 +68,8 @@ type ConnectionMetrics struct {
func NewHTTP1Analyzer() protocol.Protocol {
return &Analyzer{
- cache: make(map[string]*ConnectionMetrics),
+ cache: make(map[string]*ConnectionMetrics),
+ reader: reader.NewReader(),
}
}
@@ -90,7 +92,7 @@ func (h *Analyzer) Init(config *profiling.TaskConfig) {
func (h *Analyzer) ParseProtocol(connectionID uint64, metrics protocol.Metrics, buf *buffer.Buffer) enums.ParseResult {
connectionMetrics := metrics.(*ConnectionMetrics)
- messageType, err := reader.IdentityMessageType(buf)
+ messageType, err := h.reader.IdentityMessageType(buf)
if err != nil {
return enums.ParseResultSkipPackage
}
@@ -116,7 +118,7 @@ func (h *Analyzer) ParseProtocol(connectionID uint64, metrics protocol.Metrics,
func (h *Analyzer) handleRequest(metrics *ConnectionMetrics, buf *buffer.Buffer) (enums.ParseResult, error) {
// parsing request
- req, r, err := reader.ReadRequest(buf, true)
+ req, r, err := h.reader.ReadRequest(buf, true)
if err != nil {
return enums.ParseResultSkipPackage, err
}
@@ -137,7 +139,7 @@ func (h *Analyzer) handleResponse(connectionID uint64, metrics *ConnectionMetric
request := metrics.halfData.Remove(firstElement).(*reader.Request)
// parsing request
- response, r, err := reader.ReadResponse(request, buf, true)
+ response, r, err := h.reader.ReadResponse(request, buf, true)
if err != nil {
return enums.ParseResultSkipPackage, err
} else if r != enums.ParseResultSuccess {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
index 4c73cf0..d041997 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
@@ -286,8 +286,8 @@ func (h *Trace) appendHTTPEvent(attaches []*v3.SpanAttachedEvent, process api.Pr
func (h *Trace) appendSyscallEvents(attachEvents []*v3.SpanAttachedEvent, process api.ProcessInterface, traffic *base.ProcessTraffic,
    message *reader.MessageOpt) []*v3.SpanAttachedEvent {
- headerDetails := message.HeaderBuffer().Details()
- bodyDetails := message.BodyBuffer().Details()
+ headerDetails := message.HeaderBuffer().BuildDetails()
+ bodyDetails := message.BodyBuffer().BuildDetails()
dataIDCache := make(map[uint64]bool)
for e := headerDetails.Front(); e != nil; e = e.Next() {
event := e.Value.(*events.SocketDetailEvent)
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
index e4aac3f..8cc8c85 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -35,9 +35,6 @@ import (
)
var (
- headBuffer = make([]byte, 16)
- bodyBuffer = make([]byte, 4096)
-
    requestMethods = []string{
        "GET", "POST", "OPTIONS", "HEAD", "PUT", "DELETE", "CONNECT", "TRACE", "PATCH",
}
@@ -53,15 +50,27 @@ const (
MessageTypeUnknown
)
-func IdentityMessageType(reader *buffer.Buffer) (MessageType, error) {
- n, err := reader.Peek(headBuffer)
+type Reader struct {
+ headBuffer []byte
+ bodyBuffer []byte
+}
+
+func NewReader() *Reader {
+ return &Reader{
+ headBuffer: make([]byte, 16),
+ bodyBuffer: make([]byte, 4096),
+ }
+}
+
+func (r *Reader) IdentityMessageType(reader *buffer.Buffer) (MessageType, error) {
+ n, err := reader.Peek(r.headBuffer)
if err != nil {
return MessageTypeUnknown, err
- } else if n != len(headBuffer) {
+ } else if n != len(r.headBuffer) {
        return MessageTypeUnknown, fmt.Errorf("need more content for header")
}
- headerString := string(headBuffer)
+ headerString := string(r.headBuffer)
isRequest := false
for _, method := range requestMethods {
if strings.HasPrefix(headerString, method) {
@@ -83,6 +92,7 @@ type Message interface {
Headers() http.Header
HeaderBuffer() *buffer.Buffer
BodyBuffer() *buffer.Buffer
+ Reader() *Reader
}
type MessageOpt struct {
@@ -192,7 +202,7 @@ func (m *MessageOpt) isChunked() bool {
func (m *MessageOpt) readBodyUntilCurrentPackageFinished(buf *buffer.Buffer, reader *bufio.Reader) (*buffer.Buffer, enums.ParseResult, error) {
startPosition := buf.OffsetPosition(-reader.Buffered())
for !buf.IsCurrentPacketReadFinished() {
- _, err := buf.Read(bodyBuffer)
+ _, err := buf.Read(m.Reader().bodyBuffer)
if err != nil {
return nil, enums.ParseResultSkipPackage, err
}
@@ -237,7 +247,7 @@ func (m *MessageOpt) checkChunkedBody(buf *buffer.Buffer, bodyReader *bufio.Read
            return nil, enums.ParseResultSkipPackage, fmt.Errorf("the chunk data parding error, should be empty: %s", d)
}
}
-   return buffer.CombineSlices(true, buffers...), enums.ParseResultSuccess, nil
+   return buffer.CombineSlices(true, buf, buffers...), enums.ParseResultSuccess, nil
}
func (m *MessageOpt) checkBodyWithSize(buf *buffer.Buffer, reader *bufio.Reader, size int,
@@ -248,10 +258,10 @@ func (m *MessageOpt) checkBodyWithSize(buf *buffer.Buffer, reader *bufio.Reader,
startPosition := buf.OffsetPosition(-reader.Buffered())
for reduceSize > 0 {
readSize = reduceSize
- if readSize > len(bodyBuffer) {
- readSize = len(bodyBuffer)
+ if readSize > len(m.Reader().bodyBuffer) {
+ readSize = len(m.Reader().bodyBuffer)
}
- lastReadSize, err = reader.Read(bodyBuffer[0:readSize])
+       lastReadSize, err = reader.Read(m.Reader().bodyBuffer[0:readSize])
if err != nil {
if err == buffer.ErrNotComplete {
return nil, enums.ParseResultSkipPackage, nil
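
Moving the shared headBuffer/bodyBuffer scratch slices into a per-Reader struct removes a data race: with parse_parallels and the parallel protocol analyzers, two goroutines could previously overwrite the same package-level slices mid-parse. A minimal sketch of the per-reader scratch pattern (simplified, not the real reader package):

package main

import (
	"fmt"
	"strings"
	"sync"
)

// Reader owns its scratch buffer, so concurrent readers never share state.
// (A package-level `var headBuffer = make([]byte, 16)` would be racy here.)
type Reader struct {
	headBuffer []byte
}

func NewReader() *Reader {
	return &Reader{headBuffer: make([]byte, 16)}
}

func (r *Reader) looksLikeRequest(payload string) bool {
	n := copy(r.headBuffer, payload)
	head := string(r.headBuffer[:n])
	for _, m := range []string{"GET", "POST", "PUT", "DELETE"} {
		if strings.HasPrefix(head, m) {
			return true
		}
	}
	return false
}

func main() {
	var wg sync.WaitGroup
	payloads := []string{"GET /index HTTP/1.1", "HTTP/1.1 200 OK"}
	for i, p := range payloads {
		wg.Add(1)
		go func(idx int, payload string) { // one Reader per goroutine, like one per partition
			defer wg.Done()
			fmt.Printf("payload %d is request: %v\n", idx, NewReader().looksLikeRequest(payload))
		}(i, p)
	}
	wg.Wait()
}
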
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
index 8791611..7224727 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
@@ -34,6 +34,7 @@ type Request struct {
original *http.Request
headerBuffer *buffer.Buffer
bodyBuffer *buffer.Buffer
+ reader *Reader
}
func (r *Request) Headers() http.Header {
@@ -48,6 +49,10 @@ func (r *Request) BodyBuffer() *buffer.Buffer {
return r.bodyBuffer
}
+func (r *Request) Reader() *Reader {
+ return r.reader
+}
+
func (r *Request) MinDataID() int {
return int(r.headerBuffer.FirstSocketBuffer().DataID())
}
@@ -57,11 +62,11 @@ func (r *Request) Original() *http.Request {
}
// nolint
-func ReadRequest(buf *buffer.Buffer, readBody bool) (*Request, enums.ParseResult, error) {
+func (r *Reader) ReadRequest(buf *buffer.Buffer, readBody bool) (*Request, enums.ParseResult, error) {
bufReader := bufio.NewReader(buf)
tp := textproto.NewReader(bufReader)
req := &http.Request{}
- result := &Request{original: req}
+ result := &Request{original: req, reader: r}
result.MessageOpt = &MessageOpt{result}
headerStartPosition := buf.Position()
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
index 966edb8..fae907b 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
@@ -35,6 +35,7 @@ type Response struct {
original *http.Response
headerBuffer *buffer.Buffer
bodyBuffer *buffer.Buffer
+ reader *Reader
}
func (r *Response) Headers() http.Header {
@@ -49,15 +50,19 @@ func (r *Response) BodyBuffer() *buffer.Buffer {
return r.bodyBuffer
}
+func (r *Response) Reader() *Reader {
+ return r.reader
+}
+
func (r *Response) Original() *http.Response {
return r.original
}
-func ReadResponse(req *Request, buf *buffer.Buffer, readBody bool) (*Response, enums.ParseResult, error) {
+func (r *Reader) ReadResponse(req *Request, buf *buffer.Buffer, readBody bool) (*Response, enums.ParseResult, error) {
bufReader := bufio.NewReader(buf)
tp := textproto.NewReader(bufReader)
resp := &http.Response{}
- result := &Response{original: resp, req: req}
+ result := &Response{original: resp, req: req, reader: r}
result.MessageOpt = &MessageOpt{result}
headerStartPosition := buf.Position()
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index 0bfc5d9..9993f47 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -24,12 +24,18 @@ import (
"sync"
"time"
+ "github.com/sirupsen/logrus"
+
+ "github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/tools/enums"
"github.com/apache/skywalking-rover/pkg/tools/host"
)
var (
ErrNotComplete = errors.New("socket: not complete event")
+ emptyList = list.New()
+
+ log = logger.GetLogger("tools", "buffer")
)
type SocketDataBuffer interface {
@@ -67,6 +73,14 @@ type SocketDataBuffer interface {
type SocketDataDetail interface {
// DataID data id of the buffer
DataID() uint64
+ // Time (BPF) of the detail event
+ Time() uint64
+}
+
+type DataIDRange struct {
+ From uint64
+ To uint64
+ IsToBufferReadFinished bool
}
type Buffer struct {
@@ -82,6 +96,13 @@ type Buffer struct {
    // record the latest expired data id in connection for expire the older socket detail
// because the older socket detail may not be received in buffer
latestExpiredDataID uint64
+
+   // originalBuffer record this buffer is from Buffer.Slice or CombineSlices
+   // if it's not empty, then when getting the details should query from originalBuffer
+   // Because the BPF Queue is unsorted and can be delayed, so the details should query by real buffer
+ originalBuffer *Buffer
+ // endPosition record the end position of the originalBuffer
+ endPosition *Position
}
type SocketDataEventLimited struct {
@@ -106,6 +127,48 @@ func (s *SocketDataEventLimited) BufferStartPosition() int {
return s.From
}
+func (i *DataIDRange) IsIncludeAllDetails(l *list.List) bool {
+ if l.Len() == 0 {
+ return false
+ }
+ for e := l.Front(); e != nil; e = e.Next() {
+       if e.Value.(SocketDataDetail).DataID() < i.From || e.Value.(SocketDataDetail).DataID() > i.To {
+ return false
+ }
+ }
+ return true
+}
+
+func (i *DataIDRange) Append(other *DataIDRange) *DataIDRange {
+ if other.From < i.From {
+ i.From = other.From
+ }
+ if other.To > i.To {
+ i.To = other.To
+ }
+ i.IsToBufferReadFinished = other.IsToBufferReadFinished
+ return i
+}
+
+func (i *DataIDRange) DeleteDetails(buf *Buffer) {
+ if buf.originalBuffer != nil {
+ i.DeleteDetails(buf.originalBuffer)
+ }
+ for e := buf.detailEvents.Front(); e != nil; {
+ next := e.Next()
+ dataID := e.Value.(SocketDataDetail).DataID()
+ if dataID >= i.From && dataID <= i.To {
+ if !i.IsToBufferReadFinished && dataID == i.To {
+ break
+ }
+ buf.detailEvents.Remove(e)
+           log.Debugf("delete detail event from buffer, data id: %d, ref: %p, range: %d-%d(%t)",
+               dataID, buf, i.From, i.To, i.IsToBufferReadFinished)
+ }
+ e = next
+ }
+}
+
type Position struct {
// element of the event list
element *list.Element
@@ -143,6 +206,26 @@ func (r *Buffer) FindFirstDataBuffer(dataID uint64) SocketDataBuffer {
return nil
}
+func (r *Buffer) BuildTotalDataIDRange() *DataIDRange {
+ if r.dataEvents.Len() == 0 {
+ return nil
+ }
+ var toIndex uint64
+ var isToBufferReadFinished bool
+ if r.endPosition != nil {
+ toIndex = r.endPosition.DataID()
+       isToBufferReadFinished = r.endPosition.bufIndex == r.endPosition.element.Value.(SocketDataBuffer).BufferLen()
+ } else {
+ toIndex = r.current.DataID()
+       isToBufferReadFinished = r.current.bufIndex == r.current.element.Value.(SocketDataBuffer).BufferLen()
+ }
+ return &DataIDRange{
+ From: r.head.DataID(),
+ To: toIndex,
+ IsToBufferReadFinished: isToBufferReadFinished,
+ }
+}
+
func (r *Buffer) Position() *Position {
return r.current.Clone()
}
@@ -155,6 +238,7 @@ func (r *Buffer) Clean() {
r.detailEvents = list.New()
r.head = nil
r.current = nil
+ r.endPosition = nil
}
func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer {
@@ -181,7 +265,7 @@ func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer {
    dataEvents.PushBack(&SocketDataEventLimited{SocketDataBuffer: lastBuffer, Size: end.bufIndex})
    // if the first detail element been found, append the details until the last buffer data id
- if firstDetailElement == nil {
+ if firstDetailElement == nil && r.detailEvents != nil {
for e := r.detailEvents.Front(); e != nil; e = e.Next() {
            if e.Value.(SocketDataDetail).DataID() == lastBuffer.DataID() {
detailEvents.PushBack(e.Value)
@@ -198,11 +282,13 @@ func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer {
}
return &Buffer{
-       dataEvents:   dataEvents,
-       detailEvents: detailEvents,
-       validated:    validated,
-       head:         &Position{element: dataEvents.Front(), bufIndex: start.bufIndex},
-       current:      &Position{element: dataEvents.Front(), bufIndex: start.bufIndex},
+       dataEvents:     dataEvents,
+       detailEvents:   detailEvents,
+       validated:      validated,
+       head:           &Position{element: dataEvents.Front(), bufIndex: start.bufIndex},
+       current:        &Position{element: dataEvents.Front(), bufIndex: start.bufIndex},
+ originalBuffer: r,
+ endPosition: end,
}
}
@@ -219,7 +305,36 @@ func (r *Buffer) Len() int {
return result
}
-func (r *Buffer) Details() *list.List {
+func (r *Buffer) BuildDetails() *list.List {
+   // if the original buffer is not empty, then query the details from original buffer
+ if r.originalBuffer != nil {
+ events := list.New()
+ fromDataID := r.head.DataID()
+ var endDataID uint64
+ if r.endPosition != nil {
+ endDataID = r.endPosition.DataID()
+ } else {
+ endDataID = r.current.DataID()
+ }
+
+       for e := r.originalBuffer.detailEvents.Front(); e != nil; e = e.Next() {
+           if e.Value.(SocketDataDetail).DataID() >= fromDataID && e.Value.(SocketDataDetail).DataID() <= endDataID {
+ events.PushBack(e.Value)
+ }
+ }
+ if events.Len() == 0 && log.Enable(logrus.DebugLevel) {
+ dataIDList := make([]uint64, 0)
+           for e := r.originalBuffer.detailEvents.Front(); e != nil; e = e.Next() {
+               if e.Value != nil {
+                   dataIDList = append(dataIDList, e.Value.(SocketDataDetail).DataID())
+ }
+ }
+           log.Debugf("cannot found details from original buffer, from data id: %d, end data id: %d, "+
+               "ref: %p, existing details data id list: %v", fromDataID, endDataID, r.originalBuffer, dataIDList)
+ }
+
+ return events
+ }
return r.detailEvents
}
@@ -281,7 +396,7 @@ func (r *Buffer) DetectNotSendingLastPosition() *Position {
return nil
}
-func CombineSlices(validated bool, buffers ...*Buffer) *Buffer {
+func CombineSlices(validated bool, originalBuffer *Buffer, buffers ...*Buffer) *Buffer {
if len(buffers) == 0 {
return nil
}
@@ -289,7 +404,6 @@ func CombineSlices(validated bool, buffers ...*Buffer) *Buffer {
return buffers[0]
}
dataEvents := list.New()
- detailEvents := list.New()
for _, b := range buffers {
if b == nil || b.head == nil {
continue
@@ -304,15 +418,20 @@ func CombineSlices(validated bool, buffers ...*Buffer) *Buffer {
} else {
dataEvents.PushBackList(b.dataEvents)
}
- detailEvents.PushBackList(b.detailEvents)
}
+ var endPosition = buffers[len(buffers)-1].endPosition
+ if endPosition == nil {
+ endPosition = buffers[len(buffers)-1].Position()
+ }
return &Buffer{
-       dataEvents:   dataEvents,
-       detailEvents: detailEvents,
-       validated:    validated,
-       head:         &Position{element: dataEvents.Front(), bufIndex: 0},
-       current:      &Position{element: dataEvents.Front(), bufIndex: 0},
+       dataEvents:     dataEvents,
+       detailEvents:   emptyList, // for the combined buffer, the details list should be queried from original buffer
+       validated:      validated,
+       head:           &Position{element: dataEvents.Front(), bufIndex: 0},
+       current:        &Position{element: dataEvents.Front(), bufIndex: 0},
+ originalBuffer: originalBuffer,
+ endPosition: endPosition,
}
}
@@ -505,12 +624,12 @@ func (r *Buffer) PrepareForReading() bool {
}
// nolint
-func (r *Buffer) RemoveReadElements() bool {
+func (r *Buffer) RemoveReadElements(includeDetails bool) bool {
r.eventLocker.Lock()
defer r.eventLocker.Unlock()
// delete until the last data id
- if r.head.element != nil && r.current.element != nil {
+ if includeDetails && r.head.element != nil && r.current.element != nil {
firstDataID := r.head.element.Value.(SocketDataBuffer).DataID()
currentBuffer := r.current.element.Value.(SocketDataBuffer)
lastDataID := currentBuffer.DataID()
@@ -531,6 +650,7 @@ func (r *Buffer) RemoveReadElements() bool {
if startDelete {
tmp := e.Next()
r.detailEvents.Remove(e)
+               log.Debugf("delete detail event from readed buffer, data id: %d, ref: %p", event.DataID(), r)
e = tmp
} else {
e = e.Next()
@@ -588,13 +708,20 @@ func (r *Buffer) AppendDetailEvent(event SocketDataDetail) {
r.detailEvents.PushFront(event)
return
}
+ if r.detailEvents.Back().Value == nil {
+ r.detailEvents.PushFront(event)
+ return
+ }
    if r.detailEvents.Back().Value.(SocketDataDetail).DataID() < event.DataID() {
r.detailEvents.PushBack(event)
return
}
beenAdded := false
    for element := r.detailEvents.Front(); element != nil; element = element.Next() {
- existEvent := element.Value.(SocketDataDetail)
+ existEvent, ok := element.Value.(SocketDataDetail)
+ if !ok {
+ continue
+ }
if existEvent.DataID() > event.DataID() {
// data id needs order
beenAdded = true
@@ -660,7 +787,16 @@ func (r *Buffer) DeleteExpireEvents(expireDuration time.Duration) int {
// detail event queue
    count += r.deleteEventsWithJudgement(r.detailEvents, func(element *list.Element) bool {
-       return r.latestExpiredDataID > 0 && element.Value.(SocketDataDetail).DataID() <= r.latestExpiredDataID
+ detail, ok := element.Value.(SocketDataDetail)
+ if !ok {
+ return true
+ }
+       isDelete := r.latestExpiredDataID > 0 && detail.DataID() <= r.latestExpiredDataID ||
+           (detail.Time() > 0 && expireTime.After(host.Time(detail.Time())))
+ if isDelete {
+           log.Debugf("delete expired detail event, data id: %d, buf: %p", detail.DataID(), r)
+ }
+ return isDelete
})
return count
}
@@ -672,13 +808,6 @@ func (r *Buffer) DataLength() int {
return r.dataEvents.Len()
}
-func (r *Buffer) DetailLength() int {
- if r.detailEvents == nil {
- return 0
- }
- return r.detailEvents.Len()
-}
-
func (r *Buffer) deleteEventsWithJudgement(l *list.List, checker func(element *list.Element) bool) int {
count := 0
for e := l.Front(); e != nil; {
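
Taken together, DataIDRange is the bookkeeping that makes the cleanup safe: it records the span of socket data IDs a reported message covered, and DeleteDetails removes exactly the detail events inside that span from the owning buffer, keeping the detail at the upper bound when that last buffer was only partially read, because a following message may still need it. A simplified sketch of that range-based deletion, with hypothetical types in place of the rover buffer package:

package main

import (
	"container/list"
	"fmt"
)

// dataIDRange is a simplified stand-in for buffer.DataIDRange.
type dataIDRange struct {
	from, to             uint64
	toBufferReadFinished bool
}

// deleteDetails drops every detail whose data id falls inside the range,
// but stops at the upper bound when that buffer was only partially read.
func (r *dataIDRange) deleteDetails(details *list.List) {
	for e := details.Front(); e != nil; {
		next := e.Next()
		id := e.Value.(uint64)
		if id >= r.from && id <= r.to {
			if !r.toBufferReadFinished && id == r.to {
				break // a later message may still need this detail
			}
			details.Remove(e)
		}
		e = next
	}
}

func main() {
	details := list.New()
	for _, id := range []uint64{1, 2, 3, 4} {
		details.PushBack(id)
	}
	r := &dataIDRange{from: 1, to: 3, toBufferReadFinished: false}
	r.deleteDetails(details)
	for e := details.Front(); e != nil; e = e.Next() {
		fmt.Println(e.Value) // 3 and 4 remain: 3 is the unfinished upper bound
	}
}
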