This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 67622c3 Support parallel parsing protocol data in the access log
module (#167)
67622c3 is described below
commit 67622c352b98fd32782a3e7afc7d3fbd6d6ec8e3
Author: mrproliu <[email protected]>
AuthorDate: Wed Dec 11 12:08:44 2024 +0900
Support parallel parsing protocol data in the access log module (#167)
---
CHANGES.md | 1 +
configs/rover_configs.yaml | 4 ++-
pkg/accesslog/collector/connection.go | 4 +--
pkg/accesslog/collector/protocols/queue.go | 31 +++++++++++++---------
pkg/accesslog/common/config.go | 3 ++-
.../continuous/checker/bpf/network/network.go | 2 +-
.../task/network/analyze/layer7/events.go | 4 +--
pkg/tools/btf/linker.go | 15 +++++++++--
pkg/tools/btf/queue.go | 6 +++--
9 files changed, 46 insertions(+), 24 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 4eab947..3392d50 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -13,6 +13,7 @@ Release Notes.
* Fix the unaligned memory accesses for `upload_socket_data_buf`.
* Support for connecting to the backend server over TLS without requiring
`ca.pem`.
* Fix missing the first socket detail event in HTTPS protocol.
+* Support parallel parsing protocol data in the access log module.
#### Bug Fixes
* Fix the base image cannot run in the arm64.
diff --git a/configs/rover_configs.yaml b/configs/rover_configs.yaml
index ce39111..8f7ba6e 100644
--- a/configs/rover_configs.yaml
+++ b/configs/rover_configs.yaml
@@ -150,8 +150,10 @@ access_log:
protocol_analyze:
# The size of socket data buffer on each CPU
per_cpu_buffer: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PER_CPU_BUFFER:400KB}
+ # The number of goroutines that read and parse protocol events in parallel
+ parse_parallels: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PARSE_PARALLELS:2}
# The count of parallel protocol analyzer
- parallels: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PARALLELS:2}
+ analyze_parallels: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PARALLELS:2}
# The size of per paralleled analyzer queue
queue_size: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_QUEUE_SIZE:5000}
diff --git a/pkg/accesslog/collector/connection.go
b/pkg/accesslog/collector/connection.go
index c125397..8e8aba1 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -77,12 +77,12 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx
*common.AccessLogContext
c.eventQueue =
btf.NewEventQueue(ctx.Config.ConnectionAnalyze.Parallels,
ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
return newConnectionPartitionContext(ctx, track)
})
- c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue,
int(perCPUBufferSize), func() interface{} {
+ c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue,
int(perCPUBufferSize), 1, func() interface{} {
return &events.SocketConnectEvent{}
}, func(data interface{}) string {
return fmt.Sprintf("%d",
data.(*events.SocketConnectEvent).ConID)
})
- c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue,
int(perCPUBufferSize), func() interface{} {
+ c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue,
int(perCPUBufferSize), 1, func() interface{} {
return &events.SocketCloseEvent{}
}, func(data interface{}) string {
return fmt.Sprintf("%d",
data.(*events.SocketCloseEvent).ConnectionID)
diff --git a/pkg/accesslog/collector/protocols/queue.go
b/pkg/accesslog/collector/protocols/queue.go
index ad66584..68708fc 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -62,8 +62,11 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext)
(*AnalyzeQueue, error) {
if int(perCPUBufferSize) < os.Getpagesize() {
return nil, fmt.Errorf("the cpu buffer must bigger than %dB",
os.Getpagesize())
}
- if ctx.Config.ProtocolAnalyze.Parallels < 1 {
- return nil, fmt.Errorf("the parallels cannot be small than 1")
+ if ctx.Config.ProtocolAnalyze.AnalyzeParallels < 1 {
+ return nil, fmt.Errorf("the analyze parallels cannot be small
than 1")
+ }
+ if ctx.Config.ProtocolAnalyze.ParseParallels < 1 {
+ return nil, fmt.Errorf("the parse parallels cannot be small
than 1")
}
if ctx.Config.ProtocolAnalyze.QueueSize < 1 {
return nil, fmt.Errorf("the queue size be small than 1")
@@ -85,20 +88,22 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext)
(*AnalyzeQueue, error) {
}
func (q *AnalyzeQueue) Start(ctx context.Context) {
- q.eventQueue =
btf.NewEventQueue(q.context.Config.ProtocolAnalyze.Parallels,
q.context.Config.ProtocolAnalyze.QueueSize,
+ q.eventQueue =
btf.NewEventQueue(q.context.Config.ProtocolAnalyze.AnalyzeParallels,
q.context.Config.ProtocolAnalyze.QueueSize,
func(num int) btf.PartitionContext {
return NewPartitionContext(q.context, num,
q.supportAnalyzers(q.context))
})
- q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue,
int(q.perCPUBuffer), func() interface{} {
- return q.detailSupplier()
- }, func(data interface{}) string {
- return fmt.Sprintf("%d",
data.(events.SocketDetail).GetConnectionID())
- })
- q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue,
int(q.perCPUBuffer), func() interface{} {
- return &events.SocketDataUploadEvent{}
- }, func(data interface{}) string {
- return fmt.Sprintf("%d",
data.(*events.SocketDataUploadEvent).ConnectionID)
- })
+ q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue,
int(q.perCPUBuffer),
+ q.context.Config.ProtocolAnalyze.ParseParallels, func()
interface{} {
+ return q.detailSupplier()
+ }, func(data interface{}) string {
+ return fmt.Sprintf("%d",
data.(events.SocketDetail).GetConnectionID())
+ })
+ q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue,
int(q.perCPUBuffer),
+ q.context.Config.ProtocolAnalyze.ParseParallels, func()
interface{} {
+ return &events.SocketDataUploadEvent{}
+ }, func(data interface{}) string {
+ return fmt.Sprintf("%d",
data.(*events.SocketDataUploadEvent).ConnectionID)
+ })
q.eventQueue.Start(ctx, q.context.BPF.Linker)
}
diff --git a/pkg/accesslog/common/config.go b/pkg/accesslog/common/config.go
index f1aab94..1ef0aeb 100644
--- a/pkg/accesslog/common/config.go
+++ b/pkg/accesslog/common/config.go
@@ -43,7 +43,8 @@ type ConnectionAnalyzeConfig struct {
type ProtocolAnalyzeConfig struct {
PerCPUBufferSize string `mapstructure:"per_cpu_buffer"`
- Parallels int `mapstructure:"parallels"`
+ ParseParallels int `mapstructure:"parse_parallels"`
+ AnalyzeParallels int `mapstructure:"analyze_parallels"`
QueueSize int `mapstructure:"queue_size"`
}
diff --git a/pkg/profiling/continuous/checker/bpf/network/network.go
b/pkg/profiling/continuous/checker/bpf/network/network.go
index be6300c..ba03f21 100644
--- a/pkg/profiling/continuous/checker/bpf/network/network.go
+++ b/pkg/profiling/continuous/checker/bpf/network/network.go
@@ -147,7 +147,7 @@ func startBPFIfNeed() error {
n.ReceiveBufferEvent(event)
}
})
- bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue,
reader.Read, os.Getpagesize()*100, reader.BufferDataBPFSupplier)
+ bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue,
reader.Read, os.Getpagesize()*100, 1, reader.BufferDataBPFSupplier)
if err := bpfLinker.HasError(); err != nil {
_ = bpfLinker.Close()
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go
b/pkg/profiling/task/network/analyze/layer7/events.go
index 2a54579..dab74d1 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -36,14 +36,14 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize
int, config *profili
func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader)
{
// socket buffer data
-
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadEventQueue,
l.protocolPerCPUBuffer, func() interface{} {
+
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadEventQueue,
l.protocolPerCPUBuffer, 1, func() interface{} {
return &analyzeBase.SocketDataUploadEvent{}
}, func(data interface{}) string {
return
data.(*analyzeBase.SocketDataUploadEvent).GenerateConnectionID()
})
// socket detail
- l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue,
l.protocolPerCPUBuffer, func() interface{} {
+ l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue,
l.protocolPerCPUBuffer, 1, func() interface{} {
return &analyzeBase.SocketDetailEvent{}
}, func(data interface{}) string {
return
data.(*analyzeBase.SocketDetailEvent).GenerateConnectionID()
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index 148222a..f663737 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -156,17 +156,28 @@ func (m *Linker) AddTracePoint(sys, name string, p
*ebpf.Program) {
}
func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader,
dataSupplier func() interface{}) {
- m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(),
dataSupplier)
+ m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(), 1,
dataSupplier)
}
-func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader
RingBufferReader, perCPUBuffer int, dataSupplier func() interface{}) {
+func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader
RingBufferReader, perCPUBuffer,
+ parallels int, dataSupplier func() interface{}) {
rd, err := perf.NewReader(emap, perCPUBuffer)
if err != nil {
m.errors = multierror.Append(m.errors, fmt.Errorf("open ring
buffer error: %v", err))
return
}
+ if parallels < 1 {
+ m.errors = multierror.Append(m.errors, fmt.Errorf("parallels
rading count must bigger than 1"))
+ return
+ }
m.closers = append(m.closers, rd)
+ for i := 0; i < parallels; i++ {
+ m.asyncReadEvent(rd, emap, dataSupplier, bufReader)
+ }
+}
+
+func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, dataSupplier
func() interface{}, bufReader RingBufferReader) {
go func() {
for {
record, err := rd.Read()
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index dc0b7cb..813979d 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -43,6 +43,7 @@ type mapReceiver struct {
perCPUBuffer int
dataSupplier func() interface{}
router func(data interface{}) string
+ parallels int
}
func NewEventQueue(partitionCount, sizePerPartition int, contextGenerator
func(partitionNum int) PartitionContext) *EventQueue {
@@ -53,13 +54,14 @@ func NewEventQueue(partitionCount, sizePerPartition int,
contextGenerator func(p
return &EventQueue{count: partitionCount, partitions: partitions}
}
-func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize int,
dataSupplier func() interface{},
+func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize,
parallels int, dataSupplier func() interface{},
routeGenerator func(data interface{}) string) {
e.receivers = append(e.receivers, &mapReceiver{
emap: emap,
perCPUBuffer: perCPUBufferSize,
dataSupplier: dataSupplier,
router: routeGenerator,
+ parallels: parallels,
})
}
@@ -92,7 +94,7 @@ func (e *EventQueue) start0(ctx context.Context, linker
*Linker) {
func(receiver *mapReceiver) {
linker.ReadEventAsyncWithBufferSize(receiver.emap,
func(data interface{}) {
e.routerTransformer(data, receiver.router)
- }, receiver.perCPUBuffer, receiver.dataSupplier)
+ }, receiver.perCPUBuffer, r.parallels,
receiver.dataSupplier)
}(r)
}