This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b14127  Combine connect and close connection collector in access log 
module (#162)
2b14127 is described below

commit 2b141272429c2d37441e61523f0657af3a7dd5b9
Author: mrproliu <[email protected]>
AuthorDate: Fri Dec 6 01:20:32 2024 +0900

    Combine connect and close connection collector in access log module (#162)
---
 pkg/accesslog/collector/close.go                   | 56 -----------------
 pkg/accesslog/collector/collector.go               |  3 +-
 .../collector/{connect.go => connection.go}        | 70 +++++++++++++---------
 3 files changed, 43 insertions(+), 86 deletions(-)

diff --git a/pkg/accesslog/collector/close.go b/pkg/accesslog/collector/close.go
deleted file mode 100644
index 27bfbec..0000000
--- a/pkg/accesslog/collector/close.go
+++ /dev/null
@@ -1,56 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package collector
-
-import (
-       "github.com/apache/skywalking-rover/pkg/accesslog/common"
-       "github.com/apache/skywalking-rover/pkg/accesslog/events"
-       "github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
-       "github.com/apache/skywalking-rover/pkg/logger"
-       "github.com/apache/skywalking-rover/pkg/module"
-)
-
-var closeLog = logger.GetLogger("access_log", "collector", "close")
-
-var closeCollectorInstance = NewCloseCollector()
-
-type CloseCollector struct {
-}
-
-func NewCloseCollector() *CloseCollector {
-       return &CloseCollector{}
-}
-
-func (c *CloseCollector) Start(_ *module.Manager, context 
*common.AccessLogContext) error {
-       context.BPF.AddTracePoint("syscalls", "sys_enter_close", 
context.BPF.TracepointEnterClose)
-       context.BPF.AddTracePoint("syscalls", "sys_exit_close", 
context.BPF.TracepointExitClose)
-
-       context.BPF.ReadEventAsync(context.BPF.SocketCloseEventQueue, func(data 
interface{}) {
-               event := data.(*events.SocketCloseEvent)
-               closeLog.Debugf("receive close event, connection ID: %d, 
randomID: %d, pid: %d, fd: %d",
-                       event.ConnectionID, event.RandomID, event.PID, 
event.SocketFD)
-               wapperedEvent := context.ConnectionMgr.OnConnectionClose(event)
-               forwarder.SendCloseEvent(context, wapperedEvent)
-       }, func() interface{} {
-               return &events.SocketCloseEvent{}
-       })
-       return nil
-}
-
-func (c *CloseCollector) Stop() {
-}
diff --git a/pkg/accesslog/collector/collector.go 
b/pkg/accesslog/collector/collector.go
index 8e2f4ae..0a618e0 100644
--- a/pkg/accesslog/collector/collector.go
+++ b/pkg/accesslog/collector/collector.go
@@ -34,8 +34,7 @@ func Collectors() []Collector {
        return []Collector{
                l24CollectorsInstance,
                transferCollectInstance,
-               closeCollectorInstance,
-               connectCollectInstance,
+               connectionCollectInstance,
                tlsCollectInstance,
                processCollectInstance,
                zTunnelCollectInstance,
diff --git a/pkg/accesslog/collector/connect.go 
b/pkg/accesslog/collector/connection.go
similarity index 74%
rename from pkg/accesslog/collector/connect.go
rename to pkg/accesslog/collector/connection.go
index cda29ba..c125397 100644
--- a/pkg/accesslog/collector/connect.go
+++ b/pkg/accesslog/collector/connection.go
@@ -44,15 +44,15 @@ import (
        "golang.org/x/sys/unix"
 )
 
-var connectLogger = logger.GetLogger("access_log", "collector", "connect")
+var connectionLogger = logger.GetLogger("access_log", "collector", 
"connection")
 
-var connectCollectInstance = NewConnectCollector()
+var connectionCollectInstance = NewConnectionCollector()
 
 type ConnectCollector struct {
        eventQueue *btf.EventQueue
 }
 
-func NewConnectCollector() *ConnectCollector {
+func NewConnectionCollector() *ConnectCollector {
        return &ConnectCollector{}
 }
 
@@ -72,7 +72,7 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx 
*common.AccessLogContext
        }
        track, err := ip.NewConnTrack()
        if err != nil {
-               connectLogger.Warnf("cannot create the connection tracker, %v", 
err)
+               connectionLogger.Warnf("cannot create the connection tracker, 
%v", err)
        }
        c.eventQueue = 
btf.NewEventQueue(ctx.Config.ConnectionAnalyze.Parallels, 
ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
                return newConnectionPartitionContext(ctx, track)
@@ -82,6 +82,11 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx 
*common.AccessLogContext
        }, func(data interface{}) string {
                return fmt.Sprintf("%d", 
data.(*events.SocketConnectEvent).ConID)
        })
+       c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, 
int(perCPUBufferSize), func() interface{} {
+               return &events.SocketCloseEvent{}
+       }, func(data interface{}) string {
+               return fmt.Sprintf("%d", 
data.(*events.SocketCloseEvent).ConnectionID)
+       })
        c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)
 
        ctx.BPF.AddTracePoint("syscalls", "sys_enter_connect", 
ctx.BPF.TracepointEnterConnect)
@@ -90,6 +95,8 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx 
*common.AccessLogContext
        ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept", 
ctx.BPF.TracepointExitAccept)
        ctx.BPF.AddTracePoint("syscalls", "sys_enter_accept4", 
ctx.BPF.TracepointEnterAccept)
        ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept4", 
ctx.BPF.TracepointExitAccept)
+       ctx.BPF.AddTracePoint("syscalls", "sys_enter_close", 
ctx.BPF.TracepointEnterClose)
+       ctx.BPF.AddTracePoint("syscalls", "sys_exit_close", 
ctx.BPF.TracepointExitClose)
 
        ctx.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
                "tcp_connect": ctx.BPF.TcpConnect,
