This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch ringbuf
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/ringbuf by this push:
     new 3b37ed2  Introduce ringbuf queue in access log module
3b37ed2 is described below

commit 3b37ed2a6a3f3d0ddd440f0fac479c4b0643dcbe
Author: mrproliu <[email protected]>
AuthorDate: Mon Dec 16 10:50:59 2024 +0800

    Introduce ringbuf queue in access log module
---
 bpf/accesslog/common/connection.h          | 63 ++++++++++++++----------------
 pkg/accesslog/collector/protocols/queue.go |  2 +-
 2 files changed, 30 insertions(+), 35 deletions(-)

diff --git a/bpf/accesslog/common/connection.h b/bpf/accesslog/common/connection.h
index 31e9f78..cb799e8 100644
--- a/bpf/accesslog/common/connection.h
+++ b/bpf/accesslog/common/connection.h
@@ -21,6 +21,7 @@
 #include "socket.h"
 #include "data_args.h"
 #include "socket_opts.h"
+#include "queue.h"
 
 // syscall:connect
 struct connect_args_t {
@@ -106,19 +107,7 @@ struct socket_connect_event_t {
     __u64 conntrack_upstream_iph;
     __u32 conntrack_upstream_port;
 };
-struct {
-       __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-} socket_connection_event_queue SEC(".maps");
-struct {
-    __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
-    __type(key, __u32);
-    __type(value, struct socket_connect_event_t);
-    __uint(max_entries, 1);
-} socket_connect_event_per_cpu_map SEC(".maps");
-static __inline struct socket_connect_event_t* create_socket_connect_event() {
-  __u32 kZero = 0;
-  return bpf_map_lookup_elem(&socket_connect_event_per_cpu_map, &kZero);
-}
+DATA_QUEUE(socket_connection_event_queue, 1024 * 1024);
 
 // active connection cached into the hashmap
 // if connection closed, then deleted
@@ -170,9 +159,7 @@ struct socket_close_event_t {
     // close success
     __u32 success;
 };
-struct {
-       __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-} socket_close_event_queue SEC(".maps");
+DATA_QUEUE(socket_close_event_queue, 1024 * 1024);
 
 static __inline bool family_should_trace(const __u32 family) {
     return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ? 
false : true;
@@ -182,7 +169,8 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
                                             struct sockaddr* addr, const 
struct socket* socket, struct connect_track_remote* conntrack, __u8 role) {
     // send to the user-space the connection event
     __u64 curr_nacs = bpf_ktime_get_ns();
-    struct socket_connect_event_t *event = create_socket_connect_event();
+    struct socket_connect_event_t *event;
+    event = rover_reserve_buf(&socket_connection_event_queue, sizeof(*event));
     if (event == NULL) {
         return;
     }
@@ -195,12 +183,11 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
     event->end_time = curr_nacs;
     event->func_name = func_name;
     if (func_name == SOCKET_OPTS_TYPE_CONNECT) {
-        event->role = CONNECTION_ROLE_TYPE_CLIENT;
+        role = CONNECTION_ROLE_TYPE_CLIENT;
     } else if (func_name == SOCKET_OPTS_TYPE_ACCEPT) {
-        event->role = CONNECTION_ROLE_TYPE_SERVER;
-    } else {
-        event->role = role;
+        role = CONNECTION_ROLE_TYPE_SERVER;
     }
+    event->role = role;
     event->pid = tgid;
     event->sockfd = fd;
 
@@ -216,6 +203,7 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
     event->success = success;
 
     __u16 port;
+    __u8 socket_family;
     event->local_port = 0;
     event->remote_port = 0;
     if (socket == NULL) {
@@ -236,6 +224,7 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
         short unsigned int skc_family;
         BPF_CORE_READ_INTO(&skc_family, s, __sk_common.skc_family);
         event->socket_family = skc_family;
+        socket_family = skc_family;
 
         if (event->socket_family == AF_INET) {
             BPF_CORE_READ_INTO(&port, s, __sk_common.skc_num);
@@ -254,6 +243,7 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
         }
     } else if (addr != NULL) {
         event->socket_family = _(addr->sa_family);
+        socket_family = event->socket_family;
         if (event->socket_family == AF_INET) {
             struct sockaddr_in *daddr = (struct sockaddr_in *)addr;
             bpf_probe_read(&event->remote_addr_v4, 
sizeof(event->remote_addr_v4), &daddr->sin_addr.s_addr);
@@ -270,9 +260,10 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
         }
     } else {
         event->socket_family = AF_UNKNOWN;
+        socket_family = AF_UNKNOWN;
     }
 
-    bpf_perf_event_output(ctx, &socket_connection_event_queue, 
BPF_F_CURRENT_CPU, event, sizeof(*event));
+    rover_submit_buf(ctx, &socket_connection_event_queue, event, 
sizeof(*event));
     if (success == false) {
         return;
     }
@@ -280,11 +271,11 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
     // if connect success, then add the activate connection into the kernel
     // active connection save
     struct active_connection_t con = {};
-    con.random_id = event->random_id;
+    con.random_id = random_id;
     con.pid = tgid;
     con.sockfd = fd;
-    con.role = event->role;
-    con.socket_family = event->socket_family;
+    con.role = role;
+    con.socket_family = socket_family;
     bpf_map_update_elem(&active_connection_map, &conid, &con, 0);
 }
 
@@ -312,17 +303,21 @@ static __inline void submit_connection_when_not_exists(void *ctx, __u64 id, stru
 }
 
 static __inline void notify_close_connection(void* ctx, __u64 conid, struct 
active_connection_t* con, __u64 start_time, __u64 end_time, int ret) {
-    struct socket_close_event_t close_event = {};
+    struct socket_close_event_t *close_event;
+    close_event = rover_reserve_buf(&socket_close_event_queue, 
sizeof(*close_event));
+    if (close_event == NULL) {
+        return;
+    }
 
-    close_event.conid = conid;
-    close_event.random_id = con->random_id;
-    close_event.start_time = start_time;
-    close_event.end_time = end_time;
-    close_event.pid = con->pid;
-    close_event.sockfd = con->sockfd;
-    close_event.success = ret > 0 ? true : false;
+    close_event->conid = conid;
+    close_event->random_id = con->random_id;
+    close_event->start_time = start_time;
+    close_event->end_time = end_time;
+    close_event->pid = con->pid;
+    close_event->sockfd = con->sockfd;
+    close_event->success = ret > 0 ? true : false;
 
-    bpf_perf_event_output(ctx, &socket_close_event_queue, BPF_F_CURRENT_CPU, 
&close_event, sizeof(close_event));
+    rover_submit_buf(ctx, &socket_close_event_queue, close_event, 
sizeof(*close_event));
 }
 
 static __inline void submit_close_connection(void* ctx, __u32 tgid, __u32 fd, 
__u64 start_nacs, int ret) {
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index 7e93dce..8597259 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -92,7 +92,7 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
                func(num int) btf.PartitionContext {
                        return NewPartitionContext(q.context, num, 
q.supportAnalyzers(q.context))
                })
-       q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetail, 
int(q.perCPUBuffer),
+       q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailQueue, 
int(q.perCPUBuffer),
                q.context.Config.ProtocolAnalyze.ParseParallels, func() 
interface{} {
                        return q.detailSupplier()
                }, func(data interface{}) string {

Reply via email to