This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 5d72fa4 Refactor the socket detail message in the Access log module
(#154)
5d72fa4 is described below
commit 5d72fa492fd3a366a63d3b7bd8b5a958caa8c405
Author: mrproliu <[email protected]>
AuthorDate: Mon Nov 11 21:34:20 2024 +0900
Refactor the socket detail message in the Access log module (#154)
---
pkg/accesslog/collector/protocols/http1.go | 6 ++--
pkg/accesslog/collector/protocols/http2.go | 10 +++---
pkg/accesslog/collector/protocols/protocol.go | 4 +--
pkg/accesslog/collector/protocols/queue.go | 29 ++++++++++++------
pkg/accesslog/common/connection.go | 8 ++---
pkg/accesslog/common/queue.go | 4 +--
pkg/accesslog/events/detail.go | 44 +++++++++++++++++++++++++++
pkg/accesslog/forwarder/close.go | 2 +-
pkg/accesslog/forwarder/connect.go | 2 +-
pkg/accesslog/forwarder/forwarder.go | 2 +-
pkg/accesslog/forwarder/protocol.go | 2 +-
pkg/accesslog/forwarder/transfer.go | 4 +--
12 files changed, 85 insertions(+), 32 deletions(-)
diff --git a/pkg/accesslog/collector/protocols/http1.go
b/pkg/accesslog/collector/protocols/http1.go
index 11562e8..13b8661 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -143,7 +143,7 @@ func (p *HTTP1Protocol) handleResponse(metrics
ProtocolMetrics, b *buffer.Buffer
}
func (p *HTTP1Protocol) handleHTTPData(metrics *HTTP1Metrics, request
*reader.Request, response *reader.Response) {
- detailEvents := make([]*events.SocketDetailEvent, 0)
+ detailEvents := make([]events.SocketDetail, 0)
detailEvents = appendSocketDetailsFromBuffer(detailEvents,
request.HeaderBuffer())
detailEvents = appendSocketDetailsFromBuffer(detailEvents,
request.BodyBuffer())
detailEvents = appendSocketDetailsFromBuffer(detailEvents,
response.HeaderBuffer())
@@ -166,8 +166,8 @@ func (p *HTTP1Protocol) handleHTTPData(metrics
*HTTP1Metrics, request *reader.Re
forwarder.SendTransferProtocolEvent(p.ctx, detailEvents,
&v3.AccessLogProtocolLogs{
Protocol: &v3.AccessLogProtocolLogs_Http{
Http: &v3.AccessLogHTTPProtocol{
- StartTime:
forwarder.BuildOffsetTimestamp(detailEvents[0].StartTime),
- EndTime:
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].EndTime),
+ StartTime:
forwarder.BuildOffsetTimestamp(detailEvents[0].GetStartTime()),
+ EndTime:
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
Version:
v3.AccessLogHTTPProtocolVersion_HTTP1,
Request: &v3.AccessLogHTTPProtocolRequest{
Method:
transformHTTPMethod(originalRequest.Method),
diff --git a/pkg/accesslog/collector/protocols/http2.go
b/pkg/accesslog/collector/protocols/http2.go
index 8ce8d26..a3bf80e 100644
--- a/pkg/accesslog/collector/protocols/http2.go
+++ b/pkg/accesslog/collector/protocols/http2.go
@@ -224,7 +224,7 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics
*HTTP2Metrics, id ui
}
func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) {
- detailEvents := make([]*events.SocketDetailEvent, 0)
+ detailEvents := make([]events.SocketDetail, 0)
detailEvents = appendSocketDetailsFromBuffer(detailEvents,
stream.reqHeaderBuffer)
detailEvents = appendSocketDetailsFromBuffer(detailEvents,
stream.reqBodyBuffer)
detailEvents = appendSocketDetailsFromBuffer(detailEvents,
stream.respHeaderBuffer)
@@ -239,8 +239,8 @@ func (r *HTTP2Protocol) handleWholeStream(stream
*HTTP2Streaming) {
forwarder.SendTransferProtocolEvent(r.ctx, detailEvents,
&v3.AccessLogProtocolLogs{
Protocol: &v3.AccessLogProtocolLogs_Http{
Http: &v3.AccessLogHTTPProtocol{
- StartTime:
forwarder.BuildOffsetTimestamp(r.firstDetail(stream.reqBodyBuffer,
detailEvents[0]).StartTime),
- EndTime:
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].EndTime),
+ StartTime:
forwarder.BuildOffsetTimestamp(r.firstDetail(stream.reqBodyBuffer,
detailEvents[0]).GetStartTime()),
+ EndTime:
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
Version:
v3.AccessLogHTTPProtocolVersion_HTTP2,
Request: &v3.AccessLogHTTPProtocolRequest{
Method:
r.parseHTTPMethod(stream),
@@ -271,11 +271,11 @@ func (r *HTTP2Protocol) parseHTTPMethod(streaming
*HTTP2Streaming) v3.AccessLogH
return transformHTTPMethod(strings.ToUpper(method))
}
-func (r *HTTP2Protocol) firstDetail(buf *buffer.Buffer, def
*events.SocketDetailEvent) *events.SocketDetailEvent {
+func (r *HTTP2Protocol) firstDetail(buf *buffer.Buffer, def
events.SocketDetail) events.SocketDetail {
if buf == nil || buf.Details() == nil || buf.Details().Len() == 0 {
return def
}
- return buf.Details().Front().Value.(*events.SocketDetailEvent)
+ return buf.Details().Front().Value.(events.SocketDetail)
}
func (r *HTTP2Protocol) bufferSizeOfZero(buf *buffer.Buffer) uint64 {
diff --git a/pkg/accesslog/collector/protocols/protocol.go
b/pkg/accesslog/collector/protocols/protocol.go
index 25c5ba7..17ff30c 100644
--- a/pkg/accesslog/collector/protocols/protocol.go
+++ b/pkg/accesslog/collector/protocols/protocol.go
@@ -58,7 +58,7 @@ type Protocol interface {
Analyze(metrics ProtocolMetrics, buffer *buffer.Buffer, helper
*AnalyzeHelper) error
}
-func appendSocketDetailsFromBuffer(result []*events.SocketDetailEvent, buf
*buffer.Buffer) []*events.SocketDetailEvent {
+func appendSocketDetailsFromBuffer(result []events.SocketDetail, buf
*buffer.Buffer) []events.SocketDetail {
if buf == nil || buf.DetailLength() == 0 {
return result
}
@@ -66,7 +66,7 @@ func appendSocketDetailsFromBuffer(result
[]*events.SocketDetailEvent, buf *buff
if len(result) > 0 && result[len(result)-1] == e.Value {
continue
}
- result = append(result, e.Value.(*events.SocketDetailEvent))
+ result = append(result, e.Value.(events.SocketDetail))
}
return result
}
diff --git a/pkg/accesslog/collector/protocols/queue.go
b/pkg/accesslog/collector/protocols/queue.go
index 000590d..70c7743 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -48,6 +48,8 @@ type AnalyzeQueue struct {
context *common.AccessLogContext
eventQueue *btf.EventQueue
perCPUBuffer int64
+
+ detailSupplier func() events.SocketDetail
}
func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) {
@@ -71,14 +73,17 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext)
(*AnalyzeQueue, error) {
eventQueue:
btf.NewEventQueue(ctx.Config.ProtocolAnalyze.Parallels,
ctx.Config.ProtocolAnalyze.QueueSize, func() btf.PartitionContext {
return NewPartitionContext(ctx)
}),
+ detailSupplier: func() events.SocketDetail {
+ return &events.SocketDetailEvent{}
+ },
}, nil
}
func (q *AnalyzeQueue) Start(ctx context.Context) {
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue,
int(q.perCPUBuffer), func() interface{} {
- return &events.SocketDetailEvent{}
+ return q.detailSupplier()
}, func(data interface{}) string {
- return fmt.Sprintf("%d",
data.(*events.SocketDetailEvent).GetConnectionID())
+ return fmt.Sprintf("%d",
data.(events.SocketDetail).GetConnectionID())
})
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue,
int(q.perCPUBuffer), func() interface{} {
return &events.SocketDataUploadEvent{}
@@ -89,6 +94,10 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
q.eventQueue.Start(ctx, q.context.BPF.Linker)
}
+func (q *AnalyzeQueue) ChangeDetailSupplier(supplier func()
events.SocketDetail) {
+ q.detailSupplier = supplier
+}
+
type PartitionContext struct {
context *common.AccessLogContext
protocolMgr *ProtocolManager
@@ -164,18 +173,18 @@ func (p *PartitionContext) Start(ctx context.Context) {
func (p *PartitionContext) Consume(data interface{}) {
switch event := data.(type) {
- case *events.SocketDetailEvent:
- pid, _ := events.ParseConnectionID(event.ConnectionID)
+ case events.SocketDetail:
+ pid, _ := events.ParseConnectionID(event.GetConnectionID())
log.Debugf("receive the socket detail event, connection ID: %d,
random ID: %d, pid: %d, data id: %d, "+
- "function name: %s, package count: %d, package size:
%d, l4 duration: %d, ssl: %d",
- event.ConnectionID, event.RandomID, pid, event.DataID0,
event.FunctionName,
- event.L4PackageCount, event.L4TotalPackageSize,
event.L4Duration, event.SSL)
- if event.Protocol == enums.ConnectionProtocolUnknown {
+ "function name: %s, package count: %d, package size:
%d, ssl: %d",
+ event.GetConnectionID(), event.GetRandomID(), pid,
event.DataID(), event.GetFunctionName(),
+ event.GetL4PackageCount(),
event.GetL4TotalPackageSize(), event.GetSSL())
+ if event.GetProtocol() == enums.ConnectionProtocolUnknown {
// if the connection protocol is unknown, we just needs
to add this into the kernel log
forwarder.SendTransferNoProtocolEvent(p.context, event)
return
}
- connection := p.getConnectionContext(event.GetConnectionID(),
event.GetRandomID(), event.Protocol)
+ connection := p.getConnectionContext(event.GetConnectionID(),
event.GetRandomID(), event.GetProtocol())
connection.appendDetail(p.context, event)
case *events.SocketDataUploadEvent:
pid, _ := events.ParseConnectionID(event.ConnectionID)
@@ -303,7 +312,7 @@ type PartitionConnection struct {
lastCheckCloseTime time.Time
}
-func (p *PartitionConnection) appendDetail(ctx *common.AccessLogContext,
detail *events.SocketDetailEvent) {
+func (p *PartitionConnection) appendDetail(ctx *common.AccessLogContext,
detail events.SocketDetail) {
if p.skipAllDataAnalyze {
// if the connection is already skip all data analyze, then
just send the detail event
forwarder.SendTransferNoProtocolEvent(ctx, detail)
diff --git a/pkg/accesslog/common/connection.go
b/pkg/accesslog/common/connection.go
index 51d418a..3681f95 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -313,12 +313,12 @@ func (c *ConnectionManager)
connectionPostHandle(connection *ConnectionInfo, eve
// if not all processor finished, then add into the map
c.allUnfinishedConnections[fmt.Sprintf("%d_%d",
event.GetConnectionID(), event.GetRandomID())] = &e.allProcessorFinished
}
- case *events.SocketDetailEvent:
- if e.SSL == 1 && connection.RPCConnection.TlsMode ==
v3.AccessLogConnectionTLSMode_Plain {
+ case events.SocketDetail:
+ if e.GetSSL() == 1 && connection.RPCConnection.TlsMode ==
v3.AccessLogConnectionTLSMode_Plain {
connection.RPCConnection.TlsMode =
v3.AccessLogConnectionTLSMode_TLS
}
- if e.Protocol != enums.ConnectionProtocolUnknown &&
connection.RPCConnection.Protocol == v3.AccessLogProtocolType_TCP {
- switch e.Protocol {
+ if e.GetProtocol() != enums.ConnectionProtocolUnknown &&
connection.RPCConnection.Protocol == v3.AccessLogProtocolType_TCP {
+ switch e.GetProtocol() {
case enums.ConnectionProtocolHTTP:
connection.RPCConnection.Protocol =
v3.AccessLogProtocolType_HTTP_1
case enums.ConnectionProtocolHTTP2:
diff --git a/pkg/accesslog/common/queue.go b/pkg/accesslog/common/queue.go
index 6a0fc6f..131a29a 100644
--- a/pkg/accesslog/common/queue.go
+++ b/pkg/accesslog/common/queue.go
@@ -39,7 +39,7 @@ type KernelLog struct {
}
type ProtocolLog struct {
- KernelLogs []*events.SocketDetailEvent
+ KernelLogs []events.SocketDetail
Protocol *v3.AccessLogProtocolLogs
}
@@ -84,7 +84,7 @@ func (q *Queue) AppendKernelLog(tp LogType, event
events.Event) {
q.consumeIfNeed()
}
-func (q *Queue) AppendProtocolLog(kernelLogs []*events.SocketDetailEvent,
protocol *v3.AccessLogProtocolLogs) {
+func (q *Queue) AppendProtocolLog(kernelLogs []events.SocketDetail, protocol
*v3.AccessLogProtocolLogs) {
select {
case q.protocolLogs <- &ProtocolLog{
KernelLogs: kernelLogs,
diff --git a/pkg/accesslog/events/detail.go b/pkg/accesslog/events/detail.go
index ecb5f11..0f56edf 100644
--- a/pkg/accesslog/events/detail.go
+++ b/pkg/accesslog/events/detail.go
@@ -20,10 +20,26 @@ package events
import (
"time"
+ "github.com/apache/skywalking-rover/pkg/tools/buffer"
"github.com/apache/skywalking-rover/pkg/tools/enums"
"github.com/apache/skywalking-rover/pkg/tools/host"
)
+type SocketDetail interface {
+ Event
+ buffer.SocketDataDetail
+
+ GetStartTime() uint64
+ GetEndTime() uint64
+
+ GetL4PackageCount() uint8
+ GetL4TotalPackageSize() uint64
+
+ GetFunctionName() enums.SocketFunctionName
+ GetProtocol() enums.ConnectionProtocol
+ GetSSL() uint8
+}
+
type SocketDetailEvent struct {
ConnectionID uint64
RandomID uint64
@@ -71,3 +87,31 @@ func (d *SocketDetailEvent) Timestamp() time.Time {
func (d *SocketDetailEvent) DataID() uint64 {
return d.DataID0
}
+
+func (d *SocketDetailEvent) GetStartTime() uint64 {
+ return d.StartTime
+}
+
+func (d *SocketDetailEvent) GetEndTime() uint64 {
+ return d.EndTime
+}
+
+func (d *SocketDetailEvent) GetL4PackageCount() uint8 {
+ return d.L4PackageCount
+}
+
+func (d *SocketDetailEvent) GetL4TotalPackageSize() uint64 {
+ return d.L4TotalPackageSize
+}
+
+func (d *SocketDetailEvent) GetFunctionName() enums.SocketFunctionName {
+ return d.FunctionName
+}
+
+func (d *SocketDetailEvent) GetProtocol() enums.ConnectionProtocol {
+ return d.Protocol
+}
+
+func (d *SocketDetailEvent) GetSSL() uint8 {
+ return d.SSL
+}
diff --git a/pkg/accesslog/forwarder/close.go b/pkg/accesslog/forwarder/close.go
index 5afbec3..d949588 100644
--- a/pkg/accesslog/forwarder/close.go
+++ b/pkg/accesslog/forwarder/close.go
@@ -25,7 +25,7 @@ import (
)
func init() {
- registerKernelLogBuilder(common.LogTypeClose, closeLogBuilder)
+ RegisterKernelLogBuilder(common.LogTypeClose, closeLogBuilder)
}
func SendCloseEvent(context *common.AccessLogContext, event
*common.CloseEventWithNotify) {
diff --git a/pkg/accesslog/forwarder/connect.go
b/pkg/accesslog/forwarder/connect.go
index e57ac4e..2058d25 100644
--- a/pkg/accesslog/forwarder/connect.go
+++ b/pkg/accesslog/forwarder/connect.go
@@ -27,7 +27,7 @@ import (
)
func init() {
- registerKernelLogBuilder(common.LogTypeConnect, connectLogBuilder)
+ RegisterKernelLogBuilder(common.LogTypeConnect, connectLogBuilder)
}
func SendConnectEvent(context *common.AccessLogContext, event
*events.SocketConnectEvent, socketPair *ip.SocketPair) {
diff --git a/pkg/accesslog/forwarder/forwarder.go
b/pkg/accesslog/forwarder/forwarder.go
index 3cac342..fc9b2bb 100644
--- a/pkg/accesslog/forwarder/forwarder.go
+++ b/pkg/accesslog/forwarder/forwarder.go
@@ -28,7 +28,7 @@ type KernelLogBuilder func(data events.Event)
*v3.AccessLogKernelLog
var kernelLogBuilders = make([]KernelLogBuilder, 10)
-func registerKernelLogBuilder(tp common.LogType, builder KernelLogBuilder) {
+func RegisterKernelLogBuilder(tp common.LogType, builder KernelLogBuilder) {
kernelLogBuilders[tp] = builder
}
diff --git a/pkg/accesslog/forwarder/protocol.go
b/pkg/accesslog/forwarder/protocol.go
index dd158c8..8e8c5b4 100644
--- a/pkg/accesslog/forwarder/protocol.go
+++ b/pkg/accesslog/forwarder/protocol.go
@@ -24,6 +24,6 @@ import (
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
)
-func SendTransferProtocolEvent(context *common.AccessLogContext, kernelLogs
[]*events.SocketDetailEvent, protocolData *v3.AccessLogProtocolLogs) {
+func SendTransferProtocolEvent(context *common.AccessLogContext, kernelLogs
[]events.SocketDetail, protocolData *v3.AccessLogProtocolLogs) {
context.Queue.AppendProtocolLog(kernelLogs, protocolData)
}
diff --git a/pkg/accesslog/forwarder/transfer.go
b/pkg/accesslog/forwarder/transfer.go
index 882da0c..44c0d4e 100644
--- a/pkg/accesslog/forwarder/transfer.go
+++ b/pkg/accesslog/forwarder/transfer.go
@@ -26,10 +26,10 @@ import (
)
func init() {
- registerKernelLogBuilder(common.LogTypeKernelTransfer,
kernelTransferLogBuilder)
+ RegisterKernelLogBuilder(common.LogTypeKernelTransfer,
kernelTransferLogBuilder)
}
-func SendTransferNoProtocolEvent(context *common.AccessLogContext, event
*events.SocketDetailEvent) {
+func SendTransferNoProtocolEvent(context *common.AccessLogContext, event
events.SocketDetail) {
context.Queue.AppendKernelLog(common.LogTypeKernelTransfer, event)
}