This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new 57649f6  Separate multiple process for reading connection information in the access log module (#151)
57649f6 is described below

commit 57649f6ff4eb95afc54716f5d310ce24a0a2c004
Author: mrproliu <[email protected]>
AuthorDate: Tue Nov 5 12:04:57 2024 +0900

    Separate multiple process for reading connection information in the access log module (#151)
---
 CHANGES.md                         |   1 +
 configs/rover_configs.yaml         |   7 ++
 pkg/accesslog/collector/connect.go | 138 ++++++++++++++++++++++++-------------
 pkg/accesslog/common/config.go     |  17 +++--
 4 files changed, 110 insertions(+), 53 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index d6a15a1..0b3585e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -5,6 +5,7 @@ Release Notes.
 0.8.0
 ------------------
 #### Features
+* Separate multiple process for reading connection information in the access log module.
 
 #### Bug Fixes
 * Fix the base image cannot run in the arm64.
diff --git a/configs/rover_configs.yaml b/configs/rover_configs.yaml
index 2c5d243..54811fa 100644
--- a/configs/rover_configs.yaml
+++ b/configs/rover_configs.yaml
@@ -140,6 +140,13 @@ access_log:
     max_count: ${ROVER_ACCESS_LOG_FLUSH_MAX_COUNT:10000}
     # The period of flush access log to the backend
     period: ${ROVER_ACCESS_LOG_FLUSH_PERIOD:5s}
+  connection_analyze:
+    # The size of connection buffer on each CPU
+    per_cpu_buffer: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_PER_CPU_BUFFER:200KB}
+    # The count of parallel connection analyzer
+    parallels: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_PARALLELS:1}
+    # The size of per paralleled analyzer queue
+    queue_size: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_QUEUE_SIZE:2000}
   protocol_analyze:
     # The size of socket data buffer on each CPU
     per_cpu_buffer: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PER_CPU_BUFFER:400KB}
diff --git a/pkg/accesslog/collector/connect.go b/pkg/accesslog/collector/connect.go
index d6ce8d4..623ee3d 100644
--- a/pkg/accesslog/collector/connect.go
+++ b/pkg/accesslog/collector/connect.go
@@ -18,8 +18,13 @@
 package collector
 
 import (
+       "context"
        "encoding/binary"
+       "fmt"
        "net"
+       "os"
+
+       "github.com/docker/go-units"
 
        "github.com/sirupsen/logrus"
 
@@ -29,6 +34,7 @@ import (
        "github.com/apache/skywalking-rover/pkg/logger"
        "github.com/apache/skywalking-rover/pkg/module"
        "github.com/apache/skywalking-rover/pkg/tools"
+       "github.com/apache/skywalking-rover/pkg/tools/btf"
        "github.com/apache/skywalking-rover/pkg/tools/enums"
        "github.com/apache/skywalking-rover/pkg/tools/ip"
 
@@ -43,72 +49,108 @@ var connectLogger = logger.GetLogger("access_log", "collector", "connect")
 var connectCollectInstance = NewConnectCollector()
 
 type ConnectCollector struct {
-       connTracker *ip.ConnTrack
+       eventQueue *btf.EventQueue
 }
 
 func NewConnectCollector() *ConnectCollector {
+       return &ConnectCollector{}
+}
+
+func (c *ConnectCollector) Start(_ *module.Manager, ctx 
*common.AccessLogContext) error {
+       perCPUBufferSize, err := 
units.RAMInBytes(ctx.Config.ConnectionAnalyze.PerCPUBufferSize)
+       if err != nil {
+               return err
+       }
+       if int(perCPUBufferSize) < os.Getpagesize() {
+               return fmt.Errorf("the cpu buffer must bigger than %dB", 
os.Getpagesize())
+       }
+       if ctx.Config.ConnectionAnalyze.Parallels < 1 {
+               return fmt.Errorf("the parallels cannot be small than 1")
+       }
+       if ctx.Config.ConnectionAnalyze.QueueSize < 1 {
+               return fmt.Errorf("the queue size be small than 1")
+       }
        track, err := ip.NewConnTrack()
        if err != nil {
                connectLogger.Warnf("cannot create the connection tracker, %v", 
err)
        }
-       return &ConnectCollector{connTracker: track}
-}
+       c.eventQueue = 
btf.NewEventQueue(ctx.Config.ConnectionAnalyze.Parallels, 
ctx.Config.ConnectionAnalyze.QueueSize, func() btf.PartitionContext {
+               return newConnectionPartitionContext(ctx, track)
+       })
+       c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, 
int(perCPUBufferSize), func() interface{} {
+               return &events.SocketConnectEvent{}
+       }, func(data interface{}) string {
+               return fmt.Sprintf("%d", 
data.(*events.SocketConnectEvent).ConID)
+       })
+       c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)
 
