This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new 67622c3  Support parallel parsing protocol data in the access log module (#167)
67622c3 is described below

commit 67622c352b98fd32782a3e7afc7d3fbd6d6ec8e3
Author: mrproliu <[email protected]>
AuthorDate: Wed Dec 11 12:08:44 2024 +0900

    Support parallel parsing protocol data in the access log module (#167)
---
 CHANGES.md                                         |  1 +
 configs/rover_configs.yaml                         |  4 ++-
 pkg/accesslog/collector/connection.go              |  4 +--
 pkg/accesslog/collector/protocols/queue.go         | 31 +++++++++++++---------
 pkg/accesslog/common/config.go                     |  3 ++-
 .../continuous/checker/bpf/network/network.go      |  2 +-
 .../task/network/analyze/layer7/events.go          |  4 +--
 pkg/tools/btf/linker.go                            | 15 +++++++++--
 pkg/tools/btf/queue.go                             |  6 +++--
 9 files changed, 46 insertions(+), 24 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 4eab947..3392d50 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -13,6 +13,7 @@ Release Notes.
 * Fix the unaligned memory accesses for `upload_socket_data_buf`.
 * Support for connecting to the backend server over TLS without requiring `ca.pem`.
 * Fix missing the first socket detail event in HTTPS protocol.
+* Support parallel parsing protocol data in the access log module.
 
 #### Bug Fixes
 * Fix the base image cannot run in the arm64.
diff --git a/configs/rover_configs.yaml b/configs/rover_configs.yaml
index ce39111..8f7ba6e 100644
--- a/configs/rover_configs.yaml
+++ b/configs/rover_configs.yaml
@@ -150,8 +150,10 @@ access_log:
   protocol_analyze:
     # The size of socket data buffer on each CPU
     per_cpu_buffer: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PER_CPU_BUFFER:400KB}
+    # The count of parallel protocol event parse
+    parse_parallels: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PARSE_PARALLELS:2}
     # The count of parallel protocol analyzer
-    parallels: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PARALLELS:2}
+    analyze_parallels: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PARALLELS:2}
     # The size of per paralleled analyzer queue
     queue_size: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_QUEUE_SIZE:5000}
 
diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go
index c125397..8e8aba1 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -77,12 +77,12 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx 
*common.AccessLogContext
        c.eventQueue = 
btf.NewEventQueue(ctx.Config.ConnectionAnalyze.Parallels, 
ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
                return newConnectionPartitionContext(ctx, track)
        })
-       c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, 
int(perCPUBufferSize), func() interface{} {
+       c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, 
int(perCPUBufferSize), 1, func() interface{} {
                return &events.SocketConnectEvent{}
        }, func(data interface{}) string {
                return fmt.Sprintf("%d", 
data.(*events.SocketConnectEvent).ConID)
        })
