This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/reduce-handle-connect-time by 
this push:
     new 1af3edf  update conntrack
1af3edf is described below

commit 1af3edf3c5ef6ccb97ee407eb32f334600e6aad0
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 28 17:13:20 2024 +0800

    update conntrack
---
 bpf/accesslog/common/connection.h     | 40 ++++++++++++++++++++++++++++-------
 pkg/accesslog/collector/connection.go | 14 ++++--------
 pkg/accesslog/common/connection.go    | 20 ++++++++++++++++++
 pkg/tools/ip/conntrack.go             | 10 +++++++--
 pkg/tools/ip/tcpresolver.go           |  2 ++
 5 files changed, 66 insertions(+), 20 deletions(-)

diff --git a/bpf/accesslog/common/connection.h 
b/bpf/accesslog/common/connection.h
index 0a75214..9dce0cf 100644
--- a/bpf/accesslog/common/connection.h
+++ b/bpf/accesslog/common/connection.h
@@ -24,13 +24,18 @@
 #include "queue.h"
 #include "socket_data.h"
 
+// This file has been rewritten in the following respects:
+// 1. Adding an identifier to the active connection to indicate whether it has 
been filtered (remote port=53).
+// 2. No longer sending a close message when the connection is filtered.
+
 // syscall:connect
 struct connect_args_t {
+    __u64 start_nacs;
+    __u64 randomid;
     __u32 fd;
     __u32 has_remote;
     struct sockaddr* addr;
-    struct sock *sock;
-    __u64 start_nacs;
+    struct sock* sock;
 
     struct connect_track_remote remote;
 };