-func (c *ConnectCollector) Start(_ *module.Manager, context 
*common.AccessLogContext) error {
-       context.BPF.AddTracePoint("syscalls", "sys_enter_connect", 
context.BPF.TracepointEnterConnect)
-       context.BPF.AddTracePoint("syscalls", "sys_exit_connect", 
context.BPF.TracepointExitConnect)
-       context.BPF.AddTracePoint("syscalls", "sys_enter_accept", 
context.BPF.TracepointEnterAccept)
-       context.BPF.AddTracePoint("syscalls", "sys_exit_accept", 
context.BPF.TracepointExitAccept)
-       context.BPF.AddTracePoint("syscalls", "sys_enter_accept4", 
context.BPF.TracepointEnterAccept)
-       context.BPF.AddTracePoint("syscalls", "sys_exit_accept4", 
context.BPF.TracepointExitAccept)
+       ctx.BPF.AddTracePoint("syscalls", "sys_enter_connect", 
ctx.BPF.TracepointEnterConnect)
+       ctx.BPF.AddTracePoint("syscalls", "sys_exit_connect", 
ctx.BPF.TracepointExitConnect)
+       ctx.BPF.AddTracePoint("syscalls", "sys_enter_accept", 
ctx.BPF.TracepointEnterAccept)
+       ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept", 
ctx.BPF.TracepointExitAccept)
+       ctx.BPF.AddTracePoint("syscalls", "sys_enter_accept4", 
ctx.BPF.TracepointEnterAccept)
+       ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept4", 
ctx.BPF.TracepointExitAccept)
 
-       context.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
-               "tcp_connect": context.BPF.TcpConnect,
+       ctx.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
+               "tcp_connect": ctx.BPF.TcpConnect,
        })
-       context.BPF.AddLink(link.Kretprobe, map[string]*ebpf.Program{
-               "sock_alloc": context.BPF.SockAllocRet,
+       ctx.BPF.AddLink(link.Kretprobe, map[string]*ebpf.Program{
+               "sock_alloc": ctx.BPF.SockAllocRet,
        })
-       context.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
-               "ip4_datagram_connect": context.BPF.Ip4UdpDatagramConnect,
+       ctx.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
+               "ip4_datagram_connect": ctx.BPF.Ip4UdpDatagramConnect,
        })
 
-       _ = context.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
-               "__nf_conntrack_hash_insert": context.BPF.NfConntrackHashInsert,
+       _ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
+               "__nf_conntrack_hash_insert": ctx.BPF.NfConntrackHashInsert,
        })
-       _ = context.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
-               "nf_confirm": context.BPF.NfConfirm,
+       _ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
+               "nf_confirm": ctx.BPF.NfConfirm,
        })
