This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 57649f6 Separate multiple process for reading connection information
in the access log module (#151)
57649f6 is described below
commit 57649f6ff4eb95afc54716f5d310ce24a0a2c004
Author: mrproliu <[email protected]>
AuthorDate: Tue Nov 5 12:04:57 2024 +0900
Separate multiple process for reading connection information in the access
log module (#151)
---
CHANGES.md | 1 +
configs/rover_configs.yaml | 7 ++
pkg/accesslog/collector/connect.go | 138 ++++++++++++++++++++++++-------------
pkg/accesslog/common/config.go | 17 +++--
4 files changed, 110 insertions(+), 53 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index d6a15a1..0b3585e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -5,6 +5,7 @@ Release Notes.
0.8.0
------------------
#### Features
+* Read connection information in the access log module using multiple
parallel analyzers.
#### Bug Fixes
* Fix the base image cannot run in the arm64.
diff --git a/configs/rover_configs.yaml b/configs/rover_configs.yaml
index 2c5d243..54811fa 100644
--- a/configs/rover_configs.yaml
+++ b/configs/rover_configs.yaml
@@ -140,6 +140,13 @@ access_log:
max_count: ${ROVER_ACCESS_LOG_FLUSH_MAX_COUNT:10000}
# The period of flush access log to the backend
period: ${ROVER_ACCESS_LOG_FLUSH_PERIOD:5s}
+ connection_analyze:
+ # The size of connection buffer on each CPU
+ per_cpu_buffer: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_PER_CPU_BUFFER:200KB}
+ # The number of parallel connection analyzers
+ parallels: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_PARALLELS:1}
+ # The queue size of each parallel analyzer
+ queue_size: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_QUEUE_SIZE:2000}
protocol_analyze:
# The size of socket data buffer on each CPU
per_cpu_buffer: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PER_CPU_BUFFER:400KB}
diff --git a/pkg/accesslog/collector/connect.go
b/pkg/accesslog/collector/connect.go
index d6ce8d4..623ee3d 100644
--- a/pkg/accesslog/collector/connect.go
+++ b/pkg/accesslog/collector/connect.go
@@ -18,8 +18,13 @@
package collector
import (
+ "context"
"encoding/binary"
+ "fmt"
"net"
+ "os"
+
+ "github.com/docker/go-units"
"github.com/sirupsen/logrus"
@@ -29,6 +34,7 @@ import (
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/module"
"github.com/apache/skywalking-rover/pkg/tools"
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
"github.com/apache/skywalking-rover/pkg/tools/enums"
"github.com/apache/skywalking-rover/pkg/tools/ip"
@@ -43,72 +49,108 @@ var connectLogger = logger.GetLogger("access_log",
"collector", "connect")
var connectCollectInstance = NewConnectCollector()
type ConnectCollector struct {
- connTracker *ip.ConnTrack
+ eventQueue *btf.EventQueue
}
func NewConnectCollector() *ConnectCollector {
+ return &ConnectCollector{}
+}
+
+func (c *ConnectCollector) Start(_ *module.Manager, ctx
*common.AccessLogContext) error {
+ perCPUBufferSize, err :=
units.RAMInBytes(ctx.Config.ConnectionAnalyze.PerCPUBufferSize)
+ if err != nil {
+ return err
+ }
+ if int(perCPUBufferSize) < os.Getpagesize() {
+ return fmt.Errorf("the cpu buffer must bigger than %dB",
os.Getpagesize())
+ }
+ if ctx.Config.ConnectionAnalyze.Parallels < 1 {
+ return fmt.Errorf("the parallels cannot be small than 1")
+ }
+ if ctx.Config.ConnectionAnalyze.QueueSize < 1 {
+ return fmt.Errorf("the queue size be small than 1")
+ }
track, err := ip.NewConnTrack()
if err != nil {
connectLogger.Warnf("cannot create the connection tracker, %v",
err)
}
- return &ConnectCollector{connTracker: track}
-}
+ c.eventQueue =
btf.NewEventQueue(ctx.Config.ConnectionAnalyze.Parallels,
ctx.Config.ConnectionAnalyze.QueueSize, func() btf.PartitionContext {
+ return newConnectionPartitionContext(ctx, track)
+ })
+ c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue,
int(perCPUBufferSize), func() interface{} {
+ return &events.SocketConnectEvent{}
+ }, func(data interface{}) string {
+ return fmt.Sprintf("%d",
data.(*events.SocketConnectEvent).ConID)
+ })
+ c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)
-func (c *ConnectCollector) Start(_ *module.Manager, context
*common.AccessLogContext) error {
- context.BPF.AddTracePoint("syscalls", "sys_enter_connect",
context.BPF.TracepointEnterConnect)
- context.BPF.AddTracePoint("syscalls", "sys_exit_connect",
context.BPF.TracepointExitConnect)
- context.BPF.AddTracePoint("syscalls", "sys_enter_accept",
context.BPF.TracepointEnterAccept)
- context.BPF.AddTracePoint("syscalls", "sys_exit_accept",
context.BPF.TracepointExitAccept)
- context.BPF.AddTracePoint("syscalls", "sys_enter_accept4",
context.BPF.TracepointEnterAccept)
- context.BPF.AddTracePoint("syscalls", "sys_exit_accept4",
context.BPF.TracepointExitAccept)
+ ctx.BPF.AddTracePoint("syscalls", "sys_enter_connect",
ctx.BPF.TracepointEnterConnect)
+ ctx.BPF.AddTracePoint("syscalls", "sys_exit_connect",
ctx.BPF.TracepointExitConnect)
+ ctx.BPF.AddTracePoint("syscalls", "sys_enter_accept",
ctx.BPF.TracepointEnterAccept)
+ ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept",
ctx.BPF.TracepointExitAccept)
+ ctx.BPF.AddTracePoint("syscalls", "sys_enter_accept4",
ctx.BPF.TracepointEnterAccept)
+ ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept4",
ctx.BPF.TracepointExitAccept)
- context.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
- "tcp_connect": context.BPF.TcpConnect,
+ ctx.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
+ "tcp_connect": ctx.BPF.TcpConnect,
})
- context.BPF.AddLink(link.Kretprobe, map[string]*ebpf.Program{
- "sock_alloc": context.BPF.SockAllocRet,
+ ctx.BPF.AddLink(link.Kretprobe, map[string]*ebpf.Program{
+ "sock_alloc": ctx.BPF.SockAllocRet,
})
- context.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
- "ip4_datagram_connect": context.BPF.Ip4UdpDatagramConnect,
+ ctx.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
+ "ip4_datagram_connect": ctx.BPF.Ip4UdpDatagramConnect,
})
- _ = context.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
- "__nf_conntrack_hash_insert": context.BPF.NfConntrackHashInsert,
+ _ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
+ "__nf_conntrack_hash_insert": ctx.BPF.NfConntrackHashInsert,
})
- _ = context.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
- "nf_confirm": context.BPF.NfConfirm,
+ _ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
+ "nf_confirm": ctx.BPF.NfConfirm,
})
- _ = context.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
- "ctnetlink_fill_info": context.BPF.NfCtnetlinkFillInfo,
- })
-
- context.BPF.ReadEventAsync(context.BPF.SocketConnectionEventQueue,
func(data interface{}) {
- event := data.(*events.SocketConnectEvent)
- connectLogger.Debugf("receive connect event, connection ID: %d,
randomID: %d, "+
- "pid: %d, fd: %d, role: %s: func: %s, family: %d,
success: %d, conntrack exist: %t",
- event.ConID, event.RandomID, event.PID, event.SocketFD,
enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
- event.SocketFamily, event.ConnectSuccess,
event.ConnTrackUpstreamPort != 0)
- socketPair := c.buildSocketFromConnectEvent(event)
- if socketPair == nil {
- connectLogger.Debugf("cannot found the socket paire
from connect event, connection ID: %d, randomID: %d",
- event.ConID, event.RandomID)
- return
- }
- connectLogger.Debugf("build socket pair success, connection ID:
%d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
- event.ConID, event.RandomID, socketPair.Role,
socketPair.SrcIP, socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
- context.ConnectionMgr.OnConnectEvent(event, socketPair)
- forwarder.SendConnectEvent(context, event, socketPair)
- }, func() interface{} {
- return &events.SocketConnectEvent{}
+ _ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
+ "ctnetlink_fill_info": ctx.BPF.NfCtnetlinkFillInfo,
})
-
return nil
}
func (c *ConnectCollector) Stop() {
}
-func (c *ConnectCollector) fixSocketFamilyIfNeed(event
*events.SocketConnectEvent, result *ip.SocketPair) {
+type ConnectionPartitionContext struct {
+ context *common.AccessLogContext
+ connTracker *ip.ConnTrack
+}
+
+func newConnectionPartitionContext(ctx *common.AccessLogContext, connTracker
*ip.ConnTrack) *ConnectionPartitionContext {
+ return &ConnectionPartitionContext{
+ context: ctx,
+ connTracker: connTracker,
+ }
+}
+
+func (c *ConnectionPartitionContext) Start(ctx context.Context) {
+
+}
+
+func (c *ConnectionPartitionContext) Consume(data interface{}) {
+ event := data.(*events.SocketConnectEvent)
+ connectLogger.Debugf("receive connect event, connection ID: %d,
randomID: %d, "+
+ "pid: %d, fd: %d, role: %s: func: %s, family: %d, success: %d,
conntrack exist: %t",
+ event.ConID, event.RandomID, event.PID, event.SocketFD,
enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
+ event.SocketFamily, event.ConnectSuccess,
event.ConnTrackUpstreamPort != 0)
+ socketPair := c.buildSocketFromConnectEvent(event)
+ if socketPair == nil {
+ connectLogger.Debugf("cannot found the socket paire from
connect event, connection ID: %d, randomID: %d",
+ event.ConID, event.RandomID)
+ return
+ }
+ connectLogger.Debugf("build socket pair success, connection ID: %d,
randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
+ event.ConID, event.RandomID, socketPair.Role, socketPair.SrcIP,
socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
+ c.context.ConnectionMgr.OnConnectEvent(event, socketPair)
+ forwarder.SendConnectEvent(c.context, event, socketPair)
+}
+
+func (c *ConnectionPartitionContext) fixSocketFamilyIfNeed(event
*events.SocketConnectEvent, result *ip.SocketPair) {
if result == nil {
return
}
@@ -128,7 +170,7 @@ func (c *ConnectCollector) fixSocketFamilyIfNeed(event
*events.SocketConnectEven
}
}
-func (c *ConnectCollector) buildSocketFromConnectEvent(event
*events.SocketConnectEvent) *ip.SocketPair {
+func (c *ConnectionPartitionContext) buildSocketFromConnectEvent(event
*events.SocketConnectEvent) *ip.SocketPair {
if event.SocketFamily != unix.AF_INET && event.SocketFamily !=
unix.AF_INET6 && event.SocketFamily != enums.SocketFamilyUnknown {
// if not ipv4, ipv6 or unknown, ignore
return nil
@@ -160,7 +202,7 @@ func (c *ConnectCollector)
buildSocketFromConnectEvent(event *events.SocketConne
return pair
}
-func (c *ConnectCollector) isOnlyLocalPortEmpty(socketPair *ip.SocketPair)
bool {
+func (c *ConnectionPartitionContext) isOnlyLocalPortEmpty(socketPair
*ip.SocketPair) bool {
if socketPair == nil {
return false
}
@@ -172,7 +214,7 @@ func (c *ConnectCollector) isOnlyLocalPortEmpty(socketPair
*ip.SocketPair) bool
return socketPair.IsValid()
}
-func (c *ConnectCollector) buildSocketPair(event *events.SocketConnectEvent)
*ip.SocketPair {
+func (c *ConnectionPartitionContext) buildSocketPair(event
*events.SocketConnectEvent) *ip.SocketPair {
var result *ip.SocketPair
haveConnTrack := false
if event.SocketFamily == unix.AF_INET {
@@ -232,7 +274,7 @@ func (c *ConnectCollector) buildSocketPair(event
*events.SocketConnectEvent) *ip
return result
}
-func (c *ConnectCollector) tryToUpdateSocketFromConntrack(event
*events.SocketConnectEvent, socket *ip.SocketPair) {
+func (c *ConnectionPartitionContext) tryToUpdateSocketFromConntrack(event
*events.SocketConnectEvent, socket *ip.SocketPair) {
if socket != nil && socket.IsValid() && c.connTracker != nil &&
!tools.IsLocalHostAddress(socket.DestIP) &&
event.FuncName != enums.SocketFunctionNameAccept { // accept
event don't need to update the remote address
// if no contract and socket data is valid, then trying to get
the remote address from the socket
diff --git a/pkg/accesslog/common/config.go b/pkg/accesslog/common/config.go
index d934bef..f1aab94 100644
--- a/pkg/accesslog/common/config.go
+++ b/pkg/accesslog/common/config.go
@@ -22,11 +22,12 @@ import "github.com/apache/skywalking-rover/pkg/module"
type Config struct {
module.Config
- Active bool `mapstructure:"active"`
- ExcludeNamespaces string
`mapstructure:"exclude_namespaces"`
- ExcludeClusters string `mapstructure:"exclude_cluster"`
- Flush FlushConfig `mapstructure:"flush"`
- ProtocolAnalyze ProtocolAnalyzeConfig
`mapstructure:"protocol_analyze"`
+ Active bool `mapstructure:"active"`
+ ExcludeNamespaces string
`mapstructure:"exclude_namespaces"`
+ ExcludeClusters string
`mapstructure:"exclude_cluster"`
+ Flush FlushConfig `mapstructure:"flush"`
+ ConnectionAnalyze ConnectionAnalyzeConfig
`mapstructure:"connection_analyze"`
+ ProtocolAnalyze ProtocolAnalyzeConfig
`mapstructure:"protocol_analyze"`
}
type FlushConfig struct {
@@ -34,6 +35,12 @@ type FlushConfig struct {
Period string `mapstructure:"period"`
}
+type ConnectionAnalyzeConfig struct {
+ PerCPUBufferSize string `mapstructure:"per_cpu_buffer"`
+ Parallels int `mapstructure:"parallels"`
+ QueueSize int `mapstructure:"queue_size"`
+}
+
type ProtocolAnalyzeConfig struct {
PerCPUBufferSize string `mapstructure:"per_cpu_buffer"`
Parallels int `mapstructure:"parallels"`