This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d72fa4  Refactor the socket detail message in the Access log module 
(#154)
5d72fa4 is described below

commit 5d72fa492fd3a366a63d3b7bd8b5a958caa8c405
Author: mrproliu <[email protected]>
AuthorDate: Mon Nov 11 21:34:20 2024 +0900

    Refactor the socket detail message in the Access log module (#154)
---
 pkg/accesslog/collector/protocols/http1.go    |  6 ++--
 pkg/accesslog/collector/protocols/http2.go    | 10 +++---
 pkg/accesslog/collector/protocols/protocol.go |  4 +--
 pkg/accesslog/collector/protocols/queue.go    | 29 ++++++++++++------
 pkg/accesslog/common/connection.go            |  8 ++---
 pkg/accesslog/common/queue.go                 |  4 +--
 pkg/accesslog/events/detail.go                | 44 +++++++++++++++++++++++++++
 pkg/accesslog/forwarder/close.go              |  2 +-
 pkg/accesslog/forwarder/connect.go            |  2 +-
 pkg/accesslog/forwarder/forwarder.go          |  2 +-
 pkg/accesslog/forwarder/protocol.go           |  2 +-
 pkg/accesslog/forwarder/transfer.go           |  4 +--
 12 files changed, 85 insertions(+), 32 deletions(-)

diff --git a/pkg/accesslog/collector/protocols/http1.go 
b/pkg/accesslog/collector/protocols/http1.go
index 11562e8..13b8661 100644
--- a/pkg/accesslog/collector/protocols/http1.go
+++ b/pkg/accesslog/collector/protocols/http1.go
@@ -143,7 +143,7 @@ func (p *HTTP1Protocol) handleResponse(metrics 
ProtocolMetrics, b *buffer.Buffer
 }
 
 func (p *HTTP1Protocol) handleHTTPData(metrics *HTTP1Metrics, request 
*reader.Request, response *reader.Response) {
-       detailEvents := make([]*events.SocketDetailEvent, 0)
+       detailEvents := make([]events.SocketDetail, 0)
        detailEvents = appendSocketDetailsFromBuffer(detailEvents, 
request.HeaderBuffer())
        detailEvents = appendSocketDetailsFromBuffer(detailEvents, 
request.BodyBuffer())
        detailEvents = appendSocketDetailsFromBuffer(detailEvents, 
response.HeaderBuffer())
@@ -166,8 +166,8 @@ func (p *HTTP1Protocol) handleHTTPData(metrics 
*HTTP1Metrics, request *reader.Re
        forwarder.SendTransferProtocolEvent(p.ctx, detailEvents, 
&v3.AccessLogProtocolLogs{
                Protocol: &v3.AccessLogProtocolLogs_Http{
                        Http: &v3.AccessLogHTTPProtocol{
-                               StartTime: 
forwarder.BuildOffsetTimestamp(detailEvents[0].StartTime),
-                               EndTime:   
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].EndTime),
+                               StartTime: 
forwarder.BuildOffsetTimestamp(detailEvents[0].GetStartTime()),
+                               EndTime:   
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
                                Version:   
v3.AccessLogHTTPProtocolVersion_HTTP1,
                                Request: &v3.AccessLogHTTPProtocolRequest{
                                        Method:             
transformHTTPMethod(originalRequest.Method),
diff --git a/pkg/accesslog/collector/protocols/http2.go 
b/pkg/accesslog/collector/protocols/http2.go
index 8ce8d26..a3bf80e 100644
--- a/pkg/accesslog/collector/protocols/http2.go
+++ b/pkg/accesslog/collector/protocols/http2.go
@@ -224,7 +224,7 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics 
*HTTP2Metrics, id ui
 }
 
 func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) {
-       detailEvents := make([]*events.SocketDetailEvent, 0)
+       detailEvents := make([]events.SocketDetail, 0)
        detailEvents = appendSocketDetailsFromBuffer(detailEvents, 
stream.reqHeaderBuffer)
        detailEvents = appendSocketDetailsFromBuffer(detailEvents, 
stream.reqBodyBuffer)
        detailEvents = appendSocketDetailsFromBuffer(detailEvents, 
stream.respHeaderBuffer)
@@ -239,8 +239,8 @@ func (r *HTTP2Protocol) handleWholeStream(stream 
*HTTP2Streaming) {
        forwarder.SendTransferProtocolEvent(r.ctx, detailEvents, 
&v3.AccessLogProtocolLogs{
                Protocol: &v3.AccessLogProtocolLogs_Http{
                        Http: &v3.AccessLogHTTPProtocol{
-                               StartTime: 
forwarder.BuildOffsetTimestamp(r.firstDetail(stream.reqBodyBuffer, 
detailEvents[0]).StartTime),
-                               EndTime:   
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].EndTime),
+                               StartTime: 
forwarder.BuildOffsetTimestamp(r.firstDetail(stream.reqBodyBuffer, 
detailEvents[0]).GetStartTime()),
+                               EndTime:   
forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
                                Version:   
v3.AccessLogHTTPProtocolVersion_HTTP2,
                                Request: &v3.AccessLogHTTPProtocolRequest{
                                        Method:             
r.parseHTTPMethod(stream),
@@ -271,11 +271,11 @@ func (r *HTTP2Protocol) parseHTTPMethod(streaming 
*HTTP2Streaming) v3.AccessLogH
        return transformHTTPMethod(strings.ToUpper(method))
 }
 
-func (r *HTTP2Protocol) firstDetail(buf *buffer.Buffer, def 
*events.SocketDetailEvent) *events.SocketDetailEvent {
+func (r *HTTP2Protocol) firstDetail(buf *buffer.Buffer, def 
events.SocketDetail) events.SocketDetail {
        if buf == nil || buf.Details() == nil || buf.Details().Len() == 0 {
                return def
        }
-       return buf.Details().Front().Value.(*events.SocketDetailEvent)
+       return buf.Details().Front().Value.(events.SocketDetail)
 }
 
 func (r *HTTP2Protocol) bufferSizeOfZero(buf *buffer.Buffer) uint64 {
diff --git a/pkg/accesslog/collector/protocols/protocol.go 
b/pkg/accesslog/collector/protocols/protocol.go
index 25c5ba7..17ff30c 100644
--- a/pkg/accesslog/collector/protocols/protocol.go
+++ b/pkg/accesslog/collector/protocols/protocol.go
@@ -58,7 +58,7 @@ type Protocol interface {
        Analyze(metrics ProtocolMetrics, buffer *buffer.Buffer, helper 
*AnalyzeHelper) error
 }
 
-func appendSocketDetailsFromBuffer(result []*events.SocketDetailEvent, buf 
*buffer.Buffer) []*events.SocketDetailEvent {
+func appendSocketDetailsFromBuffer(result []events.SocketDetail, buf 
*buffer.Buffer) []events.SocketDetail {
        if buf == nil || buf.DetailLength() == 0 {
                return result
        }
@@ -66,7 +66,7 @@ func appendSocketDetailsFromBuffer(result 
[]*events.SocketDetailEvent, buf *buff
                if len(result) > 0 && result[len(result)-1] == e.Value {
                        continue
                }
-               result = append(result, e.Value.(*events.SocketDetailEvent))
+               result = append(result, e.Value.(events.SocketDetail))
        }
        return result
 }
diff --git a/pkg/accesslog/collector/protocols/queue.go 
b/pkg/accesslog/collector/protocols/queue.go
index 000590d..70c7743 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -48,6 +48,8 @@ type AnalyzeQueue struct {
        context      *common.AccessLogContext
        eventQueue   *btf.EventQueue
        perCPUBuffer int64
+
+       detailSupplier func() events.SocketDetail
 }
 
 func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) {
@@ -71,14 +73,17 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) 
(*AnalyzeQueue, error) {
                eventQueue: 
btf.NewEventQueue(ctx.Config.ProtocolAnalyze.Parallels, 
ctx.Config.ProtocolAnalyze.QueueSize, func() btf.PartitionContext {
                        return NewPartitionContext(ctx)
                }),
+               detailSupplier: func() events.SocketDetail {
+                       return &events.SocketDetailEvent{}
+               },
        }, nil
 }
 
 func (q *AnalyzeQueue) Start(ctx context.Context) {
        q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue, 
int(q.perCPUBuffer), func() interface{} {
-               return &events.SocketDetailEvent{}
+               return q.detailSupplier()
        }, func(data interface{}) string {
-               return fmt.Sprintf("%d", 
data.(*events.SocketDetailEvent).GetConnectionID())
+               return fmt.Sprintf("%d", 
data.(events.SocketDetail).GetConnectionID())
        })
        q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue, 
int(q.perCPUBuffer), func() interface{} {
                return &events.SocketDataUploadEvent{}
@@ -89,6 +94,10 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
        q.eventQueue.Start(ctx, q.context.BPF.Linker)
 }
 
+func (q *AnalyzeQueue) ChangeDetailSupplier(supplier func() 
events.SocketDetail) {
+       q.detailSupplier = supplier
+}
+
 type PartitionContext struct {
        context     *common.AccessLogContext
        protocolMgr *ProtocolManager
@@ -164,18 +173,18 @@ func (p *PartitionContext) Start(ctx context.Context) {
 
 func (p *PartitionContext) Consume(data interface{}) {
        switch event := data.(type) {
-       case *events.SocketDetailEvent:
-               pid, _ := events.ParseConnectionID(event.ConnectionID)
+       case events.SocketDetail:
+               pid, _ := events.ParseConnectionID(event.GetConnectionID())
                log.Debugf("receive the socket detail event, connection ID: %d, 
random ID: %d, pid: %d, data id: %d, "+
-                       "function name: %s, package count: %d, package size: 
%d, l4 duration: %d, ssl: %d",
-                       event.ConnectionID, event.RandomID, pid, event.DataID0, 
event.FunctionName,
-                       event.L4PackageCount, event.L4TotalPackageSize, 
event.L4Duration, event.SSL)
-               if event.Protocol == enums.ConnectionProtocolUnknown {
+                       "function name: %s, package count: %d, package size: 
%d, ssl: %d",
+                       event.GetConnectionID(), event.GetRandomID(), pid, 
event.DataID(), event.GetFunctionName(),
+                       event.GetL4PackageCount(), 
event.GetL4TotalPackageSize(), event.GetSSL())
+               if event.GetProtocol() == enums.ConnectionProtocolUnknown {
                        // if the connection protocol is unknown, we just needs 
to add this into the kernel log
                        forwarder.SendTransferNoProtocolEvent(p.context, event)
                        return
                }
-               connection := p.getConnectionContext(event.GetConnectionID(), 
event.GetRandomID(), event.Protocol)
+               connection := p.getConnectionContext(event.GetConnectionID(), 
event.GetRandomID(), event.GetProtocol())
                connection.appendDetail(p.context, event)
        case *events.SocketDataUploadEvent:
                pid, _ := events.ParseConnectionID(event.ConnectionID)
@@ -303,7 +312,7 @@ type PartitionConnection struct {
        lastCheckCloseTime     time.Time
 }
 
-func (p *PartitionConnection) appendDetail(ctx *common.AccessLogContext, 
detail *events.SocketDetailEvent) {
+func (p *PartitionConnection) appendDetail(ctx *common.AccessLogContext, 
detail events.SocketDetail) {
        if p.skipAllDataAnalyze {
                // if the connection is already skip all data analyze, then 
just send the detail event
                forwarder.SendTransferNoProtocolEvent(ctx, detail)
diff --git a/pkg/accesslog/common/connection.go 
b/pkg/accesslog/common/connection.go
index 51d418a..3681f95 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -313,12 +313,12 @@ func (c *ConnectionManager) 
connectionPostHandle(connection *ConnectionInfo, eve
                        // if not all processor finished, then add into the map
                        c.allUnfinishedConnections[fmt.Sprintf("%d_%d", 
event.GetConnectionID(), event.GetRandomID())] = &e.allProcessorFinished
                }
-       case *events.SocketDetailEvent:
-               if e.SSL == 1 && connection.RPCConnection.TlsMode == 
v3.AccessLogConnectionTLSMode_Plain {
+       case events.SocketDetail:
+               if e.GetSSL() == 1 && connection.RPCConnection.TlsMode == 
v3.AccessLogConnectionTLSMode_Plain {
                        connection.RPCConnection.TlsMode = 
v3.AccessLogConnectionTLSMode_TLS
                }
-               if e.Protocol != enums.ConnectionProtocolUnknown && 
connection.RPCConnection.Protocol == v3.AccessLogProtocolType_TCP {
-                       switch e.Protocol {
+               if e.GetProtocol() != enums.ConnectionProtocolUnknown && 
connection.RPCConnection.Protocol == v3.AccessLogProtocolType_TCP {
+                       switch e.GetProtocol() {
                        case enums.ConnectionProtocolHTTP:
                                connection.RPCConnection.Protocol = 
v3.AccessLogProtocolType_HTTP_1
                        case enums.ConnectionProtocolHTTP2:
diff --git a/pkg/accesslog/common/queue.go b/pkg/accesslog/common/queue.go
index 6a0fc6f..131a29a 100644
--- a/pkg/accesslog/common/queue.go
+++ b/pkg/accesslog/common/queue.go
@@ -39,7 +39,7 @@ type KernelLog struct {
 }
 
 type ProtocolLog struct {
-       KernelLogs []*events.SocketDetailEvent
+       KernelLogs []events.SocketDetail
        Protocol   *v3.AccessLogProtocolLogs
 }
 
@@ -84,7 +84,7 @@ func (q *Queue) AppendKernelLog(tp LogType, event 
events.Event) {
        q.consumeIfNeed()
 }
 
-func (q *Queue) AppendProtocolLog(kernelLogs []*events.SocketDetailEvent, 
protocol *v3.AccessLogProtocolLogs) {
+func (q *Queue) AppendProtocolLog(kernelLogs []events.SocketDetail, protocol 
*v3.AccessLogProtocolLogs) {
        select {
        case q.protocolLogs <- &ProtocolLog{
                KernelLogs: kernelLogs,
diff --git a/pkg/accesslog/events/detail.go b/pkg/accesslog/events/detail.go
index ecb5f11..0f56edf 100644
--- a/pkg/accesslog/events/detail.go
+++ b/pkg/accesslog/events/detail.go
@@ -20,10 +20,26 @@ package events
 import (
        "time"
 
+       "github.com/apache/skywalking-rover/pkg/tools/buffer"
        "github.com/apache/skywalking-rover/pkg/tools/enums"
        "github.com/apache/skywalking-rover/pkg/tools/host"
 )
 
+type SocketDetail interface {
+       Event
+       buffer.SocketDataDetail
+
+       GetStartTime() uint64
+       GetEndTime() uint64
+
+       GetL4PackageCount() uint8
+       GetL4TotalPackageSize() uint64
+
+       GetFunctionName() enums.SocketFunctionName
+       GetProtocol() enums.ConnectionProtocol
+       GetSSL() uint8
+}
+
 type SocketDetailEvent struct {
        ConnectionID uint64
        RandomID     uint64
@@ -71,3 +87,31 @@ func (d *SocketDetailEvent) Timestamp() time.Time {
 func (d *SocketDetailEvent) DataID() uint64 {
        return d.DataID0
 }
+
+func (d *SocketDetailEvent) GetStartTime() uint64 {
+       return d.StartTime
+}
+
+func (d *SocketDetailEvent) GetEndTime() uint64 {
+       return d.EndTime
+}
+
+func (d *SocketDetailEvent) GetL4PackageCount() uint8 {
+       return d.L4PackageCount
+}
+
+func (d *SocketDetailEvent) GetL4TotalPackageSize() uint64 {
+       return d.L4TotalPackageSize
+}
+
+func (d *SocketDetailEvent) GetFunctionName() enums.SocketFunctionName {
+       return d.FunctionName
+}
+
+func (d *SocketDetailEvent) GetProtocol() enums.ConnectionProtocol {
+       return d.Protocol
+}
+
+func (d *SocketDetailEvent) GetSSL() uint8 {
+       return d.SSL
+}
diff --git a/pkg/accesslog/forwarder/close.go b/pkg/accesslog/forwarder/close.go
index 5afbec3..d949588 100644
--- a/pkg/accesslog/forwarder/close.go
+++ b/pkg/accesslog/forwarder/close.go
@@ -25,7 +25,7 @@ import (
 )
 
 func init() {
-       registerKernelLogBuilder(common.LogTypeClose, closeLogBuilder)
+       RegisterKernelLogBuilder(common.LogTypeClose, closeLogBuilder)
 }
 
 func SendCloseEvent(context *common.AccessLogContext, event 
*common.CloseEventWithNotify) {
diff --git a/pkg/accesslog/forwarder/connect.go 
b/pkg/accesslog/forwarder/connect.go
index e57ac4e..2058d25 100644
--- a/pkg/accesslog/forwarder/connect.go
+++ b/pkg/accesslog/forwarder/connect.go
@@ -27,7 +27,7 @@ import (
 )
 
 func init() {
-       registerKernelLogBuilder(common.LogTypeConnect, connectLogBuilder)
+       RegisterKernelLogBuilder(common.LogTypeConnect, connectLogBuilder)
 }
 
 func SendConnectEvent(context *common.AccessLogContext, event 
*events.SocketConnectEvent, socketPair *ip.SocketPair) {
diff --git a/pkg/accesslog/forwarder/forwarder.go 
b/pkg/accesslog/forwarder/forwarder.go
index 3cac342..fc9b2bb 100644
--- a/pkg/accesslog/forwarder/forwarder.go
+++ b/pkg/accesslog/forwarder/forwarder.go
@@ -28,7 +28,7 @@ type KernelLogBuilder func(data events.Event) 
*v3.AccessLogKernelLog
 
 var kernelLogBuilders = make([]KernelLogBuilder, 10)
 
-func registerKernelLogBuilder(tp common.LogType, builder KernelLogBuilder) {
+func RegisterKernelLogBuilder(tp common.LogType, builder KernelLogBuilder) {
        kernelLogBuilders[tp] = builder
 }
 
diff --git a/pkg/accesslog/forwarder/protocol.go 
b/pkg/accesslog/forwarder/protocol.go
index dd158c8..8e8c5b4 100644
--- a/pkg/accesslog/forwarder/protocol.go
+++ b/pkg/accesslog/forwarder/protocol.go
@@ -24,6 +24,6 @@ import (
        v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
 )
 
-func SendTransferProtocolEvent(context *common.AccessLogContext, kernelLogs 
[]*events.SocketDetailEvent, protocolData *v3.AccessLogProtocolLogs) {
+func SendTransferProtocolEvent(context *common.AccessLogContext, kernelLogs 
[]events.SocketDetail, protocolData *v3.AccessLogProtocolLogs) {
        context.Queue.AppendProtocolLog(kernelLogs, protocolData)
 }
diff --git a/pkg/accesslog/forwarder/transfer.go 
b/pkg/accesslog/forwarder/transfer.go
index 882da0c..44c0d4e 100644
--- a/pkg/accesslog/forwarder/transfer.go
+++ b/pkg/accesslog/forwarder/transfer.go
@@ -26,10 +26,10 @@ import (
 )
 
 func init() {
-       registerKernelLogBuilder(common.LogTypeKernelTransfer, 
kernelTransferLogBuilder)
+       RegisterKernelLogBuilder(common.LogTypeKernelTransfer, 
kernelTransferLogBuilder)
 }
 
-func SendTransferNoProtocolEvent(context *common.AccessLogContext, event 
*events.SocketDetailEvent) {
+func SendTransferNoProtocolEvent(context *common.AccessLogContext, event 
events.SocketDetail) {
        context.Queue.AppendKernelLog(common.LogTypeKernelTransfer, event)
 }
 

Reply via email to