This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 023d6c2 Introduce ringbuf queue in access log module (#170)
023d6c2 is described below
commit 023d6c2f0588016b1ed6e9046fc5d27a4985728d
Author: mrproliu <[email protected]>
AuthorDate: Mon Dec 16 15:36:22 2024 +0800
Introduce ringbuf queue in access log module (#170)
---
.github/workflows/rover.yaml | 2 +-
CHANGES.md | 1 +
bpf/accesslog/common/connection.h | 63 +++++------
bpf/accesslog/syscalls/transfer.h | 17 +--
bpf/include/queue.h | 61 +++++++++++
bpf/include/socket_data.h | 73 ++++++------
pkg/accesslog/bpf/loader.go | 2 +-
pkg/accesslog/collector/protocols/queue.go | 4 +-
.../continuous/checker/bpf/network/network.go | 3 +-
.../task/network/analyze/layer7/events.go | 2 +-
.../layer7/protocols/http1/reader/reader.go | 12 +-
pkg/profiling/task/network/bpf/bpf.go | 2 +-
pkg/profiling/task/offcpu/runner.go | 2 +-
pkg/tools/btf/ebpf.go | 41 ++++++-
pkg/tools/btf/linker.go | 18 ++-
pkg/tools/btf/queue.go | 122 +++++++++++++++++++++
pkg/tools/buffer/buffer.go | 6 +-
17 files changed, 324 insertions(+), 107 deletions(-)
diff --git a/.github/workflows/rover.yaml b/.github/workflows/rover.yaml
index a72e3bf..f33567e 100644
--- a/.github/workflows/rover.yaml
+++ b/.github/workflows/rover.yaml
@@ -342,7 +342,7 @@ jobs:
if: ${{ failure() }}
name: Upload Logs
with:
- name: logs
+ name: logs ${{ matrix.test.name }}
path: "${{ env.SW_INFRA_E2E_LOG_DIR }}"
required:
diff --git a/CHANGES.md b/CHANGES.md
index ec2109d..a27176d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -16,6 +16,7 @@ Release Notes.
* Support parallel parsing protocol data in the access log module.
* Upgrade Go library to `1.22`, eBPF library to `0.16.0`.
* Reduce missing details issue in the access log module.
+* Introduce ringbuf queue to improve performance in the access log module.
#### Bug Fixes
* Fix the base image cannot run in the arm64.
diff --git a/bpf/accesslog/common/connection.h b/bpf/accesslog/common/connection.h
index 31e9f78..cb799e8 100644
--- a/bpf/accesslog/common/connection.h
+++ b/bpf/accesslog/common/connection.h
@@ -21,6 +21,7 @@
#include "socket.h"
#include "data_args.h"
#include "socket_opts.h"
+#include "queue.h"
// syscall:connect
struct connect_args_t {
@@ -106,19 +107,7 @@ struct socket_connect_event_t {
__u64 conntrack_upstream_iph;
__u32 conntrack_upstream_port;
};
-struct {
- __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-} socket_connection_event_queue SEC(".maps");
-struct {
- __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
- __type(key, __u32);
- __type(value, struct socket_connect_event_t);
- __uint(max_entries, 1);
-} socket_connect_event_per_cpu_map SEC(".maps");
-static __inline struct socket_connect_event_t* create_socket_connect_event() {
- __u32 kZero = 0;
- return bpf_map_lookup_elem(&socket_connect_event_per_cpu_map, &kZero);
-}
+DATA_QUEUE(socket_connection_event_queue, 1024 * 1024);
// active connection cached into the hashmap
// if connection closed, then deleted
@@ -170,9 +159,7 @@ struct socket_close_event_t {
// close success
__u32 success;
};
-struct {
- __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-} socket_close_event_queue SEC(".maps");
+DATA_QUEUE(socket_close_event_queue, 1024 * 1024);
static __inline bool family_should_trace(const __u32 family) {
    return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ? false : true;
@@ -182,7 +169,8 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
                                                  struct sockaddr* addr, const struct socket* socket, struct connect_track_remote* conntrack, __u8 role) {
// send to the user-space the connection event
__u64 curr_nacs = bpf_ktime_get_ns();
- struct socket_connect_event_t *event = create_socket_connect_event();
+ struct socket_connect_event_t *event;
+ event = rover_reserve_buf(&socket_connection_event_queue, sizeof(*event));
if (event == NULL) {
return;
}
@@ -195,12 +183,11 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
event->end_time = curr_nacs;
event->func_name = func_name;
if (func_name == SOCKET_OPTS_TYPE_CONNECT) {
- event->role = CONNECTION_ROLE_TYPE_CLIENT;
+ role = CONNECTION_ROLE_TYPE_CLIENT;
} else if (func_name == SOCKET_OPTS_TYPE_ACCEPT) {
- event->role = CONNECTION_ROLE_TYPE_SERVER;
- } else {
- event->role = role;
+ role = CONNECTION_ROLE_TYPE_SERVER;
}
+ event->role = role;
event->pid = tgid;
event->sockfd = fd;
@@ -216,6 +203,7 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
event->success = success;
__u16 port;
+ __u8 socket_family;
event->local_port = 0;
event->remote_port = 0;
if (socket == NULL) {
@@ -236,6 +224,7 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
short unsigned int skc_family;
BPF_CORE_READ_INTO(&skc_family, s, __sk_common.skc_family);
event->socket_family = skc_family;
+ socket_family = skc_family;
if (event->socket_family == AF_INET) {
BPF_CORE_READ_INTO(&port, s, __sk_common.skc_num);
@@ -254,6 +243,7 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
}
} else if (addr != NULL) {
event->socket_family = _(addr->sa_family);
+ socket_family = event->socket_family;
if (event->socket_family == AF_INET) {
struct sockaddr_in *daddr = (struct sockaddr_in *)addr;
            bpf_probe_read(&event->remote_addr_v4, sizeof(event->remote_addr_v4), &daddr->sin_addr.s_addr);
@@ -270,9 +260,10 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
}
} else {
event->socket_family = AF_UNKNOWN;
+ socket_family = AF_UNKNOWN;
}
-    bpf_perf_event_output(ctx, &socket_connection_event_queue, BPF_F_CURRENT_CPU, event, sizeof(*event));
+    rover_submit_buf(ctx, &socket_connection_event_queue, event, sizeof(*event));
if (success == false) {
return;
}
@@ -280,11 +271,11 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
// if connect success, then add the activate connection into the kernel
// active connection save
struct active_connection_t con = {};
- con.random_id = event->random_id;
+ con.random_id = random_id;
con.pid = tgid;
con.sockfd = fd;
- con.role = event->role;
- con.socket_family = event->socket_family;
+ con.role = role;
+ con.socket_family = socket_family;
bpf_map_update_elem(&active_connection_map, &conid, &con, 0);
}
@@ -312,17 +303,21 @@ static __inline void submit_connection_when_not_exists(void *ctx, __u64 id, stru
}
static __inline void notify_close_connection(void* ctx, __u64 conid, struct active_connection_t* con, __u64 start_time, __u64 end_time, int ret) {
- struct socket_close_event_t close_event = {};
+ struct socket_close_event_t *close_event;
+    close_event = rover_reserve_buf(&socket_close_event_queue, sizeof(*close_event));
+ if (close_event == NULL) {
+ return;
+ }
- close_event.conid = conid;
- close_event.random_id = con->random_id;
- close_event.start_time = start_time;
- close_event.end_time = end_time;
- close_event.pid = con->pid;
- close_event.sockfd = con->sockfd;
- close_event.success = ret > 0 ? true : false;
+ close_event->conid = conid;
+ close_event->random_id = con->random_id;
+ close_event->start_time = start_time;
+ close_event->end_time = end_time;
+ close_event->pid = con->pid;
+ close_event->sockfd = con->sockfd;
+ close_event->success = ret > 0 ? true : false;
-    bpf_perf_event_output(ctx, &socket_close_event_queue, BPF_F_CURRENT_CPU, &close_event, sizeof(close_event));
+    rover_submit_buf(ctx, &socket_close_event_queue, close_event, sizeof(*close_event));
}
static __inline void submit_close_connection(void* ctx, __u32 tgid, __u32 fd, __u64 start_nacs, int ret) {
diff --git a/bpf/accesslog/syscalls/transfer.h b/bpf/accesslog/syscalls/transfer.h
index e1f1ba7..053ce01 100644
--- a/bpf/accesslog/syscalls/transfer.h
+++ b/bpf/accesslog/syscalls/transfer.h
@@ -19,6 +19,7 @@
#include "socket_opts.h"
#include "socket_data.h"
#include "socket_reader.h"
+#include "queue.h"
#include "protocol_analyzer.h"
#include "../common/connection.h"
#include "../common/data_args.h"
@@ -68,15 +69,7 @@ struct socket_detail_t {
__u8 ssl;
};
-struct {
- __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
- __type(key, __u32);
- __type(value, struct socket_detail_t);
- __uint(max_entries, 1);
-} socket_detail_event_per_cpu_map SEC(".maps");
-struct {
- __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-} socket_detail_data_queue SEC(".maps");
+DATA_QUEUE(socket_detail_queue, 1024 * 1024);
static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_data_args_t *args, ssize_t bytes_count,
                                               __u32 data_direction, const bool vecs, __u8 func_name, bool ssl) {
@@ -144,8 +137,8 @@ static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_
// 1. when the SSL connection sends SSL(unencrypted) message
// 2. when the not SSL connection sends plain data
if (conn->ssl == ssl) {
- __u32 kZero = 0;
-        struct socket_detail_t *detail = bpf_map_lookup_elem(&socket_detail_event_per_cpu_map, &kZero);
+ struct socket_detail_t *detail;
+ detail = rover_reserve_buf(&socket_detail_queue, sizeof(*detail));
if (detail != NULL) {
detail->connection_id = conid;
detail->random_id = conn->random_id;
@@ -177,7 +170,7 @@ static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_
detail->l2_enter_queue_count = args->l2_enter_queue_count;
            detail->l4_package_rcv_from_queue_time = args->total_package_receive_from_queue_time;
-            bpf_perf_event_output(ctx, &socket_detail_data_queue, BPF_F_CURRENT_CPU, detail, sizeof(*detail));
+            rover_submit_buf(ctx, &socket_detail_queue, detail, sizeof(*detail));
}
}
diff --git a/bpf/include/queue.h b/bpf/include/queue.h
new file mode 100644
index 0000000..c6b532c
--- /dev/null
+++ b/bpf/include/queue.h
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "api.h"
+
+#define DATA_QUEUE(name, size) \
+ struct { \
+ __uint(type, BPF_MAP_TYPE_RINGBUF); \
+ __uint(max_entries, size); \
+ } name SEC(".maps"); \
+ const void *rover_data_queue_##name __attribute__((unused));
+
+struct {
+ __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+ __uint(max_entries, 1);
+ __uint(key_size, sizeof(__u32));
+ __uint(value_size, 10240); // all events are less than 10KB
+} rover_data_heap SEC(".maps");
+
+static __always_inline void *rover_reserve_buf(void *map, __u64 size) {
+ static const int zero = 0;
+
+ if (bpf_core_enum_value_exists(enum bpf_func_id,
+ BPF_FUNC_ringbuf_reserve))
+ return bpf_ringbuf_reserve(map, size, 0);
+
+ return bpf_map_lookup_elem(&rover_data_heap, &zero);
+}
+
+static __always_inline void rover_discard_buf(void *buf)
+{
+ if (bpf_core_enum_value_exists(enum bpf_func_id,
+ BPF_FUNC_ringbuf_discard))
+ bpf_ringbuf_discard(buf, 0);
+}
+
+static __always_inline long rover_submit_buf(void *ctx, void *map, void *buf, __u64 size) {
+ if (bpf_core_enum_value_exists(enum bpf_func_id,
+ BPF_FUNC_ringbuf_submit)) {
+ bpf_ringbuf_submit(buf, 0);
+ return 0;
+ }
+
+ return bpf_perf_event_output(ctx, map, BPF_F_CURRENT_CPU, buf, size);
+}
\ No newline at end of file
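
A note on the new helpers above: DATA_QUEUE(name, size) declares a BPF_MAP_TYPE_RINGBUF map of size bytes plus an unused marker variable rover_data_queue_<name>, which the Go loader (pkg/tools/btf/queue.go below) scans by prefix to decide whether the map must be downgraded to a perf event array. On kernels without ringbuf support, rover_reserve_buf falls back to the 10KB per-CPU rover_data_heap slot and rover_submit_buf emits the event through bpf_perf_event_output instead. A minimal user-space sketch of the same map shape, using only cilium/ebpf calls this patch already relies on; for ringbuf maps, max_entries is the buffer size in bytes and must be a power-of-two multiple of the page size, which 1024 * 1024 satisfies:

    package main

    import (
        "fmt"

        "github.com/cilium/ebpf"
    )

    func main() {
        // Mirrors DATA_QUEUE(..., 1024 * 1024): for BPF_MAP_TYPE_RINGBUF,
        // MaxEntries is the byte size of the buffer, not an element count.
        m, err := ebpf.NewMap(&ebpf.MapSpec{
            Type:       ebpf.RingBuf,
            MaxEntries: 1024 * 1024,
        })
        if err != nil {
            // Expected on kernels older than 5.8; the loader below would
            // downgrade such a queue to a perf event array instead.
            fmt.Println("ringbuf unavailable:", err)
            return
        }
        defer m.Close()
        fmt.Println("ringbuf map created")
    }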
diff --git a/bpf/include/socket_data.h b/bpf/include/socket_data.h
index cae2622..0d62ccb 100644
--- a/bpf/include/socket_data.h
+++ b/bpf/include/socket_data.h
@@ -19,6 +19,7 @@
#include "socket_opts.h"
#include "protocol_analyzer.h"
+#include "queue.h"
#define SOCKET_UPLOAD_CHUNK_LIMIT 12
@@ -43,9 +44,7 @@ struct {
__type(value, struct socket_data_upload_event);
__uint(max_entries, 1);
} socket_data_upload_event_per_cpu_map SEC(".maps");
-struct {
- __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-} socket_data_upload_event_queue SEC(".maps");
+DATA_QUEUE(socket_data_upload_queue, 1024 * 1024);
struct socket_data_sequence_t {
__u64 data_id;
@@ -106,23 +105,40 @@ static __always_inline struct upload_data_args* generate_socket_upload_args() {
return bpf_map_lookup_elem(&socket_data_upload_args_per_cpu_map, &kZero);
}
-static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, __u8 have_reduce_after_chunk, struct socket_data_upload_event *event) {
-    if (size <= 0) {
+static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, __u8 have_reduce_after_chunk, struct upload_data_args *args) {
+    struct socket_data_upload_event *socket_data_event;
+    socket_data_event = rover_reserve_buf(&socket_data_upload_queue, sizeof(*socket_data_event));
+ if (socket_data_event == NULL) {
return;
}
- if (size > sizeof(event->buffer)) {
- size = sizeof(event->buffer);
+
+ if (size > sizeof(socket_data_event->buffer)) {
+ size = sizeof(socket_data_event->buffer);
+ }
+ if (size <= 0) {
+ rover_discard_buf(socket_data_event);
+ return;
}
- event->sequence = index;
- event->data_len = size;
- event->finished = is_finished;
- event->have_reduce_after_chunk = have_reduce_after_chunk;
- bpf_probe_read(&event->buffer, size, buf);
-    bpf_perf_event_output(ctx, &socket_data_upload_event_queue, BPF_F_CURRENT_CPU, event, sizeof(*event));
+ // basic data
+ socket_data_event->start_time = args->start_time;
+ socket_data_event->end_time = args->end_time;
+ socket_data_event->protocol = args->connection_protocol;
+ socket_data_event->direction = args->data_direction;
+ socket_data_event->conid = args->con_id;
+ socket_data_event->randomid = args->random_id;
+ socket_data_event->total_size = args->bytes_count;
+ socket_data_event->data_id = args->socket_data_id;
+
+ socket_data_event->sequence = index;
+ socket_data_event->data_len = size;
+ socket_data_event->finished = is_finished;
+ socket_data_event->have_reduce_after_chunk = have_reduce_after_chunk;
+ bpf_probe_read(&socket_data_event->buffer, size, buf);
+    rover_submit_buf(ctx, &socket_data_upload_queue, socket_data_event, sizeof(*socket_data_event));
}
-static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t size, struct socket_data_upload_event *event, __u8 force_unfinished) {
+static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t size, struct upload_data_args *args, __u8 force_unfinished) {
ssize_t already_send = 0;
#pragma unroll
for (__u8 index = 0; index < SOCKET_UPLOAD_CHUNK_LIMIT; index++) {
@@ -141,9 +157,9 @@ static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_
__u8 sequence = index;
if (force_unfinished == 1 && need_send_in_chunk > 0) {
is_finished = 0;
- sequence = generate_socket_sequence(event->conid, event->data_id);
+            sequence = generate_socket_sequence(args->con_id, args->socket_data_id);
}
-        __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, have_reduce_after_chunk, event);
+        __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, have_reduce_after_chunk, args);
already_send += need_send_in_chunk;
}
@@ -170,12 +186,12 @@ if (iov_index < iovlen) { \
        have_reduce_after_chunk = 1; \
    } \
    __u32 is_finished = (need_send_in_chunk + already_send) >= size || loop_count == (SOCKET_UPLOAD_CHUNK_LIMIT - 1) ? true : false; \
-   __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, have_reduce_after_chunk, event); \
+   __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, have_reduce_after_chunk, args); \
    already_send += need_send_in_chunk; \
    loop_count++; \
}
-static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov, const size_t iovlen, ssize_t size, struct socket_data_upload_event *event) {
+static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov, const size_t iovlen, ssize_t size, struct upload_data_args *args) {
ssize_t already_send = 0;
ssize_t cur_iov_sended = 0;
__u8 iov_index = 0;
@@ -198,26 +214,9 @@ static __inline void upload_socket_data(void *ctx, struct upload_data_args *args
    if (args->connection_protocol == CONNECTION_PROTOCOL_UNKNOWN || args->connection_ssl != args->socket_data_ssl || args->connection_skip_data_upload == 1) {
return;
}
- // generate event
- __u32 kZero = 0;
-    struct socket_data_upload_event *event = bpf_map_lookup_elem(&socket_data_upload_event_per_cpu_map, &kZero);
- if (event == NULL) {
- return;
- }
-
- // basic data
- event->start_time = args->start_time;
- event->end_time = args->end_time;
- event->protocol = args->connection_protocol;
- event->direction = args->data_direction;
- event->conid = args->con_id;
- event->randomid = args->random_id;
- event->total_size = args->bytes_count;
- event->data_id = args->socket_data_id;
-
if (args->socket_data_buf != NULL) {
-        upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count, event, args->socket_ssl_buffer_force_unfinished);
+        upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count, args, args->socket_ssl_buffer_force_unfinished);
} else if (args->socket_data_iovec != NULL) {
-        upload_socket_data_iov(ctx, args->socket_data_iovec, args->socket_data_iovlen, args->bytes_count, event);
+        upload_socket_data_iov(ctx, args->socket_data_iovec, args->socket_data_iovlen, args->bytes_count, args);
}
}
\ No newline at end of file
diff --git a/pkg/accesslog/bpf/loader.go b/pkg/accesslog/bpf/loader.go
index 5a4f827..5058913 100644
--- a/pkg/accesslog/bpf/loader.go
+++ b/pkg/accesslog/bpf/loader.go
@@ -34,7 +34,7 @@ type Loader struct {
func NewLoader() (*Loader, error) {
objs := bpfObjects{}
-	if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+ if err := btf.LoadBPFAndAssign(loadBpf, &objs); err != nil {
return nil, err
}
diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go
index 833d323..7e07eb3 100644
--- a/pkg/accesslog/collector/protocols/queue.go
+++ b/pkg/accesslog/collector/protocols/queue.go
@@ -92,13 +92,13 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
func(num int) btf.PartitionContext {
			return NewPartitionContext(q.context, num, q.supportAnalyzers(q.context))
})
-	q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue, int(q.perCPUBuffer),
+	q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailQueue, int(q.perCPUBuffer),
		q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} {
return q.detailSupplier()
}, func(data interface{}) string {
return fmt.Sprintf("%d",
data.(events.SocketDetail).GetConnectionID())
})
-	q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue, int(q.perCPUBuffer),
+	q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue, int(q.perCPUBuffer),
		q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} {
return &events.SocketDataUploadEvent{}
}, func(data interface{}) string {
diff --git a/pkg/profiling/continuous/checker/bpf/network/network.go b/pkg/profiling/continuous/checker/bpf/network/network.go
index ba03f21..a3cb644 100644
--- a/pkg/profiling/continuous/checker/bpf/network/network.go
+++ b/pkg/profiling/continuous/checker/bpf/network/network.go
@@ -134,9 +134,10 @@ func startBPFIfNeed() error {
}
bpf = &bpfObjects{}
-	if err := loadBpfObjects(bpf, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+ if err := btf.LoadBPFAndAssign(loadBpf, bpf); err != nil {
return err
}
+
bpfLinker = btf.NewLinker()
bpfLinker.AddLink(link.Kprobe, map[string]*ebpf.Program{"tcp_sendmsg":
bpf.TcpSendmsg})
bpfLinker.AddLink(link.Kprobe, map[string]*ebpf.Program{"tcp_recvmsg":
bpf.TcpRecvmsg})
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index dab74d1..707aaa8 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -36,7 +36,7 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize int, config *profili
func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) {
// socket buffer data
-	l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadEventQueue, l.protocolPerCPUBuffer, 1, func() interface{} {
+	l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue, l.protocolPerCPUBuffer, 1, func() interface{} {
return &analyzeBase.SocketDataUploadEvent{}
}, func(data interface{}) string {
		return data.(*analyzeBase.SocketDataUploadEvent).GenerateConnectionID()
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
index 8cc8c85..151e563 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -104,11 +104,19 @@ func (m *MessageOpt) ContentTotalSize() int {
}
func (m *MessageOpt) StartTime() uint64 {
- return m.HeaderBuffer().FirstSocketBuffer().StartTime()
+ socketBuffer := m.HeaderBuffer().FirstSocketBuffer()
+ if socketBuffer == nil {
+ return 0
+ }
+ return socketBuffer.StartTime()
}
func (m *MessageOpt) EndTime() uint64 {
- return m.BodyBuffer().LastSocketBuffer().EndTime()
+ socketBuffer := m.BodyBuffer().LastSocketBuffer()
+ if socketBuffer == nil {
+ return m.StartTime()
+ }
+ return socketBuffer.EndTime()
}
func (m *MessageOpt) Direction() enums.SocketDataDirection {
diff --git a/pkg/profiling/task/network/bpf/bpf.go b/pkg/profiling/task/network/bpf/bpf.go
index 083935c..67a875f 100644
--- a/pkg/profiling/task/network/bpf/bpf.go
+++ b/pkg/profiling/task/network/bpf/bpf.go
@@ -34,7 +34,7 @@ type Loader struct {
func NewLoader() (*Loader, error) {
objs := bpfObjects{}
-	if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+ if err := btf.LoadBPFAndAssign(loadBpf, &objs); err != nil {
return nil, err
}
diff --git a/pkg/profiling/task/offcpu/runner.go b/pkg/profiling/task/offcpu/runner.go
index 492d996..8ce3ee6 100644
--- a/pkg/profiling/task/offcpu/runner.go
+++ b/pkg/profiling/task/offcpu/runner.go
@@ -113,7 +113,7 @@ func (r *Runner) Run(ctx context.Context, notify base.ProfilingRunningSuccessNot
if !replacedPid {
return fmt.Errorf("replace the monitor pid failure")
}
-	if err1 := spec.LoadAndAssign(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err1 != nil {
+	if err1 := spec.LoadAndAssign(&objs, btf.GetEBPFCollectionOptionsIfNeed(spec)); err1 != nil {
return err1
}
r.bpf = &objs
diff --git a/pkg/tools/btf/ebpf.go b/pkg/tools/btf/ebpf.go
index 71e20f3..307f29f 100644
--- a/pkg/tools/btf/ebpf.go
+++ b/pkg/tools/btf/ebpf.go
@@ -20,6 +20,7 @@ package btf
import (
"bytes"
"embed"
+ "errors"
"fmt"
"path/filepath"
"sync"
@@ -41,7 +42,16 @@ var (
log = logger.GetLogger("tools", "btf")
)
-func GetEBPFCollectionOptionsIfNeed() *ebpf.CollectionOptions {
+func LoadBPFAndAssign(loadBPF func() (*ebpf.CollectionSpec, error), objs interface{}) error {
+ bpf, err := loadBPF()
+ if err != nil {
+ return err
+ }
+
+ return bpf.LoadAndAssign(objs, GetEBPFCollectionOptionsIfNeed(bpf))
+}
+
+func GetEBPFCollectionOptionsIfNeed(bpfSpec *ebpf.CollectionSpec) *ebpf.CollectionOptions {
findBTFOnce.Do(func() {
readSpec, err := getKernelBTFAddress()
if err != nil {
@@ -52,6 +62,7 @@ func GetEBPFCollectionOptionsIfNeed() *ebpf.CollectionOptions {
spec = readSpec
})
+ enhanceDataQueueOpts(bpfSpec)
	return &ebpf.CollectionOptions{Programs: ebpf.ProgramOptions{KernelTypes: spec}}
}
@@ -87,3 +98,31 @@ func getKernelBTFAddress() (spec *btf.Spec, err error) {
func asset(file string) ([]byte, error) {
return assets.ReadFile(filepath.ToSlash(file))
}
+
+func validateGlobalConstVoidPtrVar(t btf.Type) error {
+ btfVar, ok := t.(*btf.Var)
+ if !ok {
+ return errors.New("not of type btf.Var")
+ }
+
+ if btfVar.Linkage != btf.GlobalVar {
+ return fmt.Errorf("%q is not a global variable", btfVar.Name)
+ }
+
+ btfPtr, ok := btfVar.Type.(*btf.Pointer)
+ if !ok {
+ return fmt.Errorf("%q is not a pointer", btfVar.Name)
+ }
+
+ btfConst, ok := btfPtr.Target.(*btf.Const)
+ if !ok {
+ return fmt.Errorf("%q is not const", btfVar.Name)
+ }
+
+ _, ok = btfConst.Type.(*btf.Void)
+ if !ok {
+ return fmt.Errorf("%q is not a const void pointer", btfVar.Name)
+ }
+
+ return nil
+}
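
With the new signature above, GetEBPFCollectionOptionsIfNeed receives the CollectionSpec so that enhanceDataQueueOpts (added in pkg/tools/btf/queue.go below) can rewrite queue maps before anything is loaded, and LoadBPFAndAssign simply wraps the bpf2go-generated loader with that step. A hedged sketch of the same call path for a hand-loaded object; the object file name, the struct tag, and the module import path github.com/apache/skywalking-rover are assumptions for illustration:

    package main

    import (
        "log"

        "github.com/cilium/ebpf"

        "github.com/apache/skywalking-rover/pkg/tools/btf" // assumed import path
    )

    func main() {
        // "accesslog.o" is a placeholder; bpf2go users go through the
        // generated loadBpf() via btf.LoadBPFAndAssign instead.
        spec, err := ebpf.LoadCollectionSpec("accesslog.o")
        if err != nil {
            log.Fatal(err)
        }
        var objs struct {
            SocketCloseEventQueue *ebpf.Map `ebpf:"socket_close_event_queue"`
        }
        // Passing the spec lets the btf package inspect its BTF and downgrade
        // ringbuf queues to perf event arrays on old kernels before loading.
        if err := spec.LoadAndAssign(&objs, btf.GetEBPFCollectionOptionsIfNeed(spec)); err != nil {
            log.Fatal(err)
        }
        defer objs.SocketCloseEventQueue.Close()
    }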
diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go
index f663737..867d84e 100644
--- a/pkg/tools/btf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -161,7 +161,7 @@ func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader, data
func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBufferReader, perCPUBuffer,
parallels int, dataSupplier func() interface{}) {
- rd, err := perf.NewReader(emap, perCPUBuffer)
+ rd, err := newQueueReader(emap, perCPUBuffer)
if err != nil {
m.errors = multierror.Append(m.errors, fmt.Errorf("open ring
buffer error: %v", err))
return
@@ -177,10 +177,10 @@ func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBuff
}
}
-func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, dataSupplier func() interface{}, bufReader RingBufferReader) {
+func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, dataSupplier func() interface{}, bufReader RingBufferReader) {
go func() {
for {
- record, err := rd.Read()
+ sample, err := rd.Read()
if err != nil {
if errors.Is(err, perf.ErrClosed) {
return
@@ -188,23 +188,21 @@ func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, dataSupplier fu
log.Warnf("read from %s ringbuffer error: %v",
emap.String(), err)
continue
}
-
- if record.LostSamples != 0 {
- log.Warnf("perf event queue(%s) full, dropped
%d samples", emap.String(), record.LostSamples)
+ if len(sample) == 0 {
continue
}
data := dataSupplier()
if r, ok := data.(reader.EventReader); ok {
-				sampleReader := reader.NewReader(record.RawSample)
+ sampleReader := reader.NewReader(sample)
r.ReadFrom(sampleReader)
				if readErr := sampleReader.HasError(); readErr != nil {
- log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+ log.Warnf("parsing data from %s, raw
size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
continue
}
} else {
-				if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, data); err != nil {
-					log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err)
+				if err := binary.Read(bytes.NewBuffer(sample), binary.LittleEndian, data); err != nil {
+					log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(sample), err)
continue
}
}
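
For suppliers that do not implement reader.EventReader, the loop above falls back to binary.Read over the raw sample. A small sketch of that decode path; the event layout here is hypothetical (the authoritative field order and widths are the C structs in bpf/), so treat it as illustration only:

    package example

    import (
        "bytes"
        "encoding/binary"
    )

    // closeEvent is a hypothetical fixed-size stand-in for socket_close_event_t;
    // every field must match the C struct's order, width, and padding.
    type closeEvent struct {
        ConID     uint64
        RandomID  uint64
        StartTime uint64
        EndTime   uint64
        PID       uint32
        SockFD    uint32
        Success   uint32
        Pad       uint32 // explicit padding so the Go size matches the C size
    }

    // decode parses one raw sample the same way the fallback branch above does.
    func decode(sample []byte) (*closeEvent, error) {
        ev := &closeEvent{}
        if err := binary.Read(bytes.NewBuffer(sample), binary.LittleEndian, ev); err != nil {
            return nil, err
        }
        return ev, nil
    }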
diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go
index 813979d..290f717 100644
--- a/pkg/tools/btf/queue.go
+++ b/pkg/tools/btf/queue.go
@@ -19,12 +19,134 @@ package btf
import (
"context"
+ "fmt"
"hash/fnv"
+ "os"
+ "strings"
"sync"
"github.com/cilium/ebpf"
+ "github.com/cilium/ebpf/perf"
+ "github.com/cilium/ebpf/ringbuf"
)
+const dataQueuePrefix = "rover_data_queue_"
+
+var (
+ ringbufChecker sync.Once
+ ringbufAvailable bool
+)
+
+func isRingbufAvailable() bool {
+ ringbufChecker.Do(func() {
+ buf, err := ebpf.NewMap(&ebpf.MapSpec{
+ Type: ebpf.RingBuf,
+ MaxEntries: uint32(os.Getpagesize()),
+ })
+
+ buf.Close()
+
+ ringbufAvailable = err == nil
+
+ if ringbufAvailable {
+ log.Infof("detect the ring buffer is available in
current system for enhancement of data queue")
+ }
+ })
+
+ return ringbufAvailable
+}
+
+func enhanceDataQueueOpts(bpfSpec *ebpf.CollectionSpec) {
+ it := bpfSpec.Types.Iterate()
+ for it.Next() {
+ if !strings.HasPrefix(it.Type.TypeName(), dataQueuePrefix) {
+ continue
+ }
+ if err := validateGlobalConstVoidPtrVar(it.Type); err != nil {
+ panic(fmt.Errorf("invalid global const void ptr var %s:
%v", it.Type.TypeName(), err))
+ }
+
+ // if the ringbuf not available, use perf event array
+ if !isRingbufAvailable() {
+			mapName := strings.TrimPrefix(it.Type.TypeName(), dataQueuePrefix)
+ mapSpec := bpfSpec.Maps[mapName]
+ mapSpec.Type = ebpf.PerfEventArray
+ mapSpec.KeySize = 4
+ mapSpec.ValueSize = 4
+ }
+ }
+}
+
+type queueReader interface {
+ Read() ([]byte, error)
+ Close() error
+}
+
+func newQueueReader(emap *ebpf.Map, perCPUBuffer int) (queueReader, error) {
+ switch emap.Type() {
+ case ebpf.RingBuf:
+ return newRingBufReader(emap)
+ case ebpf.PerfEventArray:
+ return newPerfQueueReader(emap, perCPUBuffer)
+ }
+ return nil, fmt.Errorf("unsupported map type: %s", emap.Type().String())
+}
+
+type perfQueueReader struct {
+ name string
+ reader *perf.Reader
+}
+
+func newPerfQueueReader(emap *ebpf.Map, perCPUBuffer int) (*perfQueueReader, error) {
+ reader, err := perf.NewReader(emap, perCPUBuffer)
+ if err != nil {
+ return nil, err
+ }
+ return &perfQueueReader{reader: reader, name: emap.String()}, nil
+}
+
+func (p *perfQueueReader) Read() ([]byte, error) {
+ read, err := p.reader.Read()
+ if err != nil {
+ return nil, err
+ }
+
+ if read.LostSamples != 0 {
+ log.Warnf("perf event queue(%s) full, dropped %d samples",
p.name, read.LostSamples)
+ return nil, nil
+ }
+
+ return read.RawSample, nil
+}
+
+func (p *perfQueueReader) Close() error {
+ return p.reader.Close()
+}
+
+type ringBufReader struct {
+ reader *ringbuf.Reader
+}
+
+func newRingBufReader(emap *ebpf.Map) (*ringBufReader, error) {
+ reader, err := ringbuf.NewReader(emap)
+ if err != nil {
+ return nil, err
+ }
+ return &ringBufReader{reader: reader}, nil
+}
+
+func (r *ringBufReader) Read() ([]byte, error) {
+ read, err := r.reader.Read()
+ if err != nil {
+ return nil, err
+ }
+ return read.RawSample, nil
+}
+
+func (r *ringBufReader) Close() error {
+ return r.reader.Close()
+}
+
type PartitionContext interface {
Start(ctx context.Context)
Consume(data interface{})
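
newQueueReader above is what keeps the rest of the pipeline agnostic about which queue flavor actually got loaded: ringbuf.Reader for BPF_MAP_TYPE_RINGBUF, perf.Reader for the downgraded perf event array. A condensed sketch of the equivalent consumption loop, with drain and handle as illustrative names:

    package example

    import (
        "fmt"

        "github.com/cilium/ebpf"
        "github.com/cilium/ebpf/perf"
        "github.com/cilium/ebpf/ringbuf"
    )

    // drain consumes raw samples from a queue map regardless of flavor;
    // handle receives one event payload per call.
    func drain(m *ebpf.Map, perCPUBuffer int, handle func([]byte)) error {
        switch m.Type() {
        case ebpf.RingBuf:
            rd, err := ringbuf.NewReader(m)
            if err != nil {
                return err
            }
            defer rd.Close()
            for {
                rec, err := rd.Read() // blocks until the BPF side submits a record
                if err != nil {
                    return err
                }
                handle(rec.RawSample)
            }
        case ebpf.PerfEventArray:
            rd, err := perf.NewReader(m, perCPUBuffer)
            if err != nil {
                return err
            }
            defer rd.Close()
            for {
                rec, err := rd.Read()
                if err != nil {
                    return err
                }
                if rec.LostSamples != 0 {
                    continue // ring full: samples dropped, as perfQueueReader logs
                }
                handle(rec.RawSample)
            }
        default:
            return fmt.Errorf("unsupported queue map type: %s", m.Type())
        }
    }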
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -367,14 +367,14 @@ func (r *Buffer) DataSize() int64 {
}
func (r *Buffer) FirstSocketBuffer() SocketDataBuffer {
- if r.dataEvents == nil || r.dataEvents.Len() == 0 {
+ if r == nil || r.dataEvents == nil || r.dataEvents.Len() == 0 {
return nil
}
return r.dataEvents.Front().Value.(SocketDataBuffer)
}
func (r *Buffer) LastSocketBuffer() SocketDataBuffer {
- if r.dataEvents == nil || r.dataEvents.Len() == 0 {
+ if r == nil || r.dataEvents == nil || r.dataEvents.Len() == 0 {
return nil
}
return r.dataEvents.Back().Value.(SocketDataBuffer)
@@ -382,7 +382,7 @@ func (r *Buffer) LastSocketBuffer() SocketDataBuffer {
// DetectNotSendingLastPosition detect the buffer contains not sending data: the BPF limited socket data count
func (r *Buffer) DetectNotSendingLastPosition() *Position {
- if r.dataEvents.Len() == 0 {
+ if r == nil || r.dataEvents.Len() == 0 {
return nil
}
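
The r == nil guards added above (together with the nil checks in the HTTP/1 reader earlier in this patch) rely on a Go property: calling a method on a nil pointer receiver is legal, and the receiver simply arrives as nil inside the method. A toy sketch of the pattern; the type here is illustrative, not the real pkg/tools/buffer.Buffer:

    package example

    import "container/list"

    type buffer struct {
        dataEvents *list.List
    }

    // firstEvent absorbs the nil cases itself, exactly like the guards above:
    // a method on a nil *buffer receiver still executes.
    func (r *buffer) firstEvent() interface{} {
        if r == nil || r.dataEvents == nil || r.dataEvents.Len() == 0 {
            return nil
        }
        return r.dataEvents.Front().Value
    }

    func demo() interface{} {
        var b *buffer // e.g. a message whose body buffer was never populated
        return b.firstEvent() // returns nil instead of panicking
    }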