This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/reduce-handle-connect-time by
this push:
new 1af3edf update conntrack
1af3edf is described below
commit 1af3edf3c5ef6ccb97ee407eb32f334600e6aad0
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 28 17:13:20 2024 +0800
update conntrack
---
bpf/accesslog/common/connection.h | 40 ++++++++++++++++++++++++++++-------
pkg/accesslog/collector/connection.go | 14 ++++--------
pkg/accesslog/common/connection.go | 20 ++++++++++++++++++
pkg/tools/ip/conntrack.go | 10 +++++++--
pkg/tools/ip/tcpresolver.go | 2 ++
5 files changed, 66 insertions(+), 20 deletions(-)
diff --git a/bpf/accesslog/common/connection.h
b/bpf/accesslog/common/connection.h
index 0a75214..9dce0cf 100644
--- a/bpf/accesslog/common/connection.h
+++ b/bpf/accesslog/common/connection.h
@@ -24,13 +24,18 @@
#include "queue.h"
#include "socket_data.h"
+// This file has been rewritten in the following points:
+// 1. Add an identifier to indicate whether the active connection has been filtered (remote port=53).
+// 2. No longer send a close message when the connection is filtered.
+
// syscall:connect
struct connect_args_t {
+ __u64 start_nacs;
+ __u64 randomid;
__u32 fd;
__u32 has_remote;
struct sockaddr* addr;
- struct sock *sock;
- __u64 start_nacs;
+ struct sock* sock;
struct connect_track_remote remote;
};
@@ -93,7 +98,8 @@ struct socket_connect_event_t {
__u8 socket_family;
// is connect success or not
__u8 success;
- __u32 __pad0;
+ // differs from upstream: if filtered, no other events are uploaded
+ __u32 filtered;
// upstream
__u32 remote_addr_v4;
@@ -131,7 +137,8 @@ struct active_connection_t {
__u8 ssl;
// skip data upload when the protocol break(such as HTTP2)
__u8 skip_data_upload;
- __u8 pad0;
+ // differs from upstream: if filtered, no other events are uploaded
+ __u8 connection_filtered;
__u32 pad1;
};
struct {
@@ -166,8 +173,8 @@ static __inline bool family_should_trace(const __u32
family) {
return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ?
false : true;
}
-static __always_inline void submit_new_connection(void* ctx, bool success,
__u32 func_name, __u32 tgid, __u32 fd, __u64 start_nacs,
- struct sockaddr* addr, const
struct socket* socket, struct connect_track_remote* conntrack, __u8 role) {
+static __always_inline void submit_new_connection(void* ctx, bool success,
__u32 func_name, __u32 tgid, __u32 fd, __u64 start_nacs, __u64 randomid,
+ struct sockaddr* addr, const
struct socket* socket, struct connect_track_remote* conntrack, __u8 role, __u32
has_remote) {
// send to the user-space the connection event
__u64 curr_nacs = bpf_ktime_get_ns();
struct socket_connect_event_t *event;
@@ -178,6 +185,9 @@ static __always_inline void submit_new_connection(void*
ctx, bool success, __u32
__u64 conid = gen_tgid_fd(tgid, fd);
__u64 random_id = bpf_get_prandom_u32();
+ if (randomid != 0) {
+ random_id = randomid;
+ }
event->conid = conid;
event->random_id = random_id;
event->start_time = start_nacs;
@@ -200,7 +210,7 @@ static __always_inline void submit_new_connection(void*
ctx, bool success, __u32
event->conntrack_upstream_iph = (__u64)conntrack->iph;
event->conntrack_upstream_ipl = (__u64)conntrack->ipl;
event->conntrack_upstream_port = conntrack->port;
- bpf_printk("update conntrack, conid: %lld, port: %d", conid,
conntrack->port);
+ bpf_printk("update conntrack, conid: %lld, ipl: %lld, port: %d",
conid, conntrack->ipl, conntrack->port);
}
event->success = success;
@@ -265,6 +275,15 @@ static __always_inline void submit_new_connection(void*
ctx, bool success, __u32
socket_family = AF_UNKNOWN;
}
+ // determine whether the connection should be filtered
+ __u8 filtered = 0;
+ if (event->remote_port == 53) {
+ event->filtered = 1;
+ filtered = 1;
+ } else {
+ event->filtered = 0;
+ }
+
rover_submit_buf(ctx, &socket_connection_event_queue, event,
sizeof(*event));
if (success == false) {
return;
@@ -278,6 +297,7 @@ static __always_inline void submit_new_connection(void*
ctx, bool success, __u32
con.sockfd = fd;
con.role = role;
con.socket_family = socket_family;
+ con.connection_filtered = filtered;
bpf_map_update_elem(&active_connection_map, &conid, &con, 0);
}
@@ -306,7 +326,6 @@ static __inline void submit_connection_when_not_exists(void
*ctx, __u64 id, stru
static __inline void notify_close_connection(void* ctx, __u64 conid, struct
active_connection_t* con, __u64 start_time, __u64 end_time, int ret) {
bpf_map_delete_elem(&socket_data_last_id_map, &conid);
- bpf_map_delete_elem(&socket_data_id_generate_map, &conid);
struct socket_close_event_t *close_event;
close_event = rover_reserve_buf(&socket_close_event_queue,
sizeof(*close_event));
if (close_event == NULL) {
@@ -331,6 +350,11 @@ static __inline void submit_close_connection(void* ctx,
__u32 tgid, __u32 fd, __
if (con == NULL) {
return;
}
+ // if the connection is filtered, just delete it without sending a close notification
+ if (con->connection_filtered) {
+ bpf_map_delete_elem(&active_connection_map, &conid);
+ return;
+ }
notify_close_connection(ctx, conid, con, start_nacs, curr_nacs, ret);
bpf_map_delete_elem(&active_connection_map, &conid);
}
\ No newline at end of file
diff --git a/pkg/accesslog/collector/connection.go
b/pkg/accesslog/collector/connection.go
index 5c4270b..7954e2d 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -80,13 +80,9 @@ func (c *ConnectCollector) Start(m *module.Manager, ctx
*common.AccessLogContext
if ctx.Config.ConnectionAnalyze.QueueSize < 1 {
return fmt.Errorf("the queue size be small than 1")
}
- track, err := ip.NewConnTrack()
- if err != nil {
- connectionLogger.Warnf("cannot create the connection tracker,
%v", err)
- }
c.eventQueue = btf.NewEventQueue("connection resolver",
ctx.Config.ConnectionAnalyze.AnalyzeParallels,
ctx.Config.ConnectionAnalyze.QueueSize, func(num int)
btf.PartitionContext {
- return NewConnectionPartitionContext(ctx, track,
m.FindModule(process.ModuleName).(process.K8sOperator))
+ return NewConnectionPartitionContext(ctx,
m.FindModule(process.ModuleName).(process.K8sOperator))
})
c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue,
int(perCPUBufferSize),
ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{}
{
@@ -137,17 +133,15 @@ func (c *ConnectCollector) Stop() {
type ConnectionPartitionContext struct {
context *common.AccessLogContext
- connTracker *ip.ConnTrack
k8sOperator process.K8sOperator
retryableEvents *list.List
retryableMutex sync.Mutex
}
-func NewConnectionPartitionContext(ctx *common.AccessLogContext, connTracker
*ip.ConnTrack,
+func NewConnectionPartitionContext(ctx *common.AccessLogContext,
k8sOperator process.K8sOperator) *ConnectionPartitionContext {
return &ConnectionPartitionContext{
context: ctx,
- connTracker: connTracker,
k8sOperator: k8sOperator,
retryableEvents: list.New(),
}
@@ -382,12 +376,12 @@ func (c *ConnectionPartitionContext)
TryToUpdateSocketFromConntrack(event *event
}
connectionLogger.Debugf("try to update the remote address from
conntrack, connection ID: %d, randomID: %d, func: %s, local: %s:%d, remote:
%s:%d",
event.ConID, event.RandomID,
enums.SocketFunctionName(event.FuncName), socket.SrcIP, socket.SrcPort,
socket.DestIP, socket.DestPort)
- if c.connTracker != nil {
+ if c.context.ConnectionMgr.ConnectTracker != nil {
// if no contract and socket data is valid, then trying to get
the remote address from the socket
// to encase the remote address is not the real remote address
originalIP := socket.DestIP
originalPort := socket.DestPort
- err := c.connTracker.UpdateRealPeerAddress(socket)
+ err :=
c.context.ConnectionMgr.ConnectTracker.UpdateRealPeerAddress(socket, true)
if err != nil {
return fmt.Errorf("update the socket address from
conntrack failure, error: %v", err)
}
diff --git a/pkg/accesslog/common/connection.go
b/pkg/accesslog/common/connection.go
index c6630cc..9b8bd1a 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -128,6 +128,8 @@ type ConnectionManager struct {
allUnfinishedConnections map[string]*bool
flushListeners []FlusherListener
+
+ ConnectTracker *ip.ConnTrack
}
func (c *ConnectionManager) RegisterProcessor(processor ConnectionProcessor) {
@@ -158,6 +160,10 @@ type ConnectionInfo struct {
}
func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader
*bpf.Loader, filter MonitorFilter) *ConnectionManager {
+ track, err := ip.NewConnTrack()
+ if err != nil {
+ log.Warnf("cannot create the connection tracker, %v", err)
+ }
mgr := &ConnectionManager{
moduleMgr: moduleMgr,
processOP:
moduleMgr.FindModule(process.ModuleName).(process.Operator),
@@ -171,6 +177,7 @@ func NewConnectionManager(config *Config, moduleMgr
*module.Manager, bpfLoader *
allUnfinishedConnections: make(map[string]*bool),
monitorFilter: filter,
flushListeners: make([]FlusherListener, 0),
+ ConnectTracker: track,
}
return mgr
}
@@ -178,6 +185,13 @@ func NewConnectionManager(config *Config, moduleMgr
*module.Manager, bpfLoader *
func (c *ConnectionManager) Start(ctx context.Context, accessLogContext
*AccessLogContext) {
c.processOP.AddListener(c)
+ if c.ConnectTracker != nil {
+ err := c.ConnectTracker.StartMonitoring(ctx)
+ if err != nil {
+ log.Warnf("cannot start the connection tracker
monitoring, %v", err)
+ }
+ }
+
// starting to clean up the un-active connection in BPF
go func() {
ticker := time.NewTicker(cleanActiveConnectionInterval)
@@ -254,6 +268,12 @@ func (c *ConnectionManager) Find(event events.Event)
*ConnectionInfo {
}
// is current is connected event, then getting the socket pair
if e, socket := getSocketPairFromConnectEvent(event); e != nil &&
socket != nil {
+ // if the remote connection needs conntrack, update the real peer address
+ if socket.NeedConnTrack {
+ if err :=
c.ConnectTracker.UpdateRealPeerAddress(socket, false); err != nil {
+ log.Warnf("cannot update the real peer address,
%v", err)
+ }
+ }
var localAddress, remoteAddress *v3.ConnectionAddress
localPID, _ := events.ParseConnectionID(event.GetConnectionID())
localAddress = c.buildLocalAddress(localPID, socket.SrcPort,
socket)
diff --git a/pkg/tools/ip/conntrack.go b/pkg/tools/ip/conntrack.go
index 32ee090..99a453a 100644
--- a/pkg/tools/ip/conntrack.go
+++ b/pkg/tools/ip/conntrack.go
@@ -157,7 +157,7 @@ func (c *ConnTrack) monitor0(ctx context.Context) (chan
error, error) {
return errCh, nil
}
-func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) error {
+func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair, fromCacheOnly
bool) error {
key := conntrackExpireKey{
sourceIP: addr.SrcIP,
destIP: addr.DestIP,
@@ -169,10 +169,15 @@ func (c *ConnTrack) UpdateRealPeerAddress(addr
*SocketPair) error {
v := val.(conntrackExpireValue)
addr.DestIP = v.realIP
addr.DestPort = v.realPort
- log.Infof("update real peer address from cache: %s:%d",
addr.DestIP, addr.DestPort)
+ addr.NeedConnTrack = false
+ log.Debugf("update real peer address from cache: %s:%d",
addr.DestIP, addr.DestPort)
c.monitorExpire.Delete(key)
return nil
}
+ addr.NeedConnTrack = true
+ if fromCacheOnly {
+ return nil
+ }
srcIP, err := netip.ParseAddr(addr.SrcIP)
if err != nil {
@@ -204,6 +209,7 @@ func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair)
error {
addr.DestIP = get.TupleReply.IP.SourceAddress.String()
addr.DestPort = get.TupleReply.Proto.SourcePort
+ addr.NeedConnTrack = false
return nil
}
diff --git a/pkg/tools/ip/tcpresolver.go b/pkg/tools/ip/tcpresolver.go
index a07c216..9f09309 100644
--- a/pkg/tools/ip/tcpresolver.go
+++ b/pkg/tools/ip/tcpresolver.go
@@ -34,6 +34,8 @@ type SocketPair struct {
SrcPort uint16
DestIP string
DestPort uint16
+
+ NeedConnTrack bool
}
func (s *SocketPair) IsValid() bool {