This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new 023d6c2  Introduce ringbuf queue in access log module (#170)
023d6c2 is described below

commit 023d6c2f0588016b1ed6e9046fc5d27a4985728d
Author: mrproliu <[email protected]>
AuthorDate: Mon Dec 16 15:36:22 2024 +0800

    Introduce ringbuf queue in access log module (#170)
---
 .github/workflows/rover.yaml                       |   2 +-
 CHANGES.md                                         |   1 +
 bpf/accesslog/common/connection.h                  |  63 +++++------
 bpf/accesslog/syscalls/transfer.h                  |  17 +--
 bpf/include/queue.h                                |  61 +++++++++++
 bpf/include/socket_data.h                          |  73 ++++++------
 pkg/accesslog/bpf/loader.go                        |   2 +-
 pkg/accesslog/collector/protocols/queue.go         |   4 +-
 .../continuous/checker/bpf/network/network.go      |   3 +-
 .../task/network/analyze/layer7/events.go          |   2 +-
 .../layer7/protocols/http1/reader/reader.go        |  12 +-
 pkg/profiling/task/network/bpf/bpf.go              |   2 +-
 pkg/profiling/task/offcpu/runner.go                |   2 +-
 pkg/tools/btf/ebpf.go                              |  41 ++++++-
 pkg/tools/btf/linker.go                            |  18 ++-
 pkg/tools/btf/queue.go                             | 122 +++++++++++++++++++++
 pkg/tools/buffer/buffer.go                         |   6 +-
 17 files changed, 324 insertions(+), 107 deletions(-)
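
The commit replaces the per-CPU-array-plus-perf-event pattern with a DATA_QUEUE macro backed by BPF_MAP_TYPE_RINGBUF, falling back to a perf event array on kernels without ringbuf support. A rough standalone sketch of the availability probe this commit adds in pkg/tools/btf/queue.go (the package main wrapper and printed messages are illustrative only):

    package main

    import (
    	"fmt"
    	"os"

    	"github.com/cilium/ebpf"
    )

    func main() {
    	// Try to create a one-page BPF_MAP_TYPE_RINGBUF map; if the kernel
    	// rejects it, ringbuf is unavailable and the data-queue maps are
    	// rewritten to perf event arrays before loading.
    	m, err := ebpf.NewMap(&ebpf.MapSpec{
    		Type:       ebpf.RingBuf,
    		MaxEntries: uint32(os.Getpagesize()),
    	})
    	if err != nil {
    		fmt.Println("ringbuf unavailable, would fall back to perf:", err)
    		return
    	}
    	defer m.Close()
    	fmt.Println("ringbuf available")
    }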

diff --git a/.github/workflows/rover.yaml b/.github/workflows/rover.yaml
index a72e3bf..f33567e 100644
--- a/.github/workflows/rover.yaml
+++ b/.github/workflows/rover.yaml
@@ -342,7 +342,7 @@ jobs:
         if: ${{ failure() }}
         name: Upload Logs
         with:
-          name: logs
+          name: logs ${{ matrix.test.name }}
           path: "${{ env.SW_INFRA_E2E_LOG_DIR }}"
 
   required:
diff --git a/CHANGES.md b/CHANGES.md
index ec2109d..a27176d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -16,6 +16,7 @@ Release Notes.
 * Support parallel parsing protocol data in the access log module.
 * Upgrade Go library to `1.22`, eBPF library to `0.16.0`.
 * Reduce missing details issue in the access log module.
+* Introduce ringbuf queue to improve performance in the access log module.
 
 #### Bug Fixes
 * Fix the base image cannot run in the arm64.
diff --git a/bpf/accesslog/common/connection.h b/bpf/accesslog/common/connection.h
index 31e9f78..cb799e8 100644
--- a/bpf/accesslog/common/connection.h
+++ b/bpf/accesslog/common/connection.h
@@ -21,6 +21,7 @@
 #include "socket.h"
 #include "data_args.h"
 #include "socket_opts.h"
+#include "queue.h"
 
 // syscall:connect
 struct connect_args_t {
@@ -106,19 +107,7 @@ struct socket_connect_event_t {
     __u64 conntrack_upstream_iph;
     __u32 conntrack_upstream_port;
 };
-struct {
-       __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-} socket_connection_event_queue SEC(".maps");
-struct {
-    __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
-    __type(key, __u32);
-    __type(value, struct socket_connect_event_t);
-    __uint(max_entries, 1);
-} socket_connect_event_per_cpu_map SEC(".maps");
-static __inline struct socket_connect_event_t* create_socket_connect_event() {
-  __u32 kZero = 0;
-  return bpf_map_lookup_elem(&socket_connect_event_per_cpu_map, &kZero);
-}
+DATA_QUEUE(socket_connection_event_queue, 1024 * 1024);
 
 // active connection cached into the hashmap
 // if connection closed, then deleted
@@ -170,9 +159,7 @@ struct socket_close_event_t {
     // close success
     __u32 success;
 };
-struct {
-       __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-} socket_close_event_queue SEC(".maps");
+DATA_QUEUE(socket_close_event_queue, 1024 * 1024);
 
 static __inline bool family_should_trace(const __u32 family) {
     return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ? false : true;
@@ -182,7 +169,8 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
                                             struct sockaddr* addr, const struct socket* socket, struct connect_track_remote* conntrack, __u8 role) {
     // send to the user-space the connection event
     __u64 curr_nacs = bpf_ktime_get_ns();
-    struct socket_connect_event_t *event = create_socket_connect_event();
+    struct socket_connect_event_t *event;
+    event = rover_reserve_buf(&socket_connection_event_queue, sizeof(*event));
     if (event == NULL) {
         return;
     }
@@ -195,12 +183,11 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
     event->end_time = curr_nacs;
     event->func_name = func_name;
     if (func_name == SOCKET_OPTS_TYPE_CONNECT) {
-        event->role = CONNECTION_ROLE_TYPE_CLIENT;
+        role = CONNECTION_ROLE_TYPE_CLIENT;
     } else if (func_name == SOCKET_OPTS_TYPE_ACCEPT) {
-        event->role = CONNECTION_ROLE_TYPE_SERVER;
-    } else {
-        event->role = role;
+        role = CONNECTION_ROLE_TYPE_SERVER;
     }
+    event->role = role;
     event->pid = tgid;
     event->sockfd = fd;
 
@@ -216,6 +203,7 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
     event->success = success;
 
     __u16 port;
+    __u8 socket_family;
     event->local_port = 0;
     event->remote_port = 0;
     if (socket == NULL) {
@@ -236,6 +224,7 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
         short unsigned int skc_family;
         BPF_CORE_READ_INTO(&skc_family, s, __sk_common.skc_family);
         event->socket_family = skc_family;
+        socket_family = skc_family;
 
         if (event->socket_family == AF_INET) {
             BPF_CORE_READ_INTO(&port, s, __sk_common.skc_num);
@@ -254,6 +243,7 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
         }
     } else if (addr != NULL) {
         event->socket_family = _(addr->sa_family);
+        socket_family = event->socket_family;
         if (event->socket_family == AF_INET) {
             struct sockaddr_in *daddr = (struct sockaddr_in *)addr;
             bpf_probe_read(&event->remote_addr_v4, sizeof(event->remote_addr_v4), &daddr->sin_addr.s_addr);
@@ -270,9 +260,10 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
         }
     } else {
         event->socket_family = AF_UNKNOWN;
+        socket_family = AF_UNKNOWN;
     }
 
-    bpf_perf_event_output(ctx, &socket_connection_event_queue, BPF_F_CURRENT_CPU, event, sizeof(*event));
+    rover_submit_buf(ctx, &socket_connection_event_queue, event, sizeof(*event));
     if (success == false) {
         return;
     }
@@ -280,11 +271,11 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
     // if connect success, then add the activate connection into the kernel
     // active connection save
     struct active_connection_t con = {};
-    con.random_id = event->random_id;
+    con.random_id = random_id;
     con.pid = tgid;
     con.sockfd = fd;
-    con.role = event->role;
-    con.socket_family = event->socket_family;
+    con.role = role;
+    con.socket_family = socket_family;
     bpf_map_update_elem(&active_connection_map, &conid, &con, 0);
 }
 
@@ -312,17 +303,21 @@ static __inline void submit_connection_when_not_exists(void *ctx, __u64 id, stru
 }
 
 static __inline void notify_close_connection(void* ctx, __u64 conid, struct active_connection_t* con, __u64 start_time, __u64 end_time, int ret) {
-    struct socket_close_event_t close_event = {};
+    struct socket_close_event_t *close_event;
+    close_event = rover_reserve_buf(&socket_close_event_queue, sizeof(*close_event));
+    if (close_event == NULL) {
+        return;
+    }
 
-    close_event.conid = conid;
-    close_event.random_id = con->random_id;
-    close_event.start_time = start_time;
-    close_event.end_time = end_time;
-    close_event.pid = con->pid;
-    close_event.sockfd = con->sockfd;
-    close_event.success = ret > 0 ? true : false;
+    close_event->conid = conid;
+    close_event->random_id = con->random_id;
+    close_event->start_time = start_time;
+    close_event->end_time = end_time;
+    close_event->pid = con->pid;
+    close_event->sockfd = con->sockfd;
+    close_event->success = ret > 0 ? true : false;
 
-    bpf_perf_event_output(ctx, &socket_close_event_queue, BPF_F_CURRENT_CPU, &close_event, sizeof(close_event));
+    rover_submit_buf(ctx, &socket_close_event_queue, close_event, sizeof(*close_event));
 }
 
 static __inline void submit_close_connection(void* ctx, __u32 tgid, __u32 fd, __u64 start_nacs, int ret) {
diff --git a/bpf/accesslog/syscalls/transfer.h b/bpf/accesslog/syscalls/transfer.h
index e1f1ba7..053ce01 100644
--- a/bpf/accesslog/syscalls/transfer.h
+++ b/bpf/accesslog/syscalls/transfer.h
@@ -19,6 +19,7 @@
 #include "socket_opts.h"
 #include "socket_data.h"
 #include "socket_reader.h"
+#include "queue.h"
 #include "protocol_analyzer.h"
 #include "../common/connection.h"
 #include "../common/data_args.h"
@@ -68,15 +69,7 @@ struct socket_detail_t {
     __u8 ssl;
 };
 
-struct {
-    __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
-    __type(key, __u32);
-    __type(value, struct socket_detail_t);
-    __uint(max_entries, 1);
-} socket_detail_event_per_cpu_map SEC(".maps");
-struct {
-       __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-} socket_detail_data_queue SEC(".maps");
+DATA_QUEUE(socket_detail_queue, 1024 * 1024);
 
 static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_data_args_t *args, ssize_t bytes_count,
                                         __u32 data_direction, const bool vecs, __u8 func_name, bool ssl) {
@@ -144,8 +137,8 @@ static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_
     // 1. when the SSL connection sends SSL(unencrypted) message
     // 2. when the not SSL connection sends plain data
     if (conn->ssl == ssl) {
-        __u32 kZero = 0;
-        struct socket_detail_t *detail = bpf_map_lookup_elem(&socket_detail_event_per_cpu_map, &kZero);
+        struct socket_detail_t *detail;
+        detail = rover_reserve_buf(&socket_detail_queue, sizeof(*detail));
         if (detail != NULL) {
             detail->connection_id = conid;
             detail->random_id = conn->random_id;
@@ -177,7 +170,7 @@ static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_
             detail->l2_enter_queue_count = args->l2_enter_queue_count;
             detail->l4_package_rcv_from_queue_time = args->total_package_receive_from_queue_time;
 
-            bpf_perf_event_output(ctx, &socket_detail_data_queue, BPF_F_CURRENT_CPU, detail, sizeof(*detail));
+            rover_submit_buf(ctx, &socket_detail_queue, detail, sizeof(*detail));
         }
     }
 
diff --git a/bpf/include/queue.h b/bpf/include/queue.h
new file mode 100644
index 0000000..c6b532c
--- /dev/null
+++ b/bpf/include/queue.h
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "api.h"
+
+#define DATA_QUEUE(name, size)               \
+       struct {                                    \
+               __uint(type, BPF_MAP_TYPE_RINGBUF); \
+               __uint(max_entries, size);          \
+       } name SEC(".maps");                        \
+       const void *rover_data_queue_##name __attribute__((unused));
+
+struct {
+       __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+       __uint(max_entries, 1);
+       __uint(key_size, sizeof(__u32));
+       __uint(value_size, 10240); // all events are less than 10KB
+} rover_data_heap SEC(".maps");
+
+static __always_inline void *rover_reserve_buf(void *map, __u64 size) {
+       static const int zero = 0;
+
+       if (bpf_core_enum_value_exists(enum bpf_func_id,
+                                      BPF_FUNC_ringbuf_reserve))
+               return bpf_ringbuf_reserve(map, size, 0);
+
+       return bpf_map_lookup_elem(&rover_data_heap, &zero);
+}
+
+static __always_inline void rover_discard_buf(void *buf)
+{
+       if (bpf_core_enum_value_exists(enum bpf_func_id,
+                                      BPF_FUNC_ringbuf_discard))
+               bpf_ringbuf_discard(buf, 0);
+}
+
+static __always_inline long rover_submit_buf(void *ctx, void *map, void *buf, __u64 size) {
+       if (bpf_core_enum_value_exists(enum bpf_func_id,
+                                      BPF_FUNC_ringbuf_submit)) {
+               bpf_ringbuf_submit(buf, 0);
+               return 0;
+       }
+
+       return bpf_perf_event_output(ctx, map, BPF_F_CURRENT_CPU, buf, size);
+}
\ No newline at end of file
diff --git a/bpf/include/socket_data.h b/bpf/include/socket_data.h
index cae2622..0d62ccb 100644
--- a/bpf/include/socket_data.h
+++ b/bpf/include/socket_data.h
@@ -19,6 +19,7 @@
 
 #include "socket_opts.h"
 #include "protocol_analyzer.h"
+#include "queue.h"
 
 #define SOCKET_UPLOAD_CHUNK_LIMIT 12
 
@@ -43,9 +44,7 @@ struct {
     __type(value, struct socket_data_upload_event);
     __uint(max_entries, 1);
 } socket_data_upload_event_per_cpu_map SEC(".maps");
-struct {
-       __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-} socket_data_upload_event_queue SEC(".maps");
+DATA_QUEUE(socket_data_upload_queue, 1024 * 1024);
 
 struct socket_data_sequence_t {
     __u64 data_id;
@@ -106,23 +105,40 @@ static __always_inline struct upload_data_args* generate_socket_upload_args() {
     return bpf_map_lookup_elem(&socket_data_upload_args_per_cpu_map, &kZero);
 }
 
-static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, __u8 have_reduce_after_chunk, struct socket_data_upload_event *event) {
-    if (size <= 0) {
+static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, __u8 have_reduce_after_chunk, struct upload_data_args *args) {
+    struct socket_data_upload_event *socket_data_event;
+    socket_data_event = rover_reserve_buf(&socket_data_upload_queue, sizeof(*socket_data_event));
+    if (socket_data_event == NULL) {
         return;
     }
-    if (size > sizeof(event->buffer)) {
-        size = sizeof(event->buffer);
+
+    if (size > sizeof(socket_data_event->buffer)) {
+        size = sizeof(socket_data_event->buffer);
+    }
+    if (size <= 0) {
+        rover_discard_buf(socket_data_event);
+        return;
     }
-    event->sequence = index;
-    event->data_len = size;
-    event->finished = is_finished;
-    event->have_reduce_after_chunk = have_reduce_after_chunk;
-    bpf_probe_read(&event->buffer, size, buf);
 
-    bpf_perf_event_output(ctx, &socket_data_upload_event_queue, BPF_F_CURRENT_CPU, event, sizeof(*event));
+    // basic data
+    socket_data_event->start_time = args->start_time;
+    socket_data_event->end_time = args->end_time;
+    socket_data_event->protocol = args->connection_protocol;
+    socket_data_event->direction = args->data_direction;
+    socket_data_event->conid = args->con_id;
+    socket_data_event->randomid = args->random_id;
+    socket_data_event->total_size = args->bytes_count;
+    socket_data_event->data_id = args->socket_data_id;
+
+    socket_data_event->sequence = index;
+    socket_data_event->data_len = size;
+    socket_data_event->finished = is_finished;
+    socket_data_event->have_reduce_after_chunk = have_reduce_after_chunk;
+    bpf_probe_read(&socket_data_event->buffer, size, buf);
+    rover_submit_buf(ctx, &socket_data_upload_queue, socket_data_event, sizeof(*socket_data_event));
 }
 
-static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t size, struct socket_data_upload_event *event, __u8 force_unfinished) {
+static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t size, struct upload_data_args *args, __u8 force_unfinished) {
     ssize_t already_send = 0;
 #pragma unroll
     for (__u8 index = 0; index < SOCKET_UPLOAD_CHUNK_LIMIT; index++) {
@@ -141,9 +157,9 @@ static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t
         __u8 sequence = index;
         if (force_unfinished == 1 && need_send_in_chunk > 0) {
             is_finished = 0;
-            sequence = generate_socket_sequence(event->conid, event->data_id);
+            sequence = generate_socket_sequence(args->con_id, args->socket_data_id);
         }
-        __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, have_reduce_after_chunk, event);
+        __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, have_reduce_after_chunk, args);
         already_send += need_send_in_chunk;
 
     }
@@ -170,12 +186,12 @@ if (iov_index < iovlen) {                                                   \
         have_reduce_after_chunk = 1;                                        \
     }                                                                       \
     __u32 is_finished = (need_send_in_chunk + already_send) >= size || loop_count == (SOCKET_UPLOAD_CHUNK_LIMIT - 1) ? true : false;                            \
-    __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, have_reduce_after_chunk, event);      \
+    __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, have_reduce_after_chunk, args);      \
     already_send += need_send_in_chunk;                                                                                             \
     loop_count++;                                                                                                                   \
 }
 
-static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov, const size_t iovlen, ssize_t size, struct socket_data_upload_event *event) {
+static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov, const size_t iovlen, ssize_t size, struct upload_data_args *args) {
     ssize_t already_send = 0;
     ssize_t cur_iov_sended = 0;
     __u8 iov_index = 0;
@@ -198,26 +214,9 @@ static __inline void upload_socket_data(void *ctx, struct upload_data_args *args
     if (args->connection_protocol == CONNECTION_PROTOCOL_UNKNOWN || args->connection_ssl != args->socket_data_ssl || args->connection_skip_data_upload == 1) {
         return;
     }
-    // generate event
-    __u32 kZero = 0;
-    struct socket_data_upload_event *event = bpf_map_lookup_elem(&socket_data_upload_event_per_cpu_map, &kZero);
-    if (event == NULL) {
-        return;
-    }
-
-    // basic data
-    event->start_time = args->start_time;
-    event->end_time = args->end_time;
-    event->protocol = args->connection_protocol;
-    event->direction = args->data_direction;
-    event->conid = args->con_id;
-    event->randomid = args->random_id;
-    event->total_size = args->bytes_count;
-    event->data_id = args->socket_data_id;
-
     if (args->socket_data_buf != NULL) {
-        upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count, event, args->socket_ssl_buffer_force_unfinished);
+        upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count, args, args->socket_ssl_buffer_force_unfinished);
     } else if (args->socket_data_iovec != NULL) {
-        upload_socket_data_iov(ctx, args->socket_data_iovec, args->socket_data_iovlen, args->bytes_count, event);
+        upload_socket_data_iov(ctx, args->socket_data_iovec, args->socket_data_iovlen, args->bytes_count, args);
     }
 }
\ No newline at end of file
diff --git a/pkg/accesslog/bpf/loader.go b/pkg/accesslog/bpf/loader.go
index 5a4f827..5058913 100644
--- a/pkg/accesslog/bpf/loader.go
+++ b/pkg/accesslog/bpf/loader.go
@@ -34,7 +34,7 @@ type Loader struct {
 
 func NewLoader() (*Loader, error) {
        objs := bpfObjects{}
-       if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+       if err := btf.LoadBPFAndAssign(loadBpf, &objs); err != nil {
                return nil, err
        }
 
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index 833d323..7e07eb3 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -92,13 +92,13 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
                func(num int) btf.PartitionContext {
                       return NewPartitionContext(q.context, num, q.supportAnalyzers(q.context))
                })
-       q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue, int(q.perCPUBuffer),
+       q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailQueue, int(q.perCPUBuffer),
               q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} {
                        return q.detailSupplier()
                }, func(data interface{}) string {
                       return fmt.Sprintf("%d", data.(events.SocketDetail).GetConnectionID())
                })
-       q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue, int(q.perCPUBuffer),
+       q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue, int(q.perCPUBuffer),
               q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} {
                        return &events.SocketDataUploadEvent{}
                }, func(data interface{}) string {
diff --git a/pkg/profiling/continuous/checker/bpf/network/network.go b/pkg/profiling/continuous/checker/bpf/network/network.go
index ba03f21..a3cb644 100644
--- a/pkg/profiling/continuous/checker/bpf/network/network.go
+++ b/pkg/profiling/continuous/checker/bpf/network/network.go
@@ -134,9 +134,10 @@ func startBPFIfNeed() error {
        }
 
        bpf = &bpfObjects{}
-       if err := loadBpfObjects(bpf, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+       if err := btf.LoadBPFAndAssign(loadBpf, bpf); err != nil {
                return err
        }
+
        bpfLinker = btf.NewLinker()
       bpfLinker.AddLink(link.Kprobe, map[string]*ebpf.Program{"tcp_sendmsg": bpf.TcpSendmsg})
       bpfLinker.AddLink(link.Kprobe, map[string]*ebpf.Program{"tcp_recvmsg": bpf.TcpRecvmsg})
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index dab74d1..707aaa8 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -36,7 +36,7 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize int, config *profili
 
 func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) {
        // socket buffer data
-       l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadEventQueue, l.protocolPerCPUBuffer, 1, func() interface{} {
+       l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue, l.protocolPerCPUBuffer, 1, func() interface{} {
                return &analyzeBase.SocketDataUploadEvent{}
        }, func(data interface{}) string {
               return data.(*analyzeBase.SocketDataUploadEvent).GenerateConnectionID()
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
index 8cc8c85..151e563 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -104,11 +104,19 @@ func (m *MessageOpt) ContentTotalSize() int {
 }
 
 func (m *MessageOpt) StartTime() uint64 {
-       return m.HeaderBuffer().FirstSocketBuffer().StartTime()
+       socketBuffer := m.HeaderBuffer().FirstSocketBuffer()
+       if socketBuffer == nil {
+               return 0
+       }
+       return socketBuffer.StartTime()
 }
 
 func (m *MessageOpt) EndTime() uint64 {
-       return m.BodyBuffer().LastSocketBuffer().EndTime()
+       socketBuffer := m.BodyBuffer().LastSocketBuffer()
+       if socketBuffer == nil {
+               return m.StartTime()
+       }
+       return socketBuffer.EndTime()
 }
 
 func (m *MessageOpt) Direction() enums.SocketDataDirection {
diff --git a/pkg/profiling/task/network/bpf/bpf.go b/pkg/profiling/task/network/bpf/bpf.go
index 083935c..67a875f 100644
--- a/pkg/profiling/task/network/bpf/bpf.go
+++ b/pkg/profiling/task/network/bpf/bpf.go
@@ -34,7 +34,7 @@ type Loader struct {
 
 func NewLoader() (*Loader, error) {
        objs := bpfObjects{}
-       if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+       if err := btf.LoadBPFAndAssign(loadBpf, &objs); err != nil {
                return nil, err
        }
 
diff --git a/pkg/profiling/task/offcpu/runner.go b/pkg/profiling/task/offcpu/runner.go
index 492d996..8ce3ee6 100644
--- a/pkg/profiling/task/offcpu/runner.go
+++ b/pkg/profiling/task/offcpu/runner.go
@@ -113,7 +113,7 @@ func (r *Runner) Run(ctx context.Context, notify base.ProfilingRunningSuccessNot
        if !replacedPid {
                return fmt.Errorf("replace the monitor pid failure")
        }
-       if err1 := spec.LoadAndAssign(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err1 != nil {
+       if err1 := spec.LoadAndAssign(&objs, btf.GetEBPFCollectionOptionsIfNeed(spec)); err1 != nil {
                return err1
        }
        r.bpf = &objs
diff --git a/pkg/tools/btf/ebpf.go b/pkg/tools/btf/ebpf.go
index 71e20f3..307f29f 100644
--- a/pkg/tools/btf/ebpf.go
+++ b/pkg/tools/btf/ebpf.go
@@ -20,6 +20,7 @@ package btf
 import (
        "bytes"
        "embed"
+       "errors"
        "fmt"
        "path/filepath"
        "sync"
@@ -41,7 +42,16 @@ var (
        log = logger.GetLogger("tools", "btf")
 )
 
-func GetEBPFCollectionOptionsIfNeed() *ebpf.CollectionOptions {
+func LoadBPFAndAssign(loadBPF func() (*ebpf.CollectionSpec, error), objs interface{}) error {
+       bpf, err := loadBPF()
+       if err != nil {
+               return err
+       }
+
+       return bpf.LoadAndAssign(objs, GetEBPFCollectionOptionsIfNeed(bpf))
+}
+
+func GetEBPFCollectionOptionsIfNeed(bpfSpec *ebpf.CollectionSpec) *ebpf.CollectionOptions {
        findBTFOnce.Do(func() {
                readSpec, err := getKernelBTFAddress()
                if err != nil {
@@ -52,6 +62,7 @@ func GetEBPFCollectionOptionsIfNeed() *ebpf.CollectionOptions {
                spec = readSpec
        })
 
+       enhanceDataQueueOpts(bpfSpec)
       return &ebpf.CollectionOptions{Programs: ebpf.ProgramOptions{KernelTypes: spec}}
 }
 
@@ -87,3 +98,31 @@ func getKernelBTFAddress() (spec *btf.Spec, err error) {
 func asset(file string) ([]byte, error) {
        return assets.ReadFile(filepath.ToSlash(file))
 }
+
+func validateGlobalConstVoidPtrVar(t btf.Type) error {
+       btfVar, ok := t.(*btf.Var)
+       if !ok {
+               return errors.New("not of type btf.Var")
+       }
+
+       if btfVar.Linkage != btf.GlobalVar {
+               return fmt.Errorf("%q is not a global variable", btfVar.Name)
+       }
+
+       btfPtr, ok := btfVar.Type.(*btf.Pointer)
+       if !ok {
+               return fmt.Errorf("%q is not a pointer", btfVar.Name)
+       }
+
+       btfConst, ok := btfPtr.Target.(*btf.Const)
+       if !ok {
+               return fmt.Errorf("%q is not const", btfVar.Name)
+       }
+
+       _, ok = btfConst.Type.(*btf.Void)
+       if !ok {
+               return fmt.Errorf("%q is not a const void pointer", btfVar.Name)
+       }
+
+       return nil
+}
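
The point of LoadBPFAndAssign is ordering: the CollectionSpec must exist before load so GetEBPFCollectionOptionsIfNeed can hand it to enhanceDataQueueOpts and downgrade the DATA_QUEUE maps when ringbuf is unavailable. A minimal sketch of a caller, in the style of pkg/accesslog/bpf/loader.go above (loadBpf and bpfObjects are bpf2go-generated names; the Loader fields are illustrative):

    func NewLoader() (*Loader, error) {
    	objs := bpfObjects{}
    	// loadBpf yields the *ebpf.CollectionSpec; LoadBPFAndAssign inspects
    	// it via GetEBPFCollectionOptionsIfNeed before assigning programs
    	// and maps into objs.
    	if err := btf.LoadBPFAndAssign(loadBpf, &objs); err != nil {
    		return nil, err
    	}
    	return &Loader{objs: objs, linker: btf.NewLinker()}, nil
    }
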
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index f663737..867d84e 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -161,7 +161,7 @@ func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader, data
 
 func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBufferReader, perCPUBuffer,
        parallels int, dataSupplier func() interface{}) {
-       rd, err := perf.NewReader(emap, perCPUBuffer)
+       rd, err := newQueueReader(emap, perCPUBuffer)
        if err != nil {
               m.errors = multierror.Append(m.errors, fmt.Errorf("open ring buffer error: %v", err))
                return
@@ -177,10 +177,10 @@ func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBuff
        }
 }
 
-func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, dataSupplier func() interface{}, bufReader RingBufferReader) {
+func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, dataSupplier func() interface{}, bufReader RingBufferReader) {
        go func() {
                for {
-                       record, err := rd.Read()
+                       sample, err := rd.Read()
                        if err != nil {
                                if errors.Is(err, perf.ErrClosed) {
                                        return
@@ -188,23 +188,21 @@ func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, dataSupplier fu
                               log.Warnf("read from %s ringbuffer error: %v", emap.String(), err)
                                continue
                        }
-
-                       if record.LostSamples != 0 {
-                               log.Warnf("perf event queue(%s) full, dropped %d samples", emap.String(), record.LostSamples)
+                       if len(sample) == 0 {
                                continue
                        }
 
                        data := dataSupplier()
                        if r, ok := data.(reader.EventReader); ok {
-                               sampleReader := reader.NewReader(record.RawSample)
+                               sampleReader := reader.NewReader(sample)
                                r.ReadFrom(sampleReader)
                               if readErr := sampleReader.HasError(); readErr != nil {
-                                       log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+                                       log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
                                        continue
                                }
                        } else {
-                               if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, data); err != nil {
-                                       log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+                               if err := binary.Read(bytes.NewBuffer(sample), binary.LittleEndian, data); err != nil {
+                                       log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
                                        continue
                                }
                        }
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index 813979d..290f717 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -19,12 +19,134 @@ package btf
 
 import (
        "context"
+       "fmt"
        "hash/fnv"
+       "os"
+       "strings"
        "sync"
 
        "github.com/cilium/ebpf"
+       "github.com/cilium/ebpf/perf"
+       "github.com/cilium/ebpf/ringbuf"
 )
 
+const dataQueuePrefix = "rover_data_queue_"
+
+var (
+       ringbufChecker   sync.Once
+       ringbufAvailable bool
+)
+
+func isRingbufAvailable() bool {
+       ringbufChecker.Do(func() {
+               buf, err := ebpf.NewMap(&ebpf.MapSpec{
+                       Type:       ebpf.RingBuf,
+                       MaxEntries: uint32(os.Getpagesize()),
+               })
+
+               buf.Close()
+
+               ringbufAvailable = err == nil
+
+               if ringbufAvailable {
+                       log.Infof("detect the ring buffer is available in current system for enhancement of data queue")
+               }
+       })
+
+       return ringbufAvailable
+}
+
+func enhanceDataQueueOpts(bpfSpec *ebpf.CollectionSpec) {
+       it := bpfSpec.Types.Iterate()
+       for it.Next() {
+               if !strings.HasPrefix(it.Type.TypeName(), dataQueuePrefix) {
+                       continue
+               }
+               if err := validateGlobalConstVoidPtrVar(it.Type); err != nil {
+                       panic(fmt.Errorf("invalid global const void ptr var %s: %v", it.Type.TypeName(), err))
+               }
+
+               // if the ringbuf not available, use perf event array
+               if !isRingbufAvailable() {
+                       mapName := strings.TrimPrefix(it.Type.TypeName(), dataQueuePrefix)
+                       mapSpec := bpfSpec.Maps[mapName]
+                       mapSpec.Type = ebpf.PerfEventArray
+                       mapSpec.KeySize = 4
+                       mapSpec.ValueSize = 4
+               }
+       }
+}
+
+type queueReader interface {
+       Read() ([]byte, error)
+       Close() error
+}
+
+func newQueueReader(emap *ebpf.Map, perCPUBuffer int) (queueReader, error) {
+       switch emap.Type() {
+       case ebpf.RingBuf:
+               return newRingBufReader(emap)
+       case ebpf.PerfEventArray:
+               return newPerfQueueReader(emap, perCPUBuffer)
+       }
+       return nil, fmt.Errorf("unsupported map type: %s", emap.Type().String())
+}
+
+type perfQueueReader struct {
+       name   string
+       reader *perf.Reader
+}
+
+func newPerfQueueReader(emap *ebpf.Map, perCPUBuffer int) (*perfQueueReader, error) {
+       reader, err := perf.NewReader(emap, perCPUBuffer)
+       if err != nil {
+               return nil, err
+       }
+       return &perfQueueReader{reader: reader, name: emap.String()}, nil
+}
+
+func (p *perfQueueReader) Read() ([]byte, error) {
+       read, err := p.reader.Read()
+       if err != nil {
+               return nil, err
+       }
+
+       if read.LostSamples != 0 {
+               log.Warnf("perf event queue(%s) full, dropped %d samples", p.name, read.LostSamples)
+               return nil, nil
+       }
+
+       return read.RawSample, nil
+}
+
+func (p *perfQueueReader) Close() error {
+       return p.reader.Close()
+}
+
+type ringBufReader struct {
+       reader *ringbuf.Reader
+}
+
+func newRingBufReader(emap *ebpf.Map) (*ringBufReader, error) {
+       reader, err := ringbuf.NewReader(emap)
+       if err != nil {
+               return nil, err
+       }
+       return &ringBufReader{reader: reader}, nil
+}
+
+func (r *ringBufReader) Read() ([]byte, error) {
+       read, err := r.reader.Read()
+       if err != nil {
+               return nil, err
+       }
+       return read.RawSample, nil
+}
+
+func (r *ringBufReader) Close() error {
+       return r.reader.Close()
+}
+
 type PartitionContext interface {
        Start(ctx context.Context)
        Consume(data interface{})
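
With queueReader in place, a consumer sees only raw sample bytes whichever map type backs the queue; lost-sample accounting stays inside the perf-backed implementation. A hypothetical in-package helper showing the shared read loop (newQueueReader is unexported, so this sketch assumes it lives in package btf; it mirrors asyncReadEvent in linker.go):

    func drainQueue(emap *ebpf.Map, perCPUBuffer int, handle func([]byte)) error {
    	rd, err := newQueueReader(emap, perCPUBuffer)
    	if err != nil {
    		return err
    	}
    	defer rd.Close()
    	for {
    		sample, err := rd.Read()
    		if err != nil {
    			return err // e.g. perf.ErrClosed once the reader is closed
    		}
    		if len(sample) == 0 {
    			continue // the perf reader reported only lost samples
    		}
    		handle(sample)
    	}
    }
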
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index 9993f47..c16a9fd 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -367,14 +367,14 @@ func (r *Buffer) DataSize() int64 {
 }
 
 func (r *Buffer) FirstSocketBuffer() SocketDataBuffer {
-       if r.dataEvents == nil || r.dataEvents.Len() == 0 {
+       if r == nil || r.dataEvents == nil || r.dataEvents.Len() == 0 {
                return nil
        }
        return r.dataEvents.Front().Value.(SocketDataBuffer)
 }
 
 func (r *Buffer) LastSocketBuffer() SocketDataBuffer {
-       if r.dataEvents == nil || r.dataEvents.Len() == 0 {
+       if r == nil || r.dataEvents == nil || r.dataEvents.Len() == 0 {
                return nil
        }
        return r.dataEvents.Back().Value.(SocketDataBuffer)
@@ -382,7 +382,7 @@ func (r *Buffer) LastSocketBuffer() SocketDataBuffer {
 
 // DetectNotSendingLastPosition detect the buffer contains not sending data: the BPF limited socket data count
 func (r *Buffer) DetectNotSendingLastPosition() *Position {
-       if r.dataEvents.Len() == 0 {
+       if r == nil || r.dataEvents.Len() == 0 {
                return nil
        }
 

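The nil guards in buffer.go work because Go permits calling a method with a pointer receiver on a nil receiver; the check turns a would-be panic in chains like m.BodyBuffer().LastSocketBuffer() into a nil result that EndTime() can handle. A tiny self-contained illustration of the pattern (the Buffer type here is illustrative, not the project's):

    package main

    import "fmt"

    type Buffer struct{ items []int }

    // First is nil-receiver-safe, like Buffer.FirstSocketBuffer above.
    func (b *Buffer) First() (int, bool) {
    	if b == nil || len(b.items) == 0 {
    		return 0, false
    	}
    	return b.items[0], true
    }

    func main() {
    	var b *Buffer      // nil pointer, no allocation
    	v, ok := b.First() // legal: the method checks its own receiver
    	fmt.Println(v, ok) // prints "0 false"
    }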