This is an automated email from the ASF dual-hosted git repository. liuhan pushed a commit to branch missing-details in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
commit 8e2428478a72fb9b3a7012ebcd51a37152ac0d8c Author: mrproliu <[email protected]> AuthorDate: Fri Dec 13 21:40:50 2024 +0800 empty --- pkg/accesslog/collector/connection.go | 11 +++++++---- pkg/accesslog/common/config.go | 3 ++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go index eeeb7dd..63b55b8 100644 --- a/pkg/accesslog/collector/connection.go +++ b/pkg/accesslog/collector/connection.go @@ -64,7 +64,10 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext if int(perCPUBufferSize) < os.Getpagesize() { return fmt.Errorf("the cpu buffer must bigger than %dB", os.Getpagesize()) } - if ctx.Config.ConnectionAnalyze.Parallels < 1 { + if ctx.Config.ConnectionAnalyze.ParseParallels < 1 { + return fmt.Errorf("the parallels cannot be small than 1") + } + if ctx.Config.ConnectionAnalyze.AnalyzeParallels < 1 { return fmt.Errorf("the parallels cannot be small than 1") } if ctx.Config.ConnectionAnalyze.QueueSize < 1 { @@ -74,15 +77,15 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext if err != nil { connectionLogger.Warnf("cannot create the connection tracker, %v", err) } - c.eventQueue = btf.NewEventQueue(ctx.Config.ConnectionAnalyze.Parallels, ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext { + c.eventQueue = btf.NewEventQueue(ctx.Config.ConnectionAnalyze.AnalyzeParallels, ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext { return newConnectionPartitionContext(ctx, track) }) - c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize), 15, func() interface{} { + c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} { return &events.SocketConnectEvent{} }, func(data interface{}) string { return fmt.Sprintf("%d", data.(*events.SocketConnectEvent).ConID) }) - c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), 15, func() interface{} { + c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} { return &events.SocketCloseEvent{} }, func(data interface{}) string { return fmt.Sprintf("%d", data.(*events.SocketCloseEvent).ConnectionID) diff --git a/pkg/accesslog/common/config.go b/pkg/accesslog/common/config.go index 1ef0aeb..0298088 100644 --- a/pkg/accesslog/common/config.go +++ b/pkg/accesslog/common/config.go @@ -37,7 +37,8 @@ type FlushConfig struct { type ConnectionAnalyzeConfig struct { PerCPUBufferSize string `mapstructure:"per_cpu_buffer"` - Parallels int `mapstructure:"parallels"` + ParseParallels int `mapstructure:"parse_parallels"` + AnalyzeParallels int `mapstructure:"analyze_parallels"` QueueSize int `mapstructure:"queue_size"` }