@@ -93,7 +98,8 @@ struct socket_connect_event_t {
     __u8 socket_family;
     // is connect success or not
     __u8 success;
-    __u32 __pad0;
+    // differs from upstream: if filtered, other events are not uploaded
+    __u32 filtered;
 
     // upstream
     __u32 remote_addr_v4;
@@ -131,7 +137,8 @@ struct active_connection_t {
     __u8 ssl;
     // skip data upload when the protocol break(such as HTTP2)
     __u8 skip_data_upload;
-    __u8 pad0;
+    // differs from upstream: if filtered, other events are not uploaded
+    __u8 connection_filtered;
     __u32 pad1;
 };
 struct {
@@ -166,8 +173,8 @@ static __inline bool family_should_trace(const __u32 
family) {
     return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ? 
false : true;
 }
 
-static __always_inline void submit_new_connection(void* ctx, bool success, 
__u32 func_name, __u32 tgid, __u32 fd, __u64 start_nacs,
-                                            struct sockaddr* addr, const 
struct socket* socket, struct connect_track_remote* conntrack, __u8 role) {
+static __always_inline void submit_new_connection(void* ctx, bool success, 
__u32 func_name, __u32 tgid, __u32 fd, __u64 start_nacs, __u64 randomid,
+                                            struct sockaddr* addr, const 
struct socket* socket, struct connect_track_remote* conntrack, __u8 role, __u32 
has_remote) {
     // send to the user-space the connection event
     __u64 curr_nacs = bpf_ktime_get_ns();
     struct socket_connect_event_t *event;
@@ -178,6 +185,9 @@ static __always_inline void submit_new_connection(void* 
ctx, bool success, __u32
 
     __u64 conid = gen_tgid_fd(tgid, fd);
     __u64 random_id = bpf_get_prandom_u32();
+    if (randomid != 0) {
+        random_id = randomid;
+    }
     event->conid = conid;
     event->random_id = random_id;
     event->start_time = start_nacs;
@@ -200,7 +210,7 @@ static __always_inline void submit_new_connection(void* 
ctx, bool success, __u32
         event->conntrack_upstream_iph = (__u64)conntrack->iph;
         event->conntrack_upstream_ipl = (__u64)conntrack->ipl;
         event->conntrack_upstream_port = conntrack->port;
-        bpf_printk("update conntrack, conid: %lld, port: %d", conid, 
conntrack->port);
+        bpf_printk("update conntrack, conid: %lld, ipl: %lld, port: %d", 
conid, conntrack->ipl, conntrack->port);
     }
     event->success = success;
 
@@ -265,6 +275,15 @@ static __always_inline void submit_new_connection(void* 
ctx, bool success, __u32
         socket_family = AF_UNKNOWN;
     }
 
+    // determine whether the connection should be filtered
+    __u8 filtered = 0;
+    if (event->remote_port == 53) {
+        event->filtered = 1;
+        filtered = 1;
+    } else {
+        event->filtered = 0;
+    }
+
     rover_submit_buf(ctx, &socket_connection_event_queue, event, 
sizeof(*event));
     if (success == false) {
         return;
@@ -278,6 +297,7 @@ static __always_inline void submit_new_connection(void* 
ctx, bool success, __u32
     con.sockfd = fd;
     con.role = role;
     con.socket_family = socket_family;
+    con.connection_filtered = filtered;
     bpf_map_update_elem(&active_connection_map, &conid, &con, 0);
 }
 
@@ -306,7 +326,6 @@ static __inline void submit_connection_when_not_exists(void 
*ctx, __u64 id, stru
 
 static __inline void notify_close_connection(void* ctx, __u64 conid, struct 
active_connection_t* con, __u64 start_time, __u64 end_time, int ret) {
     bpf_map_delete_elem(&socket_data_last_id_map, &conid);
-    bpf_map_delete_elem(&socket_data_id_generate_map, &conid);
     struct socket_close_event_t *close_event;
     close_event = rover_reserve_buf(&socket_close_event_queue, 
sizeof(*close_event));
     if (close_event == NULL) {
@@ -331,6 +350,11 @@ static __inline void submit_close_connection(void* ctx, 
__u32 tgid, __u32 fd, __
     if (con == NULL) {
         return;
     }
+    // if the connection is filtered, just delete it without sending a close notification
+    if (con->connection_filtered) {
+        bpf_map_delete_elem(&active_connection_map, &conid);
+        return;
+    }
     notify_close_connection(ctx, conid, con, start_nacs, curr_nacs, ret);
     bpf_map_delete_elem(&active_connection_map, &conid);
 }
\ No newline at end of file
diff --git a/pkg/accesslog/collector/connection.go 
b/pkg/accesslog/collector/connection.go
index 5c4270b..7954e2d 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -80,13 +80,9 @@ func (c *ConnectCollector) Start(m *module.Manager, ctx 
*common.AccessLogContext
        if ctx.Config.ConnectionAnalyze.QueueSize < 1 {
                return fmt.Errorf("the queue size be small than 1")
        }
-       track, err := ip.NewConnTrack()
-       if err != nil {
-               connectionLogger.Warnf("cannot create the connection tracker, 
%v", err)
-       }
        c.eventQueue = btf.NewEventQueue("connection resolver", 
ctx.Config.ConnectionAnalyze.AnalyzeParallels,
                ctx.Config.ConnectionAnalyze.QueueSize, func(num int) 
btf.PartitionContext {
-                       return NewConnectionPartitionContext(ctx, track, 
m.FindModule(process.ModuleName).(process.K8sOperator))
+                       return NewConnectionPartitionContext(ctx, 
m.FindModule(process.ModuleName).(process.K8sOperator))
                })
        c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, 
int(perCPUBufferSize),
                ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} 
{
@@ -137,17 +133,15 @@ func (c *ConnectCollector) Stop() {
 
 type ConnectionPartitionContext struct {
        context         *common.AccessLogContext
-       connTracker     *ip.ConnTrack
        k8sOperator     process.K8sOperator
        retryableEvents *list.List
        retryableMutex  sync.Mutex
 }
 
-func NewConnectionPartitionContext(ctx *common.AccessLogContext, connTracker 
*ip.ConnTrack,
+func NewConnectionPartitionContext(ctx *common.AccessLogContext,
        k8sOperator process.K8sOperator) *ConnectionPartitionContext {
        return &ConnectionPartitionContext{
                context:         ctx,
-               connTracker:     connTracker,
                k8sOperator:     k8sOperator,
                retryableEvents: list.New(),
        }
@@ -382,12 +376,12 @@ func (c *ConnectionPartitionContext) 
TryToUpdateSocketFromConntrack(event *event
        }
        connectionLogger.Debugf("try to update the remote address from 
conntrack, connection ID: %d, randomID: %d, func: %s, local: %s:%d, remote: 
%s:%d",
                event.ConID, event.RandomID, 
enums.SocketFunctionName(event.FuncName), socket.SrcIP, socket.SrcPort, 
socket.DestIP, socket.DestPort)
-       if c.connTracker != nil {
+       if c.context.ConnectionMgr.ConnectTracker != nil {
                // if no contract and socket data is valid, then trying to get 
the remote address from the socket
                // to encase the remote address is not the real remote address
                originalIP := socket.DestIP
                originalPort := socket.DestPort
-               err := c.connTracker.UpdateRealPeerAddress(socket)
+               err := 
c.context.ConnectionMgr.ConnectTracker.UpdateRealPeerAddress(socket, true)
                if err != nil {
                        return fmt.Errorf("update the socket address from 
conntrack failure, error: %v", err)
                }
diff --git a/pkg/accesslog/common/connection.go 
b/pkg/accesslog/common/connection.go
index c6630cc..9b8bd1a 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -128,6 +128,8 @@ type ConnectionManager struct {
        allUnfinishedConnections map[string]*bool
 
        flushListeners []FlusherListener
+
+       ConnectTracker *ip.ConnTrack
 }
 
 func (c *ConnectionManager) RegisterProcessor(processor ConnectionProcessor) {
@@ -158,6 +160,10 @@ type ConnectionInfo struct {
 }
 
 func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader 
*bpf.Loader, filter MonitorFilter) *ConnectionManager {
+       track, err := ip.NewConnTrack()
+       if err != nil {
+               log.Warnf("cannot create the connection tracker, %v", err)
+       }
        mgr := &ConnectionManager{
                moduleMgr:                moduleMgr,
                processOP:                
moduleMgr.FindModule(process.ModuleName).(process.Operator),
@@ -171,6 +177,7 @@ func NewConnectionManager(config *Config, moduleMgr 
*module.Manager, bpfLoader *
                allUnfinishedConnections: make(map[string]*bool),
                monitorFilter:            filter,
                flushListeners:           make([]FlusherListener, 0),
+               ConnectTracker:           track,
        }
        return mgr
 }
@@ -178,6 +185,13 @@ func NewConnectionManager(config *Config, moduleMgr 
*module.Manager, bpfLoader *
 func (c *ConnectionManager) Start(ctx context.Context, accessLogContext 
*AccessLogContext) {
        c.processOP.AddListener(c)
 
+       if c.ConnectTracker != nil {
+               err := c.ConnectTracker.StartMonitoring(ctx)
+               if err != nil {
+                       log.Warnf("cannot start the connection tracker 
monitoring, %v", err)
+               }
+       }
+
        // starting to clean up the un-active connection in BPF
        go func() {
                ticker := time.NewTicker(cleanActiveConnectionInterval)
@@ -254,6 +268,12 @@ func (c *ConnectionManager) Find(event events.Event) 
*ConnectionInfo {
        }
        // is current is connected event, then getting the socket pair
        if e, socket := getSocketPairFromConnectEvent(event); e != nil && 
socket != nil {
+               // if the socket pair still needs conntrack resolution, then 
update the real peer address
+               if socket.NeedConnTrack {
+                       if err := 
c.ConnectTracker.UpdateRealPeerAddress(socket, false); err != nil {
+                               log.Warnf("cannot update the real peer address, 
%v", err)
+                       }
+               }
                var localAddress, remoteAddress *v3.ConnectionAddress
                localPID, _ := events.ParseConnectionID(event.GetConnectionID())
                localAddress = c.buildLocalAddress(localPID, socket.SrcPort, 
socket)
diff --git a/pkg/tools/ip/conntrack.go b/pkg/tools/ip/conntrack.go
index 32ee090..99a453a 100644
--- a/pkg/tools/ip/conntrack.go
+++ b/pkg/tools/ip/conntrack.go
@@ -157,7 +157,7 @@ func (c *ConnTrack) monitor0(ctx context.Context) (chan 
error, error) {
        return errCh, nil
 }
 
-func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) error {
+func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair, fromCacheOnly 
bool) error {
        key := conntrackExpireKey{
                sourceIP:   addr.SrcIP,
                destIP:     addr.DestIP,
@@ -169,10 +169,15 @@ func (c *ConnTrack) UpdateRealPeerAddress(addr 
*SocketPair) error {
                v := val.(conntrackExpireValue)
                addr.DestIP = v.realIP
                addr.DestPort = v.realPort
-               log.Infof("update real peer address from cache: %s:%d", 
addr.DestIP, addr.DestPort)
+               addr.NeedConnTrack = false
+               log.Debugf("update real peer address from cache: %s:%d", 
addr.DestIP, addr.DestPort)
                c.monitorExpire.Delete(key)
                return nil
        }
+       addr.NeedConnTrack = true
+       if fromCacheOnly {
+               return nil
+       }
 
        srcIP, err := netip.ParseAddr(addr.SrcIP)
        if err != nil {
@@ -204,6 +209,7 @@ func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) 
error {
 
        addr.DestIP = get.TupleReply.IP.SourceAddress.String()
        addr.DestPort = get.TupleReply.Proto.SourcePort
+       addr.NeedConnTrack = false
        return nil
 }
 
diff --git a/pkg/tools/ip/tcpresolver.go b/pkg/tools/ip/tcpresolver.go
index a07c216..9f09309 100644
--- a/pkg/tools/ip/tcpresolver.go
+++ b/pkg/tools/ip/tcpresolver.go
@@ -34,6 +34,8 @@ type SocketPair struct {
        SrcPort  uint16
        DestIP   string
        DestPort uint16
+
+       NeedConnTrack bool
 }
 
 func (s *SocketPair) IsValid() bool {

Reply via email to