This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new afee353  Fix wrong conntrack data from eBPF (#111)
afee353 is described below

commit afee3533cb51e458cddfd64def7a758ebe537e2a
Author: mrproliu <[email protected]>
AuthorDate: Tue Jan 16 15:45:48 2024 +0800

    Fix wrong conntrack data from eBPF (#111)
---
 bpf/accesslog/syscalls/connect_conntrack.c | 28 +++++++++++++++++++++++--
 pkg/accesslog/collector/connect.go         |  2 +-
 pkg/accesslog/collector/protocols/queue.go | 33 +++++++++++++++++++++++++++++-
 pkg/accesslog/common/connection.go         |  5 ++++-
 4 files changed, 63 insertions(+), 5 deletions(-)

diff --git a/bpf/accesslog/syscalls/connect_conntrack.c b/bpf/accesslog/syscalls/connect_conntrack.c
index d7180d5..365c8b7 100644
--- a/bpf/accesslog/syscalls/connect_conntrack.c
+++ b/bpf/accesslog/syscalls/connect_conntrack.c
@@ -23,7 +23,7 @@ static __always_inline void nf_conntrack_read_in6_addr(__u64 *addr_h, __u64 *add
     bpf_probe_read(addr_l, sizeof(*addr_l), &in6->s6_addr32[2]);
 }
 
-static __always_inline int 
nf_conntrack_tuple_to_conntrack_tuple(conntrack_tuple_t *t, const struct 
nf_conntrack_tuple *ct) {
+static __always_inline int nf_conntrack_tuple_to_conntrack_tuple(struct 
connect_args_t *connect_args, conntrack_tuple_t *t, const struct 
nf_conntrack_tuple *ct) {
     __builtin_memset(t, 0, sizeof(conntrack_tuple_t));
 
     switch (ct->dst.protonum) {
@@ -60,6 +60,25 @@ static __always_inline int nf_conntrack_tuple_to_conntrack_tuple(conntrack_tuple
             return 0;
         }
     }
+
+    struct sock *sock = connect_args->sock;
+    struct socket *tmps = _(sock->sk_socket);
+    if (tmps != NULL) {
+        struct sock* s;
+        BPF_CORE_READ_INTO(&s, tmps, sk);
+        short unsigned int skc_family;
+        BPF_CORE_READ_INTO(&skc_family, s, __sk_common.skc_family);
+        if (skc_family == AF_INET) {
+            __u16 local_port;
+            BPF_CORE_READ_INTO(&local_port, s, __sk_common.skc_num);
+            __u32 local_addr_v4;
+            BPF_CORE_READ_INTO(&local_addr_v4, s, __sk_common.skc_rcv_saddr);
+            // make sure connntrack with the same socket address
+            if (local_addr_v4 != t->daddr_l || local_port != t->dport) {
+                return 0;
+            }
+        }
+    }
     return 1;
 }
 
@@ -73,6 +92,11 @@ static __always_inline int nf_conn_aware(struct pt_regs* ctx, struct nf_conn *ct
         return 0;
     }
 
+    // already contains the remote address
+    if (&(connect_args->remote) != NULL) {
+        return 0;
+    }
+
     __u32 status;
     if (bpf_probe_read(&status, sizeof(status), &(ct->status)) != 0) {
         return 0; // Invalid ct pointer
@@ -93,7 +117,7 @@ static __always_inline int nf_conn_aware(struct pt_regs* ctx, struct nf_conn *ct
     struct nf_conntrack_tuple reply = tuplehash[IP_CT_DIR_REPLY].tuple;
 
     conntrack_tuple_t reply_conn = {};
-    if (!nf_conntrack_tuple_to_conntrack_tuple(&reply_conn, &reply)) {
+    if (!nf_conntrack_tuple_to_conntrack_tuple(connect_args, &reply_conn, 
&reply)) {
         return 0;
     }
 
diff --git a/pkg/accesslog/collector/connect.go b/pkg/accesslog/collector/connect.go
index 958103a..f315967 100644
--- a/pkg/accesslog/collector/connect.go
+++ b/pkg/accesslog/collector/connect.go
@@ -116,7 +116,7 @@ func (c *ConnectCollector) buildSocketFromConnectEvent(event *events.SocketConne
 
        pair, err := ip.ParseSocket(event.PID, event.SocketFD)
        if err != nil {
-               connectLogger.Warnf("cannot found the socket, pid: %d, socket 
FD: %d", event.PID, event.SocketFD)
+               connectLogger.Debugf("cannot found the socket, pid: %d, socket 
FD: %d", event.PID, event.SocketFD)
                return nil
        }
        connectLogger.Debugf("found the connection from the socket, connection 
ID: %d, randomID: %d",
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index 9c38826..2a850ce 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -19,11 +19,14 @@ package protocols
 
 import (
        "context"
+       "errors"
        "fmt"
        "os"
        "sync"
        "time"
 
+       "github.com/cilium/ebpf"
+
        "github.com/apache/skywalking-rover/pkg/accesslog/common"
        "github.com/apache/skywalking-rover/pkg/accesslog/events"
        "github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
@@ -212,7 +215,14 @@ func (p *PartitionContext) processEvents() {
                p.processConnectionEvents(info)
 
                // if the connection already closed and not contains any buffer 
data, then delete the connection
-               if info.closed && info.dataBuffer.DataLength() == 0 {
+               bufLen := info.dataBuffer.DataLength()
+               if bufLen > 0 {
+                       return
+               }
+               if !info.closed {
+                       p.checkTheConnectionIsAlreadyClose(info)
+               }
+               if info.closed {
                        if info.closeCallback != nil {
                                info.closeCallback()
                        }
@@ -225,6 +235,26 @@ func (p *PartitionContext) processEvents() {
        }
 }
 
+func (p *PartitionContext) checkTheConnectionIsAlreadyClose(con 
*PartitionConnection) {
+       if time.Since(con.lastCheckCloseTime) <= time.Second*30 {
+               return
+       }
+       con.lastCheckCloseTime = time.Now()
+       var activateConn common.ActiveConnection
+       if err := p.context.BPF.ActiveConnectionMap.Lookup(con.connectionID, 
&activateConn); err != nil {
+               if errors.Is(err, ebpf.ErrKeyNotExist) {
+                       con.closed = true
+                       return
+               }
+               log.Warnf("cannot found the active connection: %d-%d, err: %v", 
con.connectionID, con.randomID, err)
+               return
+       } else if activateConn.RandomID != 0 && activateConn.RandomID != 
con.randomID {
+               log.Debugf("detect the connection: %d-%d is already closed, so 
remove from the activate connection",
+                       con.connectionID, con.randomID)
+               con.closed = true
+       }
+}
+
 func (p *PartitionContext) processExpireEvents() {
        // the expiry must be mutual exclusion with events processor
        p.analyzeLocker.Lock()
@@ -267,6 +297,7 @@ type PartitionConnection struct {
        closed                 bool
        closeCallback          common.ConnectionProcessFinishCallback
        skipAllDataAnalyze     bool
+       lastCheckCloseTime     time.Time
 }
 
 func (p *PartitionConnection) appendDetail(ctx *common.AccessLogContext, 
detail *events.SocketDetailEvent) {
diff --git a/pkg/accesslog/common/connection.go b/pkg/accesslog/common/connection.go
index bb64ff8..cf5c5ee 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -574,7 +574,10 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished() {
 func (c *ConnectionManager) SkipAllDataAnalyze(conID, ranID uint64) {
        var activateConn ActiveConnection
        if err := c.activeConnectionMap.Lookup(conID, &activateConn); err != 
nil {
-               log.Warnf("cannot found the active connection: %d-%d", conID, 
ranID)
+               if errors.Is(err, ebpf.ErrKeyNotExist) {
+                       return
+               }
+               log.Warnf("cannot found the active connection: %d-%d, err: %v", 
conID, ranID, err)
                return
        }
        if activateConn.RandomID != ranID {

Reply via email to