-       c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, 
int(perCPUBufferSize), func() interface{} {
+       c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, 
int(perCPUBufferSize), 1, func() interface{} {
                return &events.SocketCloseEvent{}
        }, func(data interface{}) string {
                return fmt.Sprintf("%d", 
data.(*events.SocketCloseEvent).ConnectionID)
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index ad66584..68708fc 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -62,8 +62,11 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) 
(*AnalyzeQueue, error) {
        if int(perCPUBufferSize) < os.Getpagesize() {
                return nil, fmt.Errorf("the cpu buffer must bigger than %dB", 
os.Getpagesize())
        }
-       if ctx.Config.ProtocolAnalyze.Parallels < 1 {
-               return nil, fmt.Errorf("the parallels cannot be small than 1")
+       if ctx.Config.ProtocolAnalyze.AnalyzeParallels < 1 {
+               return nil, fmt.Errorf("the analyze parallels cannot be small 
than 1")
+       }
+       if ctx.Config.ProtocolAnalyze.ParseParallels < 1 {
+               return nil, fmt.Errorf("the parse parallels cannot be small 
than 1")
        }
        if ctx.Config.ProtocolAnalyze.QueueSize < 1 {
                return nil, fmt.Errorf("the queue size be small than 1")
@@ -85,20 +88,22 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) 
(*AnalyzeQueue, error) {
 }
 
 func (q *AnalyzeQueue) Start(ctx context.Context) {
-       q.eventQueue = 
btf.NewEventQueue(q.context.Config.ProtocolAnalyze.Parallels, 
q.context.Config.ProtocolAnalyze.QueueSize,
+       q.eventQueue = 
btf.NewEventQueue(q.context.Config.ProtocolAnalyze.AnalyzeParallels, 
q.context.Config.ProtocolAnalyze.QueueSize,
                func(num int) btf.PartitionContext {
                        return NewPartitionContext(q.context, num, 
q.supportAnalyzers(q.context))
                })
-       q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue, 
int(q.perCPUBuffer), func() interface{} {
-               return q.detailSupplier()
-       }, func(data interface{}) string {
-               return fmt.Sprintf("%d", 
data.(events.SocketDetail).GetConnectionID())
-       })
-       q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue, 
int(q.perCPUBuffer), func() interface{} {
-               return &events.SocketDataUploadEvent{}
-       }, func(data interface{}) string {
-               return fmt.Sprintf("%d", 
data.(*events.SocketDataUploadEvent).ConnectionID)
-       })
+       q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue, 
int(q.perCPUBuffer),
+               q.context.Config.ProtocolAnalyze.ParseParallels, func() 
interface{} {
+                       return q.detailSupplier()
+               }, func(data interface{}) string {
+                       return fmt.Sprintf("%d", 
data.(events.SocketDetail).GetConnectionID())
+               })
+       q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue, 
int(q.perCPUBuffer),
+               q.context.Config.ProtocolAnalyze.ParseParallels, func() 
interface{} {
+                       return &events.SocketDataUploadEvent{}
+               }, func(data interface{}) string {
+                       return fmt.Sprintf("%d", 
data.(*events.SocketDataUploadEvent).ConnectionID)
+               })
 
        q.eventQueue.Start(ctx, q.context.BPF.Linker)
 }
diff --git a/pkg/accesslog/common/config.go b/pkg/accesslog/common/config.go
index f1aab94..1ef0aeb 100644
--- a/pkg/accesslog/common/config.go
+++ b/pkg/accesslog/common/config.go
@@ -43,7 +43,8 @@ type ConnectionAnalyzeConfig struct {
 
 type ProtocolAnalyzeConfig struct {
        PerCPUBufferSize string `mapstructure:"per_cpu_buffer"`
-       Parallels        int    `mapstructure:"parallels"`
+       ParseParallels   int    `mapstructure:"parse_parallels"`
+       AnalyzeParallels int    `mapstructure:"analyze_parallels"`
        QueueSize        int    `mapstructure:"queue_size"`
 }
 
diff --git a/pkg/profiling/continuous/checker/bpf/network/network.go b/pkg/profiling/continuous/checker/bpf/network/network.go
index be6300c..ba03f21 100644
--- a/pkg/profiling/continuous/checker/bpf/network/network.go
+++ b/pkg/profiling/continuous/checker/bpf/network/network.go
@@ -147,7 +147,7 @@ func startBPFIfNeed() error {
                        n.ReceiveBufferEvent(event)
                }
        })
-       bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue, 
reader.Read, os.Getpagesize()*100, reader.BufferDataBPFSupplier)
+       bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue, 
reader.Read, os.Getpagesize()*100, 1, reader.BufferDataBPFSupplier)
 
        if err := bpfLinker.HasError(); err != nil {
                _ = bpfLinker.Close()
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index 2a54579..dab74d1 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -36,14 +36,14 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize 
int, config *profili
 
 func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) 
{
        // socket buffer data
-       
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadEventQueue, 
l.protocolPerCPUBuffer, func() interface{} {
+       
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadEventQueue, 
l.protocolPerCPUBuffer, 1, func() interface{} {
                return &analyzeBase.SocketDataUploadEvent{}
        }, func(data interface{}) string {
                return 
data.(*analyzeBase.SocketDataUploadEvent).GenerateConnectionID()
        })
 
        // socket detail
-       l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue, 
l.protocolPerCPUBuffer, func() interface{} {
+       l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue, 
l.protocolPerCPUBuffer, 1, func() interface{} {
                return &analyzeBase.SocketDetailEvent{}
        }, func(data interface{}) string {
                return 
data.(*analyzeBase.SocketDetailEvent).GenerateConnectionID()
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index 148222a..f663737 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -156,17 +156,28 @@ func (m *Linker) AddTracePoint(sys, name string, p 
*ebpf.Program) {
 }
 
 func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader, 
dataSupplier func() interface{}) {
-       m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(), 
dataSupplier)
+       m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(), 1, 
dataSupplier)
 }
 
-func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader 
RingBufferReader, perCPUBuffer int, dataSupplier func() interface{}) {
+func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader 
RingBufferReader, perCPUBuffer,
+       parallels int, dataSupplier func() interface{}) {
        rd, err := perf.NewReader(emap, perCPUBuffer)
        if err != nil {
                m.errors = multierror.Append(m.errors, fmt.Errorf("open ring 
buffer error: %v", err))
                return
        }
+       if parallels < 1 {
+               m.errors = multierror.Append(m.errors, fmt.Errorf("parallels 
rading count must bigger than 1"))
+               return
+       }
        m.closers = append(m.closers, rd)
 
+       for i := 0; i < parallels; i++ {
+               m.asyncReadEvent(rd, emap, dataSupplier, bufReader)
+       }
+}
+
+func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, dataSupplier 
func() interface{}, bufReader RingBufferReader) {
        go func() {
                for {
                        record, err := rd.Read()
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index dc0b7cb..813979d 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -43,6 +43,7 @@ type mapReceiver struct {
        perCPUBuffer int
        dataSupplier func() interface{}
        router       func(data interface{}) string
+       parallels    int
 }
 
 func NewEventQueue(partitionCount, sizePerPartition int, contextGenerator 
func(partitionNum int) PartitionContext) *EventQueue {
@@ -53,13 +54,14 @@ func NewEventQueue(partitionCount, sizePerPartition int, 
contextGenerator func(p
        return &EventQueue{count: partitionCount, partitions: partitions}
 }
 
-func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize int, 
dataSupplier func() interface{},
+func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, 
parallels int, dataSupplier func() interface{},
        routeGenerator func(data interface{}) string) {
        e.receivers = append(e.receivers, &mapReceiver{
                emap:         emap,
                perCPUBuffer: perCPUBufferSize,
                dataSupplier: dataSupplier,
                router:       routeGenerator,
+               parallels:    parallels,
        })
 }
 
@@ -92,7 +94,7 @@ func (e *EventQueue) start0(ctx context.Context, linker 
*Linker) {
                func(receiver *mapReceiver) {
                        linker.ReadEventAsyncWithBufferSize(receiver.emap, 
func(data interface{}) {
                                e.routerTransformer(data, receiver.router)
-                       }, receiver.perCPUBuffer, receiver.dataSupplier)
+                       }, receiver.perCPUBuffer, r.parallels, 
receiver.dataSupplier)
                }(r)
        }
 

Reply via email to