This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 2b14127 Combine connect and close connection collector in access log module (#162)
2b14127 is described below
commit 2b141272429c2d37441e61523f0657af3a7dd5b9
Author: mrproliu <[email protected]>
AuthorDate: Fri Dec 6 01:20:32 2024 +0900
Combine connect and close connection collector in access log module (#162)
---
pkg/accesslog/collector/close.go | 56 -----------------
pkg/accesslog/collector/collector.go | 3 +-
.../collector/{connect.go => connection.go} | 70 +++++++++++++---------
3 files changed, 43 insertions(+), 86 deletions(-)
diff --git a/pkg/accesslog/collector/close.go b/pkg/accesslog/collector/close.go
deleted file mode 100644
index 27bfbec..0000000
--- a/pkg/accesslog/collector/close.go
+++ /dev/null
@@ -1,56 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package collector
-
-import (
- "github.com/apache/skywalking-rover/pkg/accesslog/common"
- "github.com/apache/skywalking-rover/pkg/accesslog/events"
- "github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
- "github.com/apache/skywalking-rover/pkg/logger"
- "github.com/apache/skywalking-rover/pkg/module"
-)
-
-var closeLog = logger.GetLogger("access_log", "collector", "close")
-
-var closeCollectorInstance = NewCloseCollector()
-
-type CloseCollector struct {
-}
-
-func NewCloseCollector() *CloseCollector {
- return &CloseCollector{}
-}
-
-func (c *CloseCollector) Start(_ *module.Manager, context
*common.AccessLogContext) error {
- context.BPF.AddTracePoint("syscalls", "sys_enter_close",
context.BPF.TracepointEnterClose)
- context.BPF.AddTracePoint("syscalls", "sys_exit_close",
context.BPF.TracepointExitClose)
-
- context.BPF.ReadEventAsync(context.BPF.SocketCloseEventQueue, func(data
interface{}) {
- event := data.(*events.SocketCloseEvent)
- closeLog.Debugf("receive close event, connection ID: %d,
randomID: %d, pid: %d, fd: %d",
- event.ConnectionID, event.RandomID, event.PID,
event.SocketFD)
- wapperedEvent := context.ConnectionMgr.OnConnectionClose(event)
- forwarder.SendCloseEvent(context, wapperedEvent)
- }, func() interface{} {
- return &events.SocketCloseEvent{}
- })
- return nil
-}
-
-func (c *CloseCollector) Stop() {
-}
diff --git a/pkg/accesslog/collector/collector.go
b/pkg/accesslog/collector/collector.go
index 8e2f4ae..0a618e0 100644
--- a/pkg/accesslog/collector/collector.go
+++ b/pkg/accesslog/collector/collector.go
@@ -34,8 +34,7 @@ func Collectors() []Collector {
return []Collector{
l24CollectorsInstance,
transferCollectInstance,
- closeCollectorInstance,
- connectCollectInstance,
+ connectionCollectInstance,
tlsCollectInstance,
processCollectInstance,
zTunnelCollectInstance,
diff --git a/pkg/accesslog/collector/connect.go
b/pkg/accesslog/collector/connection.go
similarity index 74%
rename from pkg/accesslog/collector/connect.go
rename to pkg/accesslog/collector/connection.go
index cda29ba..c125397 100644
--- a/pkg/accesslog/collector/connect.go
+++ b/pkg/accesslog/collector/connection.go
@@ -44,15 +44,15 @@ import (
"golang.org/x/sys/unix"
)
-var connectLogger = logger.GetLogger("access_log", "collector", "connect")
+var connectionLogger = logger.GetLogger("access_log", "collector",
"connection")
-var connectCollectInstance = NewConnectCollector()
+var connectionCollectInstance = NewConnectionCollector()
type ConnectCollector struct {
eventQueue *btf.EventQueue
}
-func NewConnectCollector() *ConnectCollector {
+func NewConnectionCollector() *ConnectCollector {
return &ConnectCollector{}
}
@@ -72,7 +72,7 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx
*common.AccessLogContext
}
track, err := ip.NewConnTrack()
if err != nil {
- connectLogger.Warnf("cannot create the connection tracker, %v",
err)
+ connectionLogger.Warnf("cannot create the connection tracker,
%v", err)
}
c.eventQueue =
btf.NewEventQueue(ctx.Config.ConnectionAnalyze.Parallels,
ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
return newConnectionPartitionContext(ctx, track)
@@ -82,6 +82,11 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx
*common.AccessLogContext
}, func(data interface{}) string {
return fmt.Sprintf("%d",
data.(*events.SocketConnectEvent).ConID)
})
+ c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue,
int(perCPUBufferSize), func() interface{} {
+ return &events.SocketCloseEvent{}
+ }, func(data interface{}) string {
+ return fmt.Sprintf("%d",
data.(*events.SocketCloseEvent).ConnectionID)
+ })
c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)
ctx.BPF.AddTracePoint("syscalls", "sys_enter_connect",
ctx.BPF.TracepointEnterConnect)
@@ -90,6 +95,8 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx
*common.AccessLogContext
ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept",
ctx.BPF.TracepointExitAccept)
ctx.BPF.AddTracePoint("syscalls", "sys_enter_accept4",
ctx.BPF.TracepointEnterAccept)
ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept4",
ctx.BPF.TracepointExitAccept)
+ ctx.BPF.AddTracePoint("syscalls", "sys_enter_close",
ctx.BPF.TracepointEnterClose)
+ ctx.BPF.AddTracePoint("syscalls", "sys_exit_close",
ctx.BPF.TracepointExitClose)
ctx.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
"tcp_connect": ctx.BPF.TcpConnect,
@@ -133,21 +140,28 @@ func (c *ConnectionPartitionContext) Start(ctx
context.Context) {
}
func (c *ConnectionPartitionContext) Consume(data interface{}) {
- event := data.(*events.SocketConnectEvent)
- connectLogger.Debugf("receive connect event, connection ID: %d,
randomID: %d, "+
- "pid: %d, fd: %d, role: %s: func: %s, family: %d, success: %d,
conntrack exist: %t",
- event.ConID, event.RandomID, event.PID, event.SocketFD,
enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
- event.SocketFamily, event.ConnectSuccess,
event.ConnTrackUpstreamPort != 0)
- socketPair := c.buildSocketFromConnectEvent(event)
- if socketPair == nil {
- connectLogger.Debugf("cannot found the socket paire from
connect event, connection ID: %d, randomID: %d",
- event.ConID, event.RandomID)
- return
+ switch event := data.(type) {
+ case *events.SocketConnectEvent:
+ connectionLogger.Debugf("receive connect event, connection ID:
%d, randomID: %d, "+
+ "pid: %d, fd: %d, role: %s: func: %s, family: %d,
success: %d, conntrack exist: %t",
+ event.ConID, event.RandomID, event.PID, event.SocketFD,
enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
+ event.SocketFamily, event.ConnectSuccess,
event.ConnTrackUpstreamPort != 0)
+ socketPair := c.buildSocketFromConnectEvent(event)
+ if socketPair == nil {
+ connectionLogger.Debugf("cannot found the socket paire
from connect event, connection ID: %d, randomID: %d",
+ event.ConID, event.RandomID)
+ return
+ }
+ connectionLogger.Debugf("build socket pair success, connection
ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
+ event.ConID, event.RandomID, socketPair.Role,
socketPair.SrcIP, socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
+ c.context.ConnectionMgr.OnConnectEvent(event, socketPair)
+ forwarder.SendConnectEvent(c.context, event, socketPair)
+ case *events.SocketCloseEvent:
+ connectionLogger.Debugf("receive close event, connection ID:
%d, randomID: %d, pid: %d, fd: %d",
+ event.ConnectionID, event.RandomID, event.PID,
event.SocketFD)
+ wapperedEvent :=
c.context.ConnectionMgr.OnConnectionClose(event)
+ forwarder.SendCloseEvent(c.context, wapperedEvent)
}
- connectLogger.Debugf("build socket pair success, connection ID: %d,
randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
- event.ConID, event.RandomID, socketPair.Role, socketPair.SrcIP,
socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
- c.context.ConnectionMgr.OnConnectEvent(event, socketPair)
- forwarder.SendConnectEvent(c.context, event, socketPair)
}
func (c *ConnectionPartitionContext) fixSocketFamilyIfNeed(event
*events.SocketConnectEvent, result *ip.SocketPair) {
@@ -163,7 +177,7 @@ func (c *ConnectionPartitionContext)
fixSocketFamilyIfNeed(event *events.SocketC
}
if result.Family != actual {
- connectLogger.Debugf("fix the socket family from %d to
%d, connection ID: %d, randomID: %d",
+ connectionLogger.Debugf("fix the socket family from %d
to %d, connection ID: %d, randomID: %d",
result.Family, actual, event.ConID,
event.RandomID)
result.Family = actual
}
@@ -177,24 +191,24 @@ func (c *ConnectionPartitionContext)
buildSocketFromConnectEvent(event *events.S
}
socketPair := c.buildSocketPair(event)
if socketPair != nil && socketPair.IsValid() {
- connectLogger.Debugf("found the connection from the connect
event is valid, connection ID: %d, randomID: %d",
+ connectionLogger.Debugf("found the connection from the connect
event is valid, connection ID: %d, randomID: %d",
event.ConID, event.RandomID)
return socketPair
}
// if only the local port not success, maybe the upstream port is not
open, so it could be continued
if c.isOnlyLocalPortEmpty(socketPair) {
event.ConnectSuccess = 0
- connectLogger.Debugf("the connection from the connect event is
only the local port is empty, connection ID: %d, randomID: %d",
+ connectionLogger.Debugf("the connection from the connect event
is only the local port is empty, connection ID: %d, randomID: %d",
event.ConID, event.RandomID)
return socketPair
}
pair, err := ip.ParseSocket(event.PID, event.SocketFD)
if err != nil {
- connectLogger.Debugf("cannot found the socket, pid: %d, socket
FD: %d", event.PID, event.SocketFD)
+ connectionLogger.Debugf("cannot found the socket, pid: %d,
socket FD: %d", event.PID, event.SocketFD)
return nil
}
- connectLogger.Debugf("found the connection from the socket, connection
ID: %d, randomID: %d",
+ connectionLogger.Debugf("found the connection from the socket,
connection ID: %d, randomID: %d",
event.ConID, event.RandomID)
pair.Role = enums.ConnectionRole(event.Role)
c.fixSocketFamilyIfNeed(event, pair)
@@ -229,8 +243,8 @@ func (c *ConnectionPartitionContext) buildSocketPair(event
*events.SocketConnect
result.DestIP =
ip.ParseIPV4(uint32(event.ConnTrackUpstreamIPl))
result.DestPort = uint16(event.ConnTrackUpstreamPort)
- if connectLogger.Enable(logrus.DebugLevel) {
- connectLogger.Debugf("found the connection from
the conntrack, connection ID: %d, randomID: %d, original: %s:%d, conntrack:
%s:%d",
+ if connectionLogger.Enable(logrus.DebugLevel) {
+ connectionLogger.Debugf("found the connection
from the conntrack, connection ID: %d, randomID: %d, original: %s:%d,
conntrack: %s:%d",
event.ConID, event.RandomID,
ip.ParseIPV4(event.RemoteAddrV4), uint16(event.RemoteAddrPort), result.DestIP,
result.DestPort)
}
} else {
@@ -255,8 +269,8 @@ func (c *ConnectionPartitionContext) buildSocketPair(event
*events.SocketConnect
result.DestIP =
ip.ParseIPV4(uint32(event.ConnTrackUpstreamIPl))
}
result.DestPort = uint16(event.ConnTrackUpstreamPort)
- if connectLogger.Enable(logrus.DebugLevel) {
- connectLogger.Debugf("found the connection from
the conntrack, connection ID: %d, randomID: %d, original: %s:%d, conntrack:
%s:%d",
+ if connectionLogger.Enable(logrus.DebugLevel) {
+ connectionLogger.Debugf("found the connection
from the conntrack, connection ID: %d, randomID: %d, original: %s:%d,
conntrack: %s:%d",
event.ConID, event.RandomID,
ip.ParseIPV6(event.RemoteAddrV6), uint16(event.RemoteAddrPort), result.DestIP,
result.DestPort)
}
} else {
@@ -282,7 +296,7 @@ func (c *ConnectionPartitionContext)
tryToUpdateSocketFromConntrack(event *event
originalIP := socket.DestIP
originalPort := socket.DestPort
if c.connTracker.UpdateRealPeerAddress(socket) {
- connectLogger.Debugf("update the socket address from
conntrack success, "+
+ connectionLogger.Debugf("update the socket address from
conntrack success, "+
"connection ID: %d, randomID: %d, original
remote: %s:%d, new remote: %s:%d",
event.ConID, event.RandomID, originalIP,
originalPort, socket.DestIP, socket.DestPort)
}