-       _ = context.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
-               "ctnetlink_fill_info": context.BPF.NfCtnetlinkFillInfo,
-       })
-
-       context.BPF.ReadEventAsync(context.BPF.SocketConnectionEventQueue, 
func(data interface{}) {
-               event := data.(*events.SocketConnectEvent)
-               connectLogger.Debugf("receive connect event, connection ID: %d, 
randomID: %d, "+
-                       "pid: %d, fd: %d, role: %s: func: %s, family: %d, 
success: %d, conntrack exist: %t",
-                       event.ConID, event.RandomID, event.PID, event.SocketFD, 
enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
-                       event.SocketFamily, event.ConnectSuccess, 
event.ConnTrackUpstreamPort != 0)
-               socketPair := c.buildSocketFromConnectEvent(event)
-               if socketPair == nil {
-                       connectLogger.Debugf("cannot found the socket paire 
from connect event, connection ID: %d, randomID: %d",
-                               event.ConID, event.RandomID)
-                       return
-               }
-               connectLogger.Debugf("build socket pair success, connection ID: 
%d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
-                       event.ConID, event.RandomID, socketPair.Role, 
socketPair.SrcIP, socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
-               context.ConnectionMgr.OnConnectEvent(event, socketPair)
-               forwarder.SendConnectEvent(context, event, socketPair)
-       }, func() interface{} {
-               return &events.SocketConnectEvent{}
+       _ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
+               "ctnetlink_fill_info": ctx.BPF.NfCtnetlinkFillInfo,
        })
-
        return nil
 }
 
 func (c *ConnectCollector) Stop() {
 }
 