@@ -133,21 +140,28 @@ func (c *ConnectionPartitionContext) Start(ctx 
context.Context) {
 }
 
 func (c *ConnectionPartitionContext) Consume(data interface{}) {
-       event := data.(*events.SocketConnectEvent)
-       connectLogger.Debugf("receive connect event, connection ID: %d, 
randomID: %d, "+
-               "pid: %d, fd: %d, role: %s: func: %s, family: %d, success: %d, 
conntrack exist: %t",
-               event.ConID, event.RandomID, event.PID, event.SocketFD, 
enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
-               event.SocketFamily, event.ConnectSuccess, 
event.ConnTrackUpstreamPort != 0)
-       socketPair := c.buildSocketFromConnectEvent(event)
-       if socketPair == nil {
-               connectLogger.Debugf("cannot found the socket paire from 
connect event, connection ID: %d, randomID: %d",
-                       event.ConID, event.RandomID)
-               return
+       switch event := data.(type) {
+       case *events.SocketConnectEvent:
+               connectionLogger.Debugf("receive connect event, connection ID: 
%d, randomID: %d, "+
+                       "pid: %d, fd: %d, role: %s: func: %s, family: %d, 
success: %d, conntrack exist: %t",
+                       event.ConID, event.RandomID, event.PID, event.SocketFD, 
enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
+                       event.SocketFamily, event.ConnectSuccess, 
event.ConnTrackUpstreamPort != 0)
+               socketPair := c.buildSocketFromConnectEvent(event)
+               if socketPair == nil {
+                       connectionLogger.Debugf("cannot found the socket paire 
from connect event, connection ID: %d, randomID: %d",
+                               event.ConID, event.RandomID)
+                       return
+               }
+               connectionLogger.Debugf("build socket pair success, connection 
ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
+                       event.ConID, event.RandomID, socketPair.Role, 
socketPair.SrcIP, socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
+               c.context.ConnectionMgr.OnConnectEvent(event, socketPair)
+               forwarder.SendConnectEvent(c.context, event, socketPair)
+       case *events.SocketCloseEvent:
+               connectionLogger.Debugf("receive close event, connection ID: 
%d, randomID: %d, pid: %d, fd: %d",
+                       event.ConnectionID, event.RandomID, event.PID, 
event.SocketFD)
+               wapperedEvent := 
c.context.ConnectionMgr.OnConnectionClose(event)
+               forwarder.SendCloseEvent(c.context, wapperedEvent)
        }
-       connectLogger.Debugf("build socket pair success, connection ID: %d, 
randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
-               event.ConID, event.RandomID, socketPair.Role, socketPair.SrcIP, 
socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
-       c.context.ConnectionMgr.OnConnectEvent(event, socketPair)
-       forwarder.SendConnectEvent(c.context, event, socketPair)
 }
 
 func (c *ConnectionPartitionContext) fixSocketFamilyIfNeed(event 
*events.SocketConnectEvent, result *ip.SocketPair) {
@@ -163,7 +177,7 @@ func (c *ConnectionPartitionContext) 
fixSocketFamilyIfNeed(event *events.SocketC
                }
 
                if result.Family != actual {
-                       connectLogger.Debugf("fix the socket family from %d to 
%d, connection ID: %d, randomID: %d",
+                       connectionLogger.Debugf("fix the socket family from %d 
to %d, connection ID: %d, randomID: %d",
                                result.Family, actual, event.ConID, 
event.RandomID)
                        result.Family = actual
                }
@@ -177,24 +191,24 @@ func (c *ConnectionPartitionContext) 
buildSocketFromConnectEvent(event *events.S
        }
        socketPair := c.buildSocketPair(event)
        if socketPair != nil && socketPair.IsValid() {
-               connectLogger.Debugf("found the connection from the connect 
event is valid, connection ID: %d, randomID: %d",
+               connectionLogger.Debugf("found the connection from the connect 
event is valid, connection ID: %d, randomID: %d",
                        event.ConID, event.RandomID)
                return socketPair
        }
        // if only the local port not success, maybe the upstream port is not 
open, so it could be continued
        if c.isOnlyLocalPortEmpty(socketPair) {
                event.ConnectSuccess = 0
-               connectLogger.Debugf("the connection from the connect event is 
only the local port is empty, connection ID: %d, randomID: %d",
+               connectionLogger.Debugf("the connection from the connect event 
is only the local port is empty, connection ID: %d, randomID: %d",
                        event.ConID, event.RandomID)
                return socketPair
        }
 
        pair, err := ip.ParseSocket(event.PID, event.SocketFD)
        if err != nil {
-               connectLogger.Debugf("cannot found the socket, pid: %d, socket 
FD: %d", event.PID, event.SocketFD)
+               connectionLogger.Debugf("cannot found the socket, pid: %d, 
socket FD: %d", event.PID, event.SocketFD)
                return nil
        }
-       connectLogger.Debugf("found the connection from the socket, connection 
ID: %d, randomID: %d",
+       connectionLogger.Debugf("found the connection from the socket, 
connection ID: %d, randomID: %d",
                event.ConID, event.RandomID)
        pair.Role = enums.ConnectionRole(event.Role)
        c.fixSocketFamilyIfNeed(event, pair)
@@ -229,8 +243,8 @@ func (c *ConnectionPartitionContext) buildSocketPair(event 
*events.SocketConnect
                        result.DestIP = 
ip.ParseIPV4(uint32(event.ConnTrackUpstreamIPl))
                        result.DestPort = uint16(event.ConnTrackUpstreamPort)
 
-                       if connectLogger.Enable(logrus.DebugLevel) {
-                               connectLogger.Debugf("found the connection from 
the conntrack, connection ID: %d, randomID: %d, original: %s:%d, conntrack: 
%s:%d",
+                       if connectionLogger.Enable(logrus.DebugLevel) {
+                               connectionLogger.Debugf("found the connection 
from the conntrack, connection ID: %d, randomID: %d, original: %s:%d, 
conntrack: %s:%d",
                                        event.ConID, event.RandomID, 
ip.ParseIPV4(event.RemoteAddrV4), uint16(event.RemoteAddrPort), result.DestIP, 
result.DestPort)
                        }
                } else {
@@ -255,8 +269,8 @@ func (c *ConnectionPartitionContext) buildSocketPair(event 
*events.SocketConnect
                                result.DestIP = 
ip.ParseIPV4(uint32(event.ConnTrackUpstreamIPl))
                        }
                        result.DestPort = uint16(event.ConnTrackUpstreamPort)
-                       if connectLogger.Enable(logrus.DebugLevel) {
-                               connectLogger.Debugf("found the connection from 
the conntrack, connection ID: %d, randomID: %d, original: %s:%d, conntrack: 
%s:%d",
+                       if connectionLogger.Enable(logrus.DebugLevel) {
+                               connectionLogger.Debugf("found the connection 
from the conntrack, connection ID: %d, randomID: %d, original: %s:%d, 
conntrack: %s:%d",
                                        event.ConID, event.RandomID, 
ip.ParseIPV6(event.RemoteAddrV6), uint16(event.RemoteAddrPort), result.DestIP, 
result.DestPort)
                        }
                } else {
@@ -282,7 +296,7 @@ func (c *ConnectionPartitionContext) 
tryToUpdateSocketFromConntrack(event *event
                originalIP := socket.DestIP
                originalPort := socket.DestPort
                if c.connTracker.UpdateRealPeerAddress(socket) {
-                       connectLogger.Debugf("update the socket address from 
conntrack success, "+
+                       connectionLogger.Debugf("update the socket address from 
conntrack success, "+
                                "connection ID: %d, randomID: %d, original 
remote: %s:%d, new remote: %s:%d",
                                event.ConID, event.RandomID, originalIP, 
originalPort, socket.DestIP, socket.DestPort)
                }

Reply via email to