This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch ringbuf
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/ringbuf by this push:
new 3b37ed2 Introduce ringbuf queue in access log module
3b37ed2 is described below
commit 3b37ed2a6a3f3d0ddd440f0fac479c4b0643dcbe
Author: mrproliu <[email protected]>
AuthorDate: Mon Dec 16 10:50:59 2024 +0800
Introduce ringbuf queue in access log module
---
bpf/accesslog/common/connection.h | 63 ++++++++++++++----------------
pkg/accesslog/collector/protocols/queue.go | 2 +-
2 files changed, 30 insertions(+), 35 deletions(-)
diff --git a/bpf/accesslog/common/connection.h
b/bpf/accesslog/common/connection.h
index 31e9f78..cb799e8 100644
--- a/bpf/accesslog/common/connection.h
+++ b/bpf/accesslog/common/connection.h
@@ -21,6 +21,7 @@
#include "socket.h"
#include "data_args.h"
#include "socket_opts.h"
+#include "queue.h"
// syscall:connect
struct connect_args_t {
@@ -106,19 +107,7 @@ struct socket_connect_event_t {
__u64 conntrack_upstream_iph;
__u32 conntrack_upstream_port;
};
-struct {
- __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-} socket_connection_event_queue SEC(".maps");
-struct {
- __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
- __type(key, __u32);
- __type(value, struct socket_connect_event_t);
- __uint(max_entries, 1);
-} socket_connect_event_per_cpu_map SEC(".maps");
-static __inline struct socket_connect_event_t* create_socket_connect_event() {
- __u32 kZero = 0;
- return bpf_map_lookup_elem(&socket_connect_event_per_cpu_map, &kZero);
-}
+DATA_QUEUE(socket_connection_event_queue, 1024 * 1024);
// active connection cached into the hashmap
// if connection closed, then deleted
@@ -170,9 +159,7 @@ struct socket_close_event_t {
// close success
__u32 success;
};
-struct {
- __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-} socket_close_event_queue SEC(".maps");
+DATA_QUEUE(socket_close_event_queue, 1024 * 1024);
static __inline bool family_should_trace(const __u32 family) {
return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ?
false : true;
@@ -182,7 +169,8 @@ static __always_inline void submit_new_connection(void*
ctx, bool success, __u32
struct sockaddr* addr, const
struct socket* socket, struct connect_track_remote* conntrack, __u8 role) {
// send to the user-space the connection event
__u64 curr_nacs = bpf_ktime_get_ns();
- struct socket_connect_event_t *event = create_socket_connect_event();
+ struct socket_connect_event_t *event;
+ event = rover_reserve_buf(&socket_connection_event_queue, sizeof(*event));
if (event == NULL) {
return;
}
@@ -195,12 +183,11 @@ static __always_inline void submit_new_connection(void*
ctx, bool success, __u32
event->end_time = curr_nacs;
event->func_name = func_name;
if (func_name == SOCKET_OPTS_TYPE_CONNECT) {
- event->role = CONNECTION_ROLE_TYPE_CLIENT;
+ role = CONNECTION_ROLE_TYPE_CLIENT;
} else if (func_name == SOCKET_OPTS_TYPE_ACCEPT) {
- event->role = CONNECTION_ROLE_TYPE_SERVER;
- } else {
- event->role = role;
+ role = CONNECTION_ROLE_TYPE_SERVER;
}
+ event->role = role;
event->pid = tgid;
event->sockfd = fd;
@@ -216,6 +203,7 @@ static __always_inline void submit_new_connection(void*
ctx, bool success, __u32
event->success = success;
__u16 port;
+ __u8 socket_family;
event->local_port = 0;
event->remote_port = 0;
if (socket == NULL) {
@@ -236,6 +224,7 @@ static __always_inline void submit_new_connection(void*
ctx, bool success, __u32
short unsigned int skc_family;
BPF_CORE_READ_INTO(&skc_family, s, __sk_common.skc_family);
event->socket_family = skc_family;
+ socket_family = skc_family;
if (event->socket_family == AF_INET) {
BPF_CORE_READ_INTO(&port, s, __sk_common.skc_num);
@@ -254,6 +243,7 @@ static __always_inline void submit_new_connection(void*
ctx, bool success, __u32
}
} else if (addr != NULL) {
event->socket_family = _(addr->sa_family);
+ socket_family = event->socket_family;
if (event->socket_family == AF_INET) {
struct sockaddr_in *daddr = (struct sockaddr_in *)addr;
bpf_probe_read(&event->remote_addr_v4,
sizeof(event->remote_addr_v4), &daddr->sin_addr.s_addr);
@@ -270,9 +260,10 @@ static __always_inline void submit_new_connection(void*
ctx, bool success, __u32
}
} else {
event->socket_family = AF_UNKNOWN;
+ socket_family = AF_UNKNOWN;
}
- bpf_perf_event_output(ctx, &socket_connection_event_queue,
BPF_F_CURRENT_CPU, event, sizeof(*event));
+ rover_submit_buf(ctx, &socket_connection_event_queue, event,
sizeof(*event));
if (success == false) {
return;
}
@@ -280,11 +271,11 @@ static __always_inline void submit_new_connection(void*
ctx, bool success, __u32
// if connect success, then add the activate connection into the kernel
// active connection save
struct active_connection_t con = {};
- con.random_id = event->random_id;
+ con.random_id = random_id;
con.pid = tgid;
con.sockfd = fd;
- con.role = event->role;
- con.socket_family = event->socket_family;
+ con.role = role;
+ con.socket_family = socket_family;
bpf_map_update_elem(&active_connection_map, &conid, &con, 0);
}
@@ -312,17 +303,21 @@ static __inline void
submit_connection_when_not_exists(void *ctx, __u64 id, stru
}
static __inline void notify_close_connection(void* ctx, __u64 conid, struct
active_connection_t* con, __u64 start_time, __u64 end_time, int ret) {
- struct socket_close_event_t close_event = {};
+ struct socket_close_event_t *close_event;
+ close_event = rover_reserve_buf(&socket_close_event_queue,
sizeof(*close_event));
+ if (close_event == NULL) {
+ return;
+ }
- close_event.conid = conid;
- close_event.random_id = con->random_id;
- close_event.start_time = start_time;
- close_event.end_time = end_time;
- close_event.pid = con->pid;
- close_event.sockfd = con->sockfd;
- close_event.success = ret > 0 ? true : false;
+ close_event->conid = conid;
+ close_event->random_id = con->random_id;
+ close_event->start_time = start_time;
+ close_event->end_time = end_time;
+ close_event->pid = con->pid;
+ close_event->sockfd = con->sockfd;
+ close_event->success = ret > 0 ? true : false;
- bpf_perf_event_output(ctx, &socket_close_event_queue, BPF_F_CURRENT_CPU,
&close_event, sizeof(close_event));
+ rover_submit_buf(ctx, &socket_close_event_queue, close_event,
sizeof(*close_event));
}
static __inline void submit_close_connection(void* ctx, __u32 tgid, __u32 fd,
__u64 start_nacs, int ret) {
diff --git a/pkg/accesslog/collector/protocols/queue.go
b/pkg/accesslog/collector/protocols/queue.go
index 7e93dce..8597259 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -92,7 +92,7 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
func(num int) btf.PartitionContext {
return NewPartitionContext(q.context, num,
q.supportAnalyzers(q.context))
})
- q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetail,
int(q.perCPUBuffer),
+ q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailQueue,
int(q.perCPUBuffer),
q.context.Config.ProtocolAnalyze.ParseParallels, func()
interface{} {
return q.detailSupplier()
}, func(data interface{}) string {