-func (c *ConnectCollector) fixSocketFamilyIfNeed(event 
*events.SocketConnectEvent, result *ip.SocketPair) {
+type ConnectionPartitionContext struct {
+       context     *common.AccessLogContext
+       connTracker *ip.ConnTrack
+}
+
+func newConnectionPartitionContext(ctx *common.AccessLogContext, connTracker 
*ip.ConnTrack) *ConnectionPartitionContext {
+       return &ConnectionPartitionContext{
+               context:     ctx,
+               connTracker: connTracker,
+       }
+}
+
+func (c *ConnectionPartitionContext) Start(ctx context.Context) {
+
+}
+
+func (c *ConnectionPartitionContext) Consume(data interface{}) {
+       event := data.(*events.SocketConnectEvent)
+       connectLogger.Debugf("receive connect event, connection ID: %d, 
randomID: %d, "+
+               "pid: %d, fd: %d, role: %s: func: %s, family: %d, success: %d, 
conntrack exist: %t",
+               event.ConID, event.RandomID, event.PID, event.SocketFD, 
enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
+               event.SocketFamily, event.ConnectSuccess, 
event.ConnTrackUpstreamPort != 0)
+       socketPair := c.buildSocketFromConnectEvent(event)
+       if socketPair == nil {
+               connectLogger.Debugf("cannot found the socket paire from 
connect event, connection ID: %d, randomID: %d",
+                       event.ConID, event.RandomID)
+               return
+       }
+       connectLogger.Debugf("build socket pair success, connection ID: %d, 
randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
+               event.ConID, event.RandomID, socketPair.Role, socketPair.SrcIP, 
socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
+       c.context.ConnectionMgr.OnConnectEvent(event, socketPair)
+       forwarder.SendConnectEvent(c.context, event, socketPair)
+}
+
+func (c *ConnectionPartitionContext) fixSocketFamilyIfNeed(event 
*events.SocketConnectEvent, result *ip.SocketPair) {
        if result == nil {
                return
        }
@@ -128,7 +170,7 @@ func (c *ConnectCollector) fixSocketFamilyIfNeed(event 
*events.SocketConnectEven
        }
 }
 
-func (c *ConnectCollector) buildSocketFromConnectEvent(event 
*events.SocketConnectEvent) *ip.SocketPair {
+func (c *ConnectionPartitionContext) buildSocketFromConnectEvent(event 
*events.SocketConnectEvent) *ip.SocketPair {
        if event.SocketFamily != unix.AF_INET && event.SocketFamily != 
unix.AF_INET6 && event.SocketFamily != enums.SocketFamilyUnknown {
                // if not ipv4, ipv6 or unknown, ignore
                return nil
@@ -160,7 +202,7 @@ func (c *ConnectCollector) 
buildSocketFromConnectEvent(event *events.SocketConne
        return pair
 }
 
-func (c *ConnectCollector) isOnlyLocalPortEmpty(socketPair *ip.SocketPair) 
bool {
+func (c *ConnectionPartitionContext) isOnlyLocalPortEmpty(socketPair 
*ip.SocketPair) bool {
        if socketPair == nil {
                return false
        }
@@ -172,7 +214,7 @@ func (c *ConnectCollector) isOnlyLocalPortEmpty(socketPair 
*ip.SocketPair) bool
        return socketPair.IsValid()
 }
 
-func (c *ConnectCollector) buildSocketPair(event *events.SocketConnectEvent) 
*ip.SocketPair {
+func (c *ConnectionPartitionContext) buildSocketPair(event 
*events.SocketConnectEvent) *ip.SocketPair {
        var result *ip.SocketPair
        haveConnTrack := false
        if event.SocketFamily == unix.AF_INET {
@@ -232,7 +274,7 @@ func (c *ConnectCollector) buildSocketPair(event 
*events.SocketConnectEvent) *ip
        return result
 }
 
-func (c *ConnectCollector) tryToUpdateSocketFromConntrack(event 
*events.SocketConnectEvent, socket *ip.SocketPair) {
+func (c *ConnectionPartitionContext) tryToUpdateSocketFromConntrack(event 
*events.SocketConnectEvent, socket *ip.SocketPair) {
        if socket != nil && socket.IsValid() && c.connTracker != nil && 
!tools.IsLocalHostAddress(socket.DestIP) &&
                event.FuncName != enums.SocketFunctionNameAccept { // accept 
event don't need to update the remote address
                // if no contract and socket data is valid, then trying to get 
the remote address from the socket
diff --git a/pkg/accesslog/common/config.go b/pkg/accesslog/common/config.go
index d934bef..f1aab94 100644
--- a/pkg/accesslog/common/config.go
+++ b/pkg/accesslog/common/config.go
@@ -22,11 +22,12 @@ import "github.com/apache/skywalking-rover/pkg/module"
 type Config struct {
        module.Config
 
-       Active            bool                  `mapstructure:"active"`
-       ExcludeNamespaces string                
`mapstructure:"exclude_namespaces"`
-       ExcludeClusters   string                `mapstructure:"exclude_cluster"`
-       Flush             FlushConfig           `mapstructure:"flush"`
-       ProtocolAnalyze   ProtocolAnalyzeConfig 
`mapstructure:"protocol_analyze"`
+       Active            bool                    `mapstructure:"active"`
+       ExcludeNamespaces string                  
`mapstructure:"exclude_namespaces"`
+       ExcludeClusters   string                  
`mapstructure:"exclude_cluster"`
+       Flush             FlushConfig             `mapstructure:"flush"`
+       ConnectionAnalyze ConnectionAnalyzeConfig 
`mapstructure:"connection_analyze"`
+       ProtocolAnalyze   ProtocolAnalyzeConfig   
`mapstructure:"protocol_analyze"`
 }
 
 type FlushConfig struct {
@@ -34,6 +35,12 @@ type FlushConfig struct {
        Period            string `mapstructure:"period"`
 }
 
+type ConnectionAnalyzeConfig struct {
+       PerCPUBufferSize string `mapstructure:"per_cpu_buffer"`
+       Parallels        int    `mapstructure:"parallels"`
+       QueueSize        int    `mapstructure:"queue_size"`
+}
+
 type ProtocolAnalyzeConfig struct {
        PerCPUBufferSize string `mapstructure:"per_cpu_buffer"`
        Parallels        int    `mapstructure:"parallels"`

Reply via email to