This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch ringbuf
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git

commit 688234f4a47c042eb24f28994a4cdbeea39359ab
Author: mrproliu <[email protected]>
AuthorDate: Mon Dec 16 10:21:37 2024 +0800

    Introduce ringbuf queue in access log module
---
 CHANGES.md                                         |   1 +
 bpf/accesslog/syscalls/transfer.h                  |  17 +--
 bpf/include/queue.h                                |  61 +++++++++++
 bpf/include/socket_data.h                          |  73 ++++++------
 pkg/accesslog/bpf/loader.go                        |   2 +-
 pkg/accesslog/collector/protocols/queue.go         |   4 +-
 .../continuous/checker/bpf/network/network.go      |   3 +-
 pkg/profiling/task/network/bpf/bpf.go              |   2 +-
 pkg/profiling/task/offcpu/runner.go                |   2 +-
 pkg/tools/btf/ebpf.go                              |  41 ++++++-
 pkg/tools/btf/linker.go                            |  19 ++--
 pkg/tools/btf/queue.go                             | 122 +++++++++++++++++++++
 12 files changed, 279 insertions(+), 68 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 6907c20..2300c3f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,7 @@ Release Notes.
 * Fix missing the first socket detail event in HTTPS protocol.
 * Support parallel parsing protocol data in the access log module.
 * Upgrade Go library to `1.22`, eBPF library to `0.16.0`.
+* Introduce ringbuf queue to improve performance in the access log module.
 
 #### Bug Fixes
 * Fix the base image cannot run in the arm64.
diff --git a/bpf/accesslog/syscalls/transfer.h b/bpf/accesslog/syscalls/transfer.h
index e1f1ba7..3f8c7e5 100644
--- a/bpf/accesslog/syscalls/transfer.h
+++ b/bpf/accesslog/syscalls/transfer.h
@@ -19,6 +19,7 @@
 #include "socket_opts.h"
 #include "socket_data.h"
 #include "socket_reader.h"
+#include "queue.h"
 #include "protocol_analyzer.h"
 #include "../common/connection.h"
 #include "../common/data_args.h"
@@ -68,15 +69,7 @@ struct socket_detail_t {
     __u8 ssl;
 };
 
-struct {
-    __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
-    __type(key, __u32);
-    __type(value, struct socket_detail_t);
-    __uint(max_entries, 1);
-} socket_detail_event_per_cpu_map SEC(".maps");
-struct {
-       __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-} socket_detail_data_queue SEC(".maps");
+DATA_QUEUE(socket_detail, 1024 * 1024);
 
 static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_data_args_t *args, ssize_t bytes_count,
                                         __u32 data_direction, const bool vecs, __u8 func_name, bool ssl) {
@@ -144,8 +137,8 @@ static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_
     // 1. when the SSL connection sends SSL(unencrypted) message
     // 2. when the not SSL connection sends plain data
     if (conn->ssl == ssl) {
-        __u32 kZero = 0;
-        struct socket_detail_t *detail = bpf_map_lookup_elem(&socket_detail_event_per_cpu_map, &kZero);
+        struct socket_detail_t *detail;
+        detail = rover_reserve_buf(&socket_detail, sizeof(*detail));
         if (detail != NULL) {
             detail->connection_id = conid;
             detail->random_id = conn->random_id;
@@ -177,7 +170,7 @@ static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_
             detail->l2_enter_queue_count = args->l2_enter_queue_count;
             detail->l4_package_rcv_from_queue_time = args->total_package_receive_from_queue_time;
 
-            bpf_perf_event_output(ctx, &socket_detail_data_queue, BPF_F_CURRENT_CPU, detail, sizeof(*detail));
+            rover_submit_buf(ctx, &socket_detail, detail, sizeof(*detail));
         }
     }
 
diff --git a/bpf/include/queue.h b/bpf/include/queue.h
new file mode 100644
index 0000000..c6b532c
--- /dev/null
+++ b/bpf/include/queue.h
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "api.h"
+
+#define DATA_QUEUE(name, size)               \
+       struct {                                    \
+               __uint(type, BPF_MAP_TYPE_RINGBUF); \
+               __uint(max_entries, size);          \
+       } name SEC(".maps");                        \
+       const void *rover_data_queue_##name __attribute__((unused));
+
+struct {
+       __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+       __uint(max_entries, 1);
+       __uint(key_size, sizeof(__u32));
+       __uint(value_size, 10240); // all events are less than 10KB
+} rover_data_heap SEC(".maps");
+
+static __always_inline void *rover_reserve_buf(void *map, __u64 size) {
+       static const int zero = 0;
+
+       if (bpf_core_enum_value_exists(enum bpf_func_id,
+                                      BPF_FUNC_ringbuf_reserve))
+               return bpf_ringbuf_reserve(map, size, 0);
+
+       return bpf_map_lookup_elem(&rover_data_heap, &zero);
+}
+
+static __always_inline void rover_discard_buf(void *buf)
+{
+       if (bpf_core_enum_value_exists(enum bpf_func_id,
+                                      BPF_FUNC_ringbuf_discard))
+               bpf_ringbuf_discard(buf, 0);
+}
+
+static __always_inline long rover_submit_buf(void *ctx, void *map, void *buf, __u64 size) {
+       if (bpf_core_enum_value_exists(enum bpf_func_id,
+                                      BPF_FUNC_ringbuf_submit)) {
+               bpf_ringbuf_submit(buf, 0);
+               return 0;
+       }
+
+       return bpf_perf_event_output(ctx, map, BPF_F_CURRENT_CPU, buf, size);
+}
\ No newline at end of file
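
The branch in each helper above is decided at load time: bpf_core_enum_value_exists() is resolved by CO-RE, so on kernels with ringbuf support the verifier only ever sees the bpf_ringbuf_* path, while on older kernels the 10KB per-CPU rover_data_heap entry serves as the scratch buffer that rover_submit_buf flushes through bpf_perf_event_output. A minimal sketch of the intended produce-side usage, with a hypothetical event struct and probe (only DATA_QUEUE and the rover_*_buf helpers come from this commit):

    #include "queue.h"

    struct my_event_t {
        __u64 id;
        __u32 value;
    };

    // Declares the queue map plus the rover_data_queue_my_event_queue marker
    // variable consumed by the Go loader (see pkg/tools/btf below).
    DATA_QUEUE(my_event_queue, 1024 * 1024);

    SEC("kprobe/tcp_sendmsg")
    int my_probe(struct pt_regs *ctx) {
        struct my_event_t *event = rover_reserve_buf(&my_event_queue, sizeof(*event));
        if (event == NULL) {
            return 0; // ring full, or per-CPU heap lookup failed
        }
        event->id = bpf_get_current_pid_tgid();
        event->value = 1;
        // Submits the ringbuf reservation, or perf-outputs the heap buffer.
        rover_submit_buf(ctx, &my_event_queue, event, sizeof(*event));
        return 0;
    }

If an event turns out to be unwanted after reservation, rover_discard_buf() hands it back; on the perf fallback path that is deliberately a no-op, since nothing was reserved.
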
diff --git a/bpf/include/socket_data.h b/bpf/include/socket_data.h
index cae2622..0d62ccb 100644
--- a/bpf/include/socket_data.h
+++ b/bpf/include/socket_data.h
@@ -19,6 +19,7 @@
 
 #include "socket_opts.h"
 #include "protocol_analyzer.h"
+#include "queue.h"
 
 #define SOCKET_UPLOAD_CHUNK_LIMIT 12
 
@@ -43,9 +44,7 @@ struct {
     __type(value, struct socket_data_upload_event);
     __uint(max_entries, 1);
 } socket_data_upload_event_per_cpu_map SEC(".maps");
-struct {
-       __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-} socket_data_upload_event_queue SEC(".maps");
+DATA_QUEUE(socket_data_upload_queue, 1024 * 1024);
 
 struct socket_data_sequence_t {
     __u64 data_id;
@@ -106,23 +105,40 @@ static __always_inline struct upload_data_args* generate_socket_upload_args() {
     return bpf_map_lookup_elem(&socket_data_upload_args_per_cpu_map, &kZero);
 }
 
-static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, __u8 have_reduce_after_chunk, struct socket_data_upload_event *event) {
-    if (size <= 0) {
+static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, __u8 have_reduce_after_chunk, struct upload_data_args *args) {
+    struct socket_data_upload_event *socket_data_event;
+    socket_data_event = rover_reserve_buf(&socket_data_upload_queue, sizeof(*socket_data_event));
+    if (socket_data_event == NULL) {
         return;
     }
-    if (size > sizeof(event->buffer)) {
-        size = sizeof(event->buffer);
+
+    if (size > sizeof(socket_data_event->buffer)) {
+        size = sizeof(socket_data_event->buffer);
+    }
+    if (size <= 0) {
+        rover_discard_buf(socket_data_event);
+        return;
     }
-    event->sequence = index;
-    event->data_len = size;
-    event->finished = is_finished;
-    event->have_reduce_after_chunk = have_reduce_after_chunk;
-    bpf_probe_read(&event->buffer, size, buf);
 
-    bpf_perf_event_output(ctx, &socket_data_upload_event_queue, BPF_F_CURRENT_CPU, event, sizeof(*event));
+    // basic data
+    socket_data_event->start_time = args->start_time;
+    socket_data_event->end_time = args->end_time;
+    socket_data_event->protocol = args->connection_protocol;
+    socket_data_event->direction = args->data_direction;
+    socket_data_event->conid = args->con_id;
+    socket_data_event->randomid = args->random_id;
+    socket_data_event->total_size = args->bytes_count;
+    socket_data_event->data_id = args->socket_data_id;
+
+    socket_data_event->sequence = index;
+    socket_data_event->data_len = size;
+    socket_data_event->finished = is_finished;
+    socket_data_event->have_reduce_after_chunk = have_reduce_after_chunk;
+    bpf_probe_read(&socket_data_event->buffer, size, buf);
+    rover_submit_buf(ctx, &socket_data_upload_queue, socket_data_event, sizeof(*socket_data_event));
 }
 
-static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t size, struct socket_data_upload_event *event, __u8 force_unfinished) {
+static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t size, struct upload_data_args *args, __u8 force_unfinished) {
     ssize_t already_send = 0;
 #pragma unroll
     for (__u8 index = 0; index < SOCKET_UPLOAD_CHUNK_LIMIT; index++) {
@@ -141,9 +157,9 @@ static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t
         __u8 sequence = index;
         if (force_unfinished == 1 && need_send_in_chunk > 0) {
             is_finished = 0;
-            sequence = generate_socket_sequence(event->conid, event->data_id);
+            sequence = generate_socket_sequence(args->con_id, args->socket_data_id);
         }
-        __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, have_reduce_after_chunk, event);
+        __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, have_reduce_after_chunk, args);
         already_send += need_send_in_chunk;
 
     }
@@ -170,12 +186,12 @@ if (iov_index < iovlen) {                                                   \
         have_reduce_after_chunk = 1;                                        \
     }                                                                       \
     __u32 is_finished = (need_send_in_chunk + already_send) >= size || loop_count == (SOCKET_UPLOAD_CHUNK_LIMIT - 1) ? true : false;   \
-    __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, have_reduce_after_chunk, event);      \
+    __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, have_reduce_after_chunk, args);      \
     already_send += need_send_in_chunk;                                     \
     loop_count++;                                                           \
 }
 
-static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov, const size_t iovlen, ssize_t size, struct socket_data_upload_event *event) {
+static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov, const size_t iovlen, ssize_t size, struct upload_data_args *args) {
     ssize_t already_send = 0;
     ssize_t cur_iov_sended = 0;
     __u8 iov_index = 0;
@@ -198,26 +214,9 @@ static __inline void upload_socket_data(void *ctx, struct upload_data_args *args
     if (args->connection_protocol == CONNECTION_PROTOCOL_UNKNOWN || args->connection_ssl != args->socket_data_ssl || args->connection_skip_data_upload == 1) {
         return;
     }
-    // generate event
-    __u32 kZero = 0;
-    struct socket_data_upload_event *event = bpf_map_lookup_elem(&socket_data_upload_event_per_cpu_map, &kZero);
-    if (event == NULL) {
-        return;
-    }
-
-    // basic data
-    event->start_time = args->start_time;
-    event->end_time = args->end_time;
-    event->protocol = args->connection_protocol;
-    event->direction = args->data_direction;
-    event->conid = args->con_id;
-    event->randomid = args->random_id;
-    event->total_size = args->bytes_count;
-    event->data_id = args->socket_data_id;
-
     if (args->socket_data_buf != NULL) {
-        upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count, event, args->socket_ssl_buffer_force_unfinished);
+        upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count, args, args->socket_ssl_buffer_force_unfinished);
     } else if (args->socket_data_iovec != NULL) {
-        upload_socket_data_iov(ctx, args->socket_data_iovec, args->socket_data_iovlen, args->bytes_count, event);
+        upload_socket_data_iov(ctx, args->socket_data_iovec, args->socket_data_iovlen, args->bytes_count, args);
     }
 }
\ No newline at end of file
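
Reserving from the ringbuf instead of filling a per-CPU scratch event is why the size checks above were reordered: the buffer must exist before it can be clamped, and the empty-payload path must now hand the reservation back explicitly. Condensed control flow of __upload_socket_data_with_buffer after this change (field copies elided):

    struct socket_data_upload_event *event;
    event = rover_reserve_buf(&socket_data_upload_queue, sizeof(*event));
    if (event == NULL) {
        return;                        // queue full or heap lookup failed
    }
    if (size > sizeof(event->buffer)) {
        size = sizeof(event->buffer);  // clamp to the fixed-size buffer
    }
    if (size <= 0) {
        rover_discard_buf(event);      // release the reservation unused
        return;
    }
    // ... copy the connection fields and the payload, then:
    rover_submit_buf(ctx, &socket_data_upload_queue, event, sizeof(*event));

Threading struct upload_data_args* down the call chain instead of a prebuilt event also means each of the up to SOCKET_UPLOAD_CHUNK_LIMIT (12) chunks fills its own reservation rather than reusing one shared per-CPU event.
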
diff --git a/pkg/accesslog/bpf/loader.go b/pkg/accesslog/bpf/loader.go
index 5a4f827..5058913 100644
--- a/pkg/accesslog/bpf/loader.go
+++ b/pkg/accesslog/bpf/loader.go
@@ -34,7 +34,7 @@ type Loader struct {
 
 func NewLoader() (*Loader, error) {
        objs := bpfObjects{}
-       if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+       if err := btf.LoadBPFAndAssign(loadBpf, &objs); err != nil {
                return nil, err
        }
 
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index 68708fc..7e93dce 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -92,13 +92,13 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
                func(num int) btf.PartitionContext {
                        return NewPartitionContext(q.context, num, q.supportAnalyzers(q.context))
                })
-       q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue, int(q.perCPUBuffer),
+       q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetail, int(q.perCPUBuffer),
                q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} {
                        return q.detailSupplier()
                }, func(data interface{}) string {
                        return fmt.Sprintf("%d", data.(events.SocketDetail).GetConnectionID())
                })
-       q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue, int(q.perCPUBuffer),
+       q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue, int(q.perCPUBuffer),
                q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} {
                        return &events.SocketDataUploadEvent{}
                }, func(data interface{}) string {
diff --git a/pkg/profiling/continuous/checker/bpf/network/network.go b/pkg/profiling/continuous/checker/bpf/network/network.go
index ba03f21..67bb39b 100644
--- a/pkg/profiling/continuous/checker/bpf/network/network.go
+++ b/pkg/profiling/continuous/checker/bpf/network/network.go
@@ -134,9 +134,10 @@ func startBPFIfNeed() error {
        }
 
        bpf = &bpfObjects{}
-       if err := loadBpfObjects(bpf, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+       if err := btf.LoadBPFAndAssign(loadBpf, bpf); err != nil {
                return err
        }
+
        bpfLinker = btf.NewLinker()
        bpfLinker.AddLink(link.Kprobe, map[string]*ebpf.Program{"tcp_sendmsg": bpf.TcpSendmsg})
        bpfLinker.AddLink(link.Kprobe, map[string]*ebpf.Program{"tcp_recvmsg": bpf.TcpRecvmsg})
diff --git a/pkg/profiling/task/network/bpf/bpf.go b/pkg/profiling/task/network/bpf/bpf.go
index 083935c..67a875f 100644
--- a/pkg/profiling/task/network/bpf/bpf.go
+++ b/pkg/profiling/task/network/bpf/bpf.go
@@ -34,7 +34,7 @@ type Loader struct {
 
 func NewLoader() (*Loader, error) {
        objs := bpfObjects{}
-       if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+       if err := btf.LoadBPFAndAssign(loadBpf, &objs); err != nil {
                return nil, err
        }
 
diff --git a/pkg/profiling/task/offcpu/runner.go b/pkg/profiling/task/offcpu/runner.go
index 492d996..8ce3ee6 100644
--- a/pkg/profiling/task/offcpu/runner.go
+++ b/pkg/profiling/task/offcpu/runner.go
@@ -113,7 +113,7 @@ func (r *Runner) Run(ctx context.Context, notify base.ProfilingRunningSuccessNot
        if !replacedPid {
                return fmt.Errorf("replace the monitor pid failure")
        }
-       if err1 := spec.LoadAndAssign(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err1 != nil {
+       if err1 := spec.LoadAndAssign(&objs, btf.GetEBPFCollectionOptionsIfNeed(spec)); err1 != nil {
                return err1
        }
        r.bpf = &objs
diff --git a/pkg/tools/btf/ebpf.go b/pkg/tools/btf/ebpf.go
index 71e20f3..307f29f 100644
--- a/pkg/tools/btf/ebpf.go
+++ b/pkg/tools/btf/ebpf.go
@@ -20,6 +20,7 @@ package btf
 import (
        "bytes"
        "embed"
+       "errors"
        "fmt"
        "path/filepath"
        "sync"
@@ -41,7 +42,16 @@ var (
        log = logger.GetLogger("tools", "btf")
 )
 
-func GetEBPFCollectionOptionsIfNeed() *ebpf.CollectionOptions {
+func LoadBPFAndAssign(loadBPF func() (*ebpf.CollectionSpec, error), objs interface{}) error {
+       bpf, err := loadBPF()
+       if err != nil {
+               return err
+       }
+
+       return bpf.LoadAndAssign(objs, GetEBPFCollectionOptionsIfNeed(bpf))
+}
+
+func GetEBPFCollectionOptionsIfNeed(bpfSpec *ebpf.CollectionSpec) *ebpf.CollectionOptions {
        findBTFOnce.Do(func() {
                readSpec, err := getKernelBTFAddress()
                if err != nil {
@@ -52,6 +62,7 @@ func GetEBPFCollectionOptionsIfNeed() *ebpf.CollectionOptions {
                spec = readSpec
        })
 
+       enhanceDataQueueOpts(bpfSpec)
        return &ebpf.CollectionOptions{Programs: ebpf.ProgramOptions{KernelTypes: spec}}
 }
 
@@ -87,3 +98,31 @@ func getKernelBTFAddress() (spec *btf.Spec, err error) {
 func asset(file string) ([]byte, error) {
        return assets.ReadFile(filepath.ToSlash(file))
 }
+
+func validateGlobalConstVoidPtrVar(t btf.Type) error {
+       btfVar, ok := t.(*btf.Var)
+       if !ok {
+               return errors.New("not of type btf.Var")
+       }
+
+       if btfVar.Linkage != btf.GlobalVar {
+               return fmt.Errorf("%q is not a global variable", btfVar.Name)
+       }
+
+       btfPtr, ok := btfVar.Type.(*btf.Pointer)
+       if !ok {
+               return fmt.Errorf("%q is not a pointer", btfVar.Name)
+       }
+
+       btfConst, ok := btfPtr.Target.(*btf.Const)
+       if !ok {
+               return fmt.Errorf("%q is not const", btfVar.Name)
+       }
+
+       _, ok = btfConst.Type.(*btf.Void)
+       if !ok {
+               return fmt.Errorf("%q is not a const void pointer", btfVar.Name)
+       }
+
+       return nil
+}
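
validateGlobalConstVoidPtrVar checks the shape of the marker variable that the DATA_QUEUE macro plants next to every queue map. For DATA_QUEUE(socket_detail, 1024 * 1024) from transfer.h, the expansion is roughly:

    struct {
        __uint(type, BPF_MAP_TYPE_RINGBUF);
        __uint(max_entries, 1024 * 1024);
    } socket_detail SEC(".maps");
    // A global const void pointer whose only job is to appear in the
    // object's BTF so user space can discover queues by name prefix.
    const void *rover_data_queue_socket_detail __attribute__((unused));

The marker costs nothing at run time; it lets enhanceDataQueueOpts (pkg/tools/btf/queue.go, below) enumerate every data queue from BTF instead of maintaining a hard-coded list of map names.
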
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index f663737..9755d22 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -161,7 +161,7 @@ func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader, data
 
 func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBufferReader, perCPUBuffer,
        parallels int, dataSupplier func() interface{}) {
-       rd, err := perf.NewReader(emap, perCPUBuffer)
+       rd, err := newQueueReader(emap, perCPUBuffer)
        if err != nil {
                m.errors = multierror.Append(m.errors, fmt.Errorf("open ring buffer error: %v", err))
                return
@@ -177,10 +177,10 @@ func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBuff
        }
 }
 
-func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, dataSupplier func() interface{}, bufReader RingBufferReader) {
+func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, dataSupplier func() interface{}, bufReader RingBufferReader) {
        go func() {
                for {
-                       record, err := rd.Read()
+                       sample, err := rd.Read()
                        if err != nil {
                                if errors.Is(err, perf.ErrClosed) {
                                        return
@@ -189,22 +189,17 @@ func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, dataSupplier fu
                                continue
                        }
 
-                       if record.LostSamples != 0 {
-                               log.Warnf("perf event queue(%s) full, dropped %d samples", emap.String(), record.LostSamples)
-                               continue
-                       }
-
                        data := dataSupplier()
                        if r, ok := data.(reader.EventReader); ok {
-                               sampleReader := reader.NewReader(record.RawSample)
+                               sampleReader := reader.NewReader(sample)
                                r.ReadFrom(sampleReader)
                                if readErr := sampleReader.HasError(); readErr != nil {
-                                       log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+                                       log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
                                        continue
                                }
                        } else {
-                               if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, data); err != nil {
-                                       log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+                               if err := binary.Read(bytes.NewBuffer(sample), binary.LittleEndian, data); err != nil {
+                                       log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
                                        continue
                                }
                        }
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index 813979d..ca7c330 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -19,12 +19,134 @@ package btf
 
 import (
        "context"
+       "fmt"
        "hash/fnv"
+       "os"
+       "strings"
        "sync"
 
        "github.com/cilium/ebpf"
+       "github.com/cilium/ebpf/perf"
+       "github.com/cilium/ebpf/ringbuf"
 )
 
+const dataQueuePrefix = "rover_data_queue_"
+
+var (
+       ringbufChecker   sync.Once
+       ringbufAvailable bool
+)
+
+func isRingbufAvailable() bool {
+       ringbufChecker.Do(func() {
+               ringbuf, err := ebpf.NewMap(&ebpf.MapSpec{
+                       Type:       ebpf.RingBuf,
+                       MaxEntries: uint32(os.Getpagesize()),
+               })
+
+               ringbuf.Close()
+
+               ringbufAvailable = err == nil
+
+               if ringbufAvailable {
+                       log.Infof("detect the ring buffer is available in current system for enhancement of data queue")
+               }
+       })
+
+       return ringbufAvailable
+}
+
+func enhanceDataQueueOpts(bpfSpec *ebpf.CollectionSpec) {
+       it := bpfSpec.Types.Iterate()
+       for it.Next() {
+               if !strings.HasPrefix(it.Type.TypeName(), dataQueuePrefix) {
+                       continue
+               }
+               if err := validateGlobalConstVoidPtrVar(it.Type); err != nil {
+                       panic(fmt.Errorf("invalid global const void ptr var %s: %v", it.Type.TypeName(), err))
+               }
+
+               // if the ringbuf not available, use perf event array
+               if !isRingbufAvailable() {
+                       mapName := strings.TrimPrefix(it.Type.TypeName(), dataQueuePrefix)
+                       mapSpec := bpfSpec.Maps[mapName]
+                       mapSpec.Type = ebpf.PerfEventArray
+                       mapSpec.KeySize = 4
+                       mapSpec.ValueSize = 4
+               }
+       }
+}
+
+type queueReader interface {
+       Read() ([]byte, error)
+       Close() error
+}
+
+func newQueueReader(emap *ebpf.Map, perCPUBuffer int) (queueReader, error) {
+       switch emap.Type() {
+       case ebpf.RingBuf:
+               return newRingBufReader(emap)
+       case ebpf.PerfEventArray:
+               return newPerfQueueReader(emap, perCPUBuffer)
+       }
+       return nil, fmt.Errorf("unsupported map type: %s", emap.Type().String())
+}
+
+type perfQueueReader struct {
+       name   string
+       reader *perf.Reader
+}
+
+func newPerfQueueReader(emap *ebpf.Map, perCPUBuffer int) (*perfQueueReader, error) {
+       reader, err := perf.NewReader(emap, perCPUBuffer)
+       if err != nil {
+               return nil, err
+       }
+       return &perfQueueReader{reader: reader, name: emap.String()}, nil
+}
+
+func (p *perfQueueReader) Read() ([]byte, error) {
+       read, err := p.reader.Read()
+       if err != nil {
+               return nil, err
+       }
+
+       if read.LostSamples != 0 {
+               log.Warnf("perf event queue(%s) full, dropped %d samples", p.name, read.LostSamples)
+               return nil, nil
+       }
+
+       return read.RawSample, nil
+}
+
+func (p *perfQueueReader) Close() error {
+       return p.reader.Close()
+}
+
+type ringBufReader struct {
+       reader *ringbuf.Reader
+}
+
+func newRingBufReader(emap *ebpf.Map) (*ringBufReader, error) {
+       reader, err := ringbuf.NewReader(emap)
+       if err != nil {
+               return nil, err
+       }
+       return &ringBufReader{reader: reader}, nil
+}
+
+func (r *ringBufReader) Read() ([]byte, error) {
+       read, err := r.reader.Read()
+       if err != nil {
+               return nil, err
+       }
+       return read.RawSample, nil
+}
+
+func (r *ringBufReader) Close() error {
+       return r.reader.Close()
+}
+
 type PartitionContext interface {
        Start(ctx context.Context)
        Consume(data interface{})
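
When the ringbuf probe fails, enhanceDataQueueOpts rewrites each discovered queue spec before the collection is loaded; setting Type = PerfEventArray with KeySize and ValueSize of 4 yields a map that is functionally equivalent to the perf event array declarations this commit removes from the C side, roughly:

    struct {
        __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
        __uint(key_size, sizeof(__u32));
        __uint(value_size, sizeof(__u32));
    } socket_detail SEC(".maps");

On the consume side, newQueueReader switches on the final map type, so callers such as ReadEventAsyncWithBufferSize in linker.go never need to know which transport was chosen; the perf-only lost-sample bookkeeping now lives inside perfQueueReader.Read.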
