This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new afee353 Fix wrong conntrack data from eBPF (#111)
afee353 is described below
commit afee3533cb51e458cddfd64def7a758ebe537e2a
Author: mrproliu <[email protected]>
AuthorDate: Tue Jan 16 15:45:48 2024 +0800
Fix wrong conntrack data from eBPF (#111)
---
bpf/accesslog/syscalls/connect_conntrack.c | 28 +++++++++++++++++++++++--
pkg/accesslog/collector/connect.go | 2 +-
pkg/accesslog/collector/protocols/queue.go | 33 +++++++++++++++++++++++++++++-
pkg/accesslog/common/connection.go | 5 ++++-
4 files changed, 63 insertions(+), 5 deletions(-)
diff --git a/bpf/accesslog/syscalls/connect_conntrack.c
b/bpf/accesslog/syscalls/connect_conntrack.c
index d7180d5..365c8b7 100644
--- a/bpf/accesslog/syscalls/connect_conntrack.c
+++ b/bpf/accesslog/syscalls/connect_conntrack.c
@@ -23,7 +23,7 @@ static __always_inline void nf_conntrack_read_in6_addr(__u64
*addr_h, __u64 *add
bpf_probe_read(addr_l, sizeof(*addr_l), &in6->s6_addr32[2]);
}
-static __always_inline int
nf_conntrack_tuple_to_conntrack_tuple(conntrack_tuple_t *t, const struct
nf_conntrack_tuple *ct) {
+static __always_inline int nf_conntrack_tuple_to_conntrack_tuple(struct
connect_args_t *connect_args, conntrack_tuple_t *t, const struct
nf_conntrack_tuple *ct) {
__builtin_memset(t, 0, sizeof(conntrack_tuple_t));
switch (ct->dst.protonum) {
@@ -60,6 +60,25 @@ static __always_inline int
nf_conntrack_tuple_to_conntrack_tuple(conntrack_tuple
return 0;
}
}
+
+ struct sock *sock = connect_args->sock;
+ struct socket *tmps = _(sock->sk_socket);
+ if (tmps != NULL) {
+ struct sock* s;
+ BPF_CORE_READ_INTO(&s, tmps, sk);
+ short unsigned int skc_family;
+ BPF_CORE_READ_INTO(&skc_family, s, __sk_common.skc_family);
+ if (skc_family == AF_INET) {
+ __u16 local_port;
+ BPF_CORE_READ_INTO(&local_port, s, __sk_common.skc_num);
+ __u32 local_addr_v4;
+ BPF_CORE_READ_INTO(&local_addr_v4, s, __sk_common.skc_rcv_saddr);
+ // make sure connntrack with the same socket address
+ if (local_addr_v4 != t->daddr_l || local_port != t->dport) {
+ return 0;
+ }
+ }
+ }
return 1;
}
@@ -73,6 +92,11 @@ static __always_inline int nf_conn_aware(struct pt_regs*
ctx, struct nf_conn *ct
return 0;
}
+ // already contains the remote address
+ if (&(connect_args->remote) != NULL) {
+ return 0;
+ }
+
__u32 status;
if (bpf_probe_read(&status, sizeof(status), &(ct->status)) != 0) {
return 0; // Invalid ct pointer
@@ -93,7 +117,7 @@ static __always_inline int nf_conn_aware(struct pt_regs*
ctx, struct nf_conn *ct
struct nf_conntrack_tuple reply = tuplehash[IP_CT_DIR_REPLY].tuple;
conntrack_tuple_t reply_conn = {};
- if (!nf_conntrack_tuple_to_conntrack_tuple(&reply_conn, &reply)) {
+ if (!nf_conntrack_tuple_to_conntrack_tuple(connect_args, &reply_conn,
&reply)) {
return 0;
}
diff --git a/pkg/accesslog/collector/connect.go
b/pkg/accesslog/collector/connect.go
index 958103a..f315967 100644
--- a/pkg/accesslog/collector/connect.go
+++ b/pkg/accesslog/collector/connect.go
@@ -116,7 +116,7 @@ func (c *ConnectCollector)
buildSocketFromConnectEvent(event *events.SocketConne
pair, err := ip.ParseSocket(event.PID, event.SocketFD)
if err != nil {
- connectLogger.Warnf("cannot found the socket, pid: %d, socket
FD: %d", event.PID, event.SocketFD)
+ connectLogger.Debugf("cannot found the socket, pid: %d, socket
FD: %d", event.PID, event.SocketFD)
return nil
}
connectLogger.Debugf("found the connection from the socket, connection
ID: %d, randomID: %d",
diff --git a/pkg/accesslog/collector/protocols/queue.go
b/pkg/accesslog/collector/protocols/queue.go
index 9c38826..2a850ce 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -19,11 +19,14 @@ package protocols
import (
"context"
+ "errors"
"fmt"
"os"
"sync"
"time"
+ "github.com/cilium/ebpf"
+
"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/accesslog/events"
"github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
@@ -212,7 +215,14 @@ func (p *PartitionContext) processEvents() {
p.processConnectionEvents(info)
// if the connection already closed and not contains any buffer
data, then delete the connection
- if info.closed && info.dataBuffer.DataLength() == 0 {
+ bufLen := info.dataBuffer.DataLength()
+ if bufLen > 0 {
+ return
+ }
+ if !info.closed {
+ p.checkTheConnectionIsAlreadyClose(info)
+ }
+ if info.closed {
if info.closeCallback != nil {
info.closeCallback()
}
@@ -225,6 +235,26 @@ func (p *PartitionContext) processEvents() {
}
}
+// checkTheConnectionIsAlreadyClose sets con.closed when the eBPF
+// ActiveConnectionMap indicates the connection is no longer active.
+// The map lookup runs at most once per checkInterval for a given
+// connection; calls inside that window return without doing anything.
+//
+// The connection is marked closed when either:
+//   - con.connectionID is not present in ActiveConnectionMap, or
+//   - the map entry has a non-zero RandomID that differs from con.randomID.
+//
+// Any other lookup error is logged and con is left open, so the check is
+// retried once the interval has elapsed again.
+func (p *PartitionContext) checkTheConnectionIsAlreadyClose(con *PartitionConnection) {
+	const checkInterval = 30 * time.Second
+	if time.Since(con.lastCheckCloseTime) <= checkInterval {
+		return
+	}
+	con.lastCheckCloseTime = time.Now()
+
+	var activeConn common.ActiveConnection
+	err := p.context.BPF.ActiveConnectionMap.Lookup(con.connectionID, &activeConn)
+	switch {
+	case errors.Is(err, ebpf.ErrKeyNotExist):
+		// the entry is gone from the eBPF map, so the connection is closed
+		con.closed = true
+	case err != nil:
+		log.Warnf("cannot found the active connection: %d-%d, err: %v",
+			con.connectionID, con.randomID, err)
+	case activeConn.RandomID != 0 && activeConn.RandomID != con.randomID:
+		// the entry under this connection ID no longer matches our random ID
+		log.Debugf("detect the connection: %d-%d is already closed, so remove from the activate connection",
+			con.connectionID, con.randomID)
+		con.closed = true
+	}
+}
+
func (p *PartitionContext) processExpireEvents() {
// the expiry must be mutual exclusion with events processor
p.analyzeLocker.Lock()
@@ -267,6 +297,7 @@ type PartitionConnection struct {
closed bool
closeCallback common.ConnectionProcessFinishCallback
skipAllDataAnalyze bool
+ lastCheckCloseTime time.Time
}
func (p *PartitionConnection) appendDetail(ctx *common.AccessLogContext,
detail *events.SocketDetailEvent) {
diff --git a/pkg/accesslog/common/connection.go
b/pkg/accesslog/common/connection.go
index bb64ff8..cf5c5ee 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -574,7 +574,10 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished()
{
func (c *ConnectionManager) SkipAllDataAnalyze(conID, ranID uint64) {
var activateConn ActiveConnection
if err := c.activeConnectionMap.Lookup(conID, &activateConn); err !=
nil {
- log.Warnf("cannot found the active connection: %d-%d", conID,
ranID)
+ if errors.Is(err, ebpf.ErrKeyNotExist) {
+ return
+ }
+ log.Warnf("cannot found the active connection: %d-%d, err: %v",
conID, ranID, err)
return
}
if activateConn.RandomID != ranID {