This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new f0e2244 Add the syscall level event to the trace (#74)
f0e2244 is described below
commit f0e2244fa28046e8e3b0a57a167bff81ea9a1392
Author: mrproliu <[email protected]>
AuthorDate: Thu Jan 19 14:22:28 2023 +0800
Add the syscall level event to the trace (#74)
---
CHANGES.md | 1 +
bpf/profiling/network/args.h | 4 +
bpf/profiling/network/netmonitor.c | 81 +++++++--
bpf/profiling/network/sock_stats.h | 2 +
bpf/profiling/network/socket.h | 61 ++++++-
bpf/profiling/network/socket_detail.c | 92 ++++++++++
bpf/profiling/network/socket_detail.h | 71 ++++++++
docs/en/setup/configuration/profiling.md | 9 +-
pkg/profiling/task/network/analyze/base/enums.go | 78 +++++++++
.../task/network/analyze/layer7/events.go | 29 +++-
.../analyze/layer7/protocols/base/analyzer.go | 36 ++--
.../analyze/layer7/protocols/base/buffer.go | 186 ++++++++++++++++-----
.../analyze/layer7/protocols/base/buffer_test.go | 2 +-
.../analyze/layer7/protocols/base/events.go | 17 ++
.../analyze/layer7/protocols/http1/metrics.go | 62 ++++++-
.../network/analyze/layer7/protocols/protocols.go | 10 ++
pkg/profiling/task/network/analyze/layer7/queue.go | 36 +++-
pkg/profiling/task/network/runner.go | 3 +
pkg/tools/host/network.go | 62 +++++++
19 files changed, 752 insertions(+), 90 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 1a88c37..b457827 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,6 +6,7 @@ Release Notes.
------------------
#### Features
* Enhance the protocol reader for support long socket data.
+* Add the syscall level event to the trace.
#### Bug Fixes
* Fix HTTP method name in protocol analyzer
diff --git a/bpf/profiling/network/args.h b/bpf/profiling/network/args.h
index b3c7c51..34e0bd2 100644
--- a/bpf/profiling/network/args.h
+++ b/bpf/profiling/network/args.h
@@ -139,7 +139,11 @@ struct sock_data_args_t {
__u64 data_id;
// for openssl
__u32 excepted_size;
+ __u16 fix;
__u8 ssl_buffer_force_unfinished;
+ __u8 package_count;
+ __u64 total_package_size;
+ __u32 ifindex;
};
struct {
__uint(type, BPF_MAP_TYPE_HASH);
diff --git a/bpf/profiling/network/netmonitor.c
b/bpf/profiling/network/netmonitor.c
index 43577ba..78d1bd2 100644
--- a/bpf/profiling/network/netmonitor.c
+++ b/bpf/profiling/network/netmonitor.c
@@ -36,6 +36,7 @@
#include "sock_stats.h"
#include "args.h"
#include "protocol_analyzer.h"
+#include "socket_detail.h"
char __license[] SEC("license") = "Dual MIT/GPL";
@@ -390,7 +391,7 @@ static __inline void upload_socket_data(void *ctx, __u64
start_time, __u64 end_t
}
static __always_inline void process_write_data(struct pt_regs *ctx, __u64 id,
struct sock_data_args_t *args, ssize_t bytes_count,
- __u32 data_direction, const bool vecs,
__u32 func_name, bool ssl) {
+ __u32 data_direction, const bool vecs,
__u8 func_name, bool ssl) {
__u64 curr_nacs = bpf_ktime_get_ns();
__u32 tgid = (__u32)(id >> 32);
@@ -453,6 +454,9 @@ static __always_inline void process_write_data(struct
pt_regs *ctx, __u64 id, st
}
}
+ // upload the socket detail
+ upload_socket_detail(ctx, conid, conn, func_name, args, ssl);
+
// upload the socket data if need
upload_socket_data(ctx, args->start_nacs, curr_nacs, conid, conn, args,
bytes_count, msg_type, data_direction, ssl);
@@ -584,8 +588,12 @@ int sock_alloc_ret(struct pt_regs *ctx) {
SEC("kprobe/sendto")
int sys_sendto(struct pt_regs *ctx) {
- ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
__u64 id = bpf_get_current_pid_tgid();
+ __u32 tgid = id >> 32;
+ if (tgid_should_trace(tgid) == false) {
+ return 0;
+ }
+ ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
__u32 fd = _(PT_REGS_PARM1(ctx));
char* buf;
bpf_probe_read(&buf, sizeof(buf), &(PT_REGS_PARM2(ctx)));
@@ -648,10 +656,14 @@ int tcp_rcv_established(struct pt_regs* ctx) {
SEC("kprobe/write")
int sys_write(struct pt_regs *ctx) {
+ __u64 id = bpf_get_current_pid_tgid();
+ __u32 tgid = id >> 32;
+ if (tgid_should_trace(tgid) == false) {
+ return 0;
+ }
ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
char* buf;
bpf_probe_read(&buf, sizeof(buf), &(PT_REGS_PARM2(ctx)));
- __u64 id = bpf_get_current_pid_tgid();
struct sock_data_args_t data_args = {};
data_args.fd = _(PT_REGS_PARM1(ctx));
@@ -677,8 +689,12 @@ int sys_write_ret(struct pt_regs *ctx) {
SEC("kprobe/send")
int sys_send(struct pt_regs* ctx) {
- ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
__u64 id = bpf_get_current_pid_tgid();
+ __u32 tgid = id >> 32;
+ if (tgid_should_trace(tgid) == false) {
+ return 0;
+ }
+ ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
char* buf;
bpf_probe_read(&buf, sizeof(buf), &(PT_REGS_PARM2(ctx)));
@@ -706,10 +722,14 @@ int sys_send_ret(struct pt_regs* ctx) {
SEC("kprobe/writev")
int sys_writev(struct pt_regs* ctx) {
+ __u64 id = bpf_get_current_pid_tgid();
+ __u32 tgid = id >> 32;
+ if (tgid_should_trace(tgid) == false) {
+ return 0;
+ }
ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
struct iovec *iovec;
bpf_probe_read(&iovec, sizeof(iovec), &(PT_REGS_PARM2(ctx)));
- __u64 id = bpf_get_current_pid_tgid();
struct sock_data_args_t data_args = {};
data_args.fd = _(PT_REGS_PARM1(ctx));
@@ -736,13 +756,17 @@ int sys_writev_ret(struct pt_regs* ctx) {
SEC("kprobe/sendmsg")
int sys_sendmsg(struct pt_regs* ctx) {
+ __u64 id = bpf_get_current_pid_tgid();
+ __u32 tgid = id >> 32;
+ if (tgid_should_trace(tgid) == false) {
+ return 0;
+ }
ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
struct user_msghdr* msghdr;
bpf_probe_read(&msghdr, sizeof(msghdr), &(PT_REGS_PARM2(ctx)));
if (msghdr == NULL) {
return 0;
}
- __u64 id = bpf_get_current_pid_tgid();
__u32 fd = _(PT_REGS_PARM1(ctx));
struct sockaddr* addr = _(msghdr->msg_name);
@@ -786,6 +810,11 @@ int sys_sendmsg_ret(struct pt_regs* ctx) {
SEC("kprobe/sendmmsg")
int sys_sendmmsg(struct pt_regs* ctx) {
+ __u64 id = bpf_get_current_pid_tgid();
+ __u32 tgid = id >> 32;
+ if (tgid_should_trace(tgid) == false) {
+ return 0;
+ }
ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
struct mmsghdr* mmsghdr;
bpf_probe_read(&mmsghdr, sizeof(mmsghdr), &(PT_REGS_PARM2(ctx)));
@@ -794,7 +823,6 @@ int sys_sendmmsg(struct pt_regs* ctx) {
return 0;
}
- __u64 id = bpf_get_current_pid_tgid();
__u32 fd = _(PT_REGS_PARM1(ctx));
struct sockaddr* addr = _(mmsghdr->msg_hdr.msg_name);
@@ -902,10 +930,14 @@ int sys_sendfile_ret(struct pt_regs *ctx) {
SEC("kprobe/read")
int sys_read(struct pt_regs* ctx) {
+ __u64 id = bpf_get_current_pid_tgid();
+ __u32 tgid = id >> 32;
+ if (tgid_should_trace(tgid) == false) {
+ return 0;
+ }
ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
char* buf;
bpf_probe_read(&buf, sizeof(buf), &(PT_REGS_PARM2(ctx)));
- __u64 id = bpf_get_current_pid_tgid();
struct sock_data_args_t data_args = {};
data_args.fd = _(PT_REGS_PARM1(ctx));
@@ -930,10 +962,14 @@ int sys_read_ret(struct pt_regs* ctx) {
SEC("kprobe/readv")
int sys_readv(struct pt_regs* ctx) {
+ __u64 id = bpf_get_current_pid_tgid();
+ __u32 tgid = id >> 32;
+ if (tgid_should_trace(tgid) == false) {
+ return 0;
+ }
ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
struct iovec *iovec;
bpf_probe_read(&iovec, sizeof(iovec), &(PT_REGS_PARM2(ctx)));
- __u64 id = bpf_get_current_pid_tgid();
struct sock_data_args_t data_args = {};
data_args.fd = _(PT_REGS_PARM1(ctx));
@@ -960,8 +996,12 @@ int sys_readv_ret(struct pt_regs* ctx) {
SEC("kprobe/recv")
int sys_recv(struct pt_regs* ctx) {
- ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
__u64 id = bpf_get_current_pid_tgid();
+ __u32 tgid = id >> 32;
+ if (tgid_should_trace(tgid) == false) {
+ return 0;
+ }
+ ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
char* buf;
bpf_probe_read(&buf, sizeof(buf), &(PT_REGS_PARM2(ctx)));
@@ -988,10 +1028,14 @@ int sys_recv_ret(struct pt_regs* ctx) {
SEC("kprobe/recvfrom")
int sys_recvfrom(struct pt_regs* ctx) {
+ __u64 id = bpf_get_current_pid_tgid();
+ __u32 tgid = id >> 32;
+ if (tgid_should_trace(tgid) == false) {
+ return 0;
+ }
ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
char* buf;
bpf_probe_read(&buf, sizeof(buf), &(PT_REGS_PARM2(ctx)));
- __u64 id = bpf_get_current_pid_tgid();
struct sockaddr* sock;
bpf_probe_read(&sock, sizeof(sock), &(PT_REGS_PARM5(ctx)));
@@ -1035,13 +1079,17 @@ int sys_recvfrom_ret(struct pt_regs* ctx) {
SEC("kprobe/recvmsg")
int sys_recvmsg(struct pt_regs* ctx) {
+ __u64 id = bpf_get_current_pid_tgid();
+ __u32 tgid = id >> 32;
+ if (tgid_should_trace(tgid) == false) {
+ return 0;
+ }
ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
struct user_msghdr* msghdr;
bpf_probe_read(&msghdr, sizeof(msghdr), &(PT_REGS_PARM2(ctx)));
if (msghdr == NULL) {
return 0;
}
- __u64 id = bpf_get_current_pid_tgid();
__u32 fd = _(PT_REGS_PARM1(ctx));
struct sockaddr* addr = _(msghdr->msg_name);
@@ -1085,6 +1133,11 @@ int sys_recvmsg_ret(struct pt_regs* ctx) {
SEC("kprobe/recvmmsg")
int sys_recvmmsg(struct pt_regs* ctx) {
+ __u64 id = bpf_get_current_pid_tgid();
+ __u32 tgid = id >> 32;
+ if (tgid_should_trace(tgid) == false) {
+ return 0;
+ }
ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
struct mmsghdr* mmsghdr;
bpf_probe_read(&mmsghdr, sizeof(mmsghdr), &(PT_REGS_PARM2(ctx)));
@@ -1093,7 +1146,6 @@ int sys_recvmmsg(struct pt_regs* ctx) {
return 0;
}
- __u64 id = bpf_get_current_pid_tgid();
__u32 fd = _(PT_REGS_PARM1(ctx));
struct sockaddr* addr = _(mmsghdr->msg_hdr.msg_name);
@@ -1258,4 +1310,5 @@ int kfree_skb_reason(struct pt_regs *ctx) {
#include "openssl.c"
#include "go_tls.c"
-#include "node_tls.c"
\ No newline at end of file
+#include "node_tls.c"
+#include "socket_detail.c"
\ No newline at end of file
diff --git a/bpf/profiling/network/sock_stats.h
b/bpf/profiling/network/sock_stats.h
index 8312af3..604071d 100644
--- a/bpf/profiling/network/sock_stats.h
+++ b/bpf/profiling/network/sock_stats.h
@@ -64,6 +64,8 @@ struct active_connection_t {
__u8 connect_event_send;
// current connection is ssl
__u8 ssl;
+ __u8 fix;
+ void *last_recv_sk_buff;
};
struct {
__uint(type, BPF_MAP_TYPE_HASH);
diff --git a/bpf/profiling/network/socket.h b/bpf/profiling/network/socket.h
index 7af5ea8..ee0a294 100644
--- a/bpf/profiling/network/socket.h
+++ b/bpf/profiling/network/socket.h
@@ -17,6 +17,11 @@
#pragma once
+#define SKB_DST_NOREF 1UL
+#define SKB_DST_PTRMASK ~(SKB_DST_NOREF)
+typedef int64_t s64;
+typedef s64 ktime_t;
+
struct in6_addr_redefine {
union {
__u8 u6_addr8[16];
@@ -64,6 +69,10 @@ struct sock {
struct tcp_sock {
__u32 srtt_us;
+ __u32 copied_seq;
+ __u32 write_seq;
+ __u32 packets_out;
+ __u32 retrans_out;
} __attribute__((preserve_access_index));
struct user_msghdr {
@@ -77,8 +86,35 @@ struct mmsghdr {
unsigned int msg_len;
} __attribute__((preserve_access_index));
+struct list_head {
+ struct list_head *next;
+ struct list_head *prev;
+} __attribute__((preserve_access_index));
+struct rb_node {
+ long unsigned int __rb_parent_color;
+ struct rb_node *rb_right;
+ struct rb_node *rb_left;
+} __attribute__((preserve_access_index));
struct sk_buff {
struct sock *sk;
+ union {
+ struct {
+ long unsigned int _skb_refdst;
+ void (*destructor)(struct sk_buff *);
+ };
+ long unsigned int _sk_redir;
+ };
+ union {
+ ktime_t tstamp;
+ __u64 skb_mstamp_ns;
+ };
+ int skb_iif;
+ unsigned int len;
+ unsigned int data_len;
+} __attribute__((preserve_access_index));
+
+struct net {
+ int ifindex;
} __attribute__((preserve_access_index));
enum skb_drop_reason {
@@ -89,4 +125,27 @@ enum skb_drop_reason {
SKB_DROP_REASON_TCP_FILTER,
SKB_DROP_REASON_UDP_CSUM,
SKB_DROP_REASON_MAX,
-};
\ No newline at end of file
+};
+
+struct dst_entry {
+ struct net_device *dev;
+} __attribute__((preserve_access_index));
+
+struct net_device {
+ int ifindex;
+ unsigned long state;
+ unsigned int mtu;
+} __attribute__((preserve_access_index));
+
+struct skb_shared_info {
+ __u8 flags;
+ __u8 meta_len;
+ __u8 nr_frags;
+ __u8 tx_flags;
+ short unsigned int gso_size;
+ short unsigned int gso_segs;
+ struct sk_buff *frag_list;
+ unsigned int gso_type;
+ __u32 tskey;
+ void *destructor_arg;
+} __attribute__((preserve_access_index));
\ No newline at end of file
diff --git a/bpf/profiling/network/socket_detail.c
b/bpf/profiling/network/socket_detail.c
new file mode 100644
index 0000000..a310366
--- /dev/null
+++ b/bpf/profiling/network/socket_detail.c
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "socket_detail.h"
+#include "args.h"
+#include "sock_stats.h"
+
+SEC("kprobe/ip_finish_output")
+int ip_finish_output(struct pt_regs* ctx) {
+ __u64 id = bpf_get_current_pid_tgid();
+ struct sock_data_args_t *data_args =
bpf_map_lookup_elem(&socket_data_args, &id);
+ if (data_args == NULL) {
+ return 0;
+ }
+
+ struct sk_buff *buff = (void *)PT_REGS_PARM3(ctx);
+ long unsigned int _skb_refdst;
+ bpf_probe_read(&_skb_refdst, sizeof(_skb_refdst), &buff->_skb_refdst);
+ struct dst_entry *entry = (void *)(_skb_refdst & SKB_DST_PTRMASK);
+ struct net_device *device;
+ bpf_probe_read(&device, sizeof(device), &entry->dev);
+
+ int ifindex;
+ bpf_probe_read(&ifindex, sizeof(ifindex), &device->ifindex);
+
+ unsigned int data_len;
+ bpf_probe_read(&data_len, sizeof(data_len), &buff->len);
+
+ data_args->package_count++;
+ data_args->total_package_size += data_len;
+ data_args->ifindex = ifindex;
+
+ struct sock *sock = (void *)PT_REGS_PARM2(ctx);
+ struct tcp_sock *tcp_sock = (struct tcp_sock *)sock;
+ __u32 packets_out;
+ __u32 retrans_out;
+ __u32 srtt;
+ BPF_CORE_READ_INTO(&srtt, tcp_sock, srtt_us);
+ srtt = srtt >> 3;
+ BPF_CORE_READ_INTO(&packets_out, tcp_sock, packets_out);
+ BPF_CORE_READ_INTO(&retrans_out, tcp_sock, retrans_out);
+ return 0;
+}
+
+SEC("kprobe/skb_copy_datagram_iter")
+int skb_copy_datagram_iter(struct pt_regs* ctx) {
+ __u64 id = bpf_get_current_pid_tgid();
+ struct sock_data_args_t *data_args =
bpf_map_lookup_elem(&socket_data_args, &id);
+ if (data_args == NULL) {
+ return 0;
+ }
+ __u64 conid = gen_tgid_fd((__u32)(id >> 32), data_args->fd);
+ struct active_connection_t *con =
bpf_map_lookup_elem(&active_connection_map, &conid);
+ if (con == NULL) {
+ return 0;
+ }
+
+ const struct sk_buff *buff = (void *)PT_REGS_PARM1(ctx);
+ int len = PT_REGS_PARM4(ctx);
+ int ifindex;
+ bpf_probe_read(&ifindex, sizeof(ifindex), &buff->skb_iif);
+
+ if (con->last_recv_sk_buff == NULL || con->last_recv_sk_buff != buff ||
data_args->package_count == 0) {
+ data_args->package_count++;
+ con->last_recv_sk_buff = (void *)buff;
+ }
+ data_args->ifindex = ifindex;
+ data_args->total_package_size += len;
+
+ struct sock *sock;
+ bpf_probe_read(&sock, sizeof(sock), &buff->sk);
+ struct tcp_sock *tcp_sock = (struct tcp_sock *)sock;
+ __u32 packets_out;
+ __u32 retrans_out;
+ bpf_probe_read(&packets_out, sizeof(packets_out), &tcp_sock->packets_out);
+ bpf_probe_read(&retrans_out, sizeof(retrans_out), &tcp_sock->retrans_out);
+ return 0;
+}
diff --git a/bpf/profiling/network/socket_detail.h
b/bpf/profiling/network/socket_detail.h
new file mode 100644
index 0000000..2d24e0f
--- /dev/null
+++ b/bpf/profiling/network/socket_detail.h
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common.h"
+
+struct socket_detail_t {
+ __u64 connection_id;
+ __u64 random_id;
+ __u64 data_id;
+ __u64 total_package_size;
+ __u32 ifindex;
+ __u8 package_count;
+ __u8 func_name;
+ __u8 rtt_count;
+ __u8 protocol;
+ __u64 rtt_time;
+};
+
+struct {
+ __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+ __type(key, __u32);
+ __type(value, struct socket_detail_t);
+ __uint(max_entries, 1);
+} socket_detail_event_per_cpu_map SEC(".maps");
+struct {
+ __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
+} socket_detail_data_queue SEC(".maps");
+
+static __inline void upload_socket_detail(void *ctx, __u64 conid, struct
active_connection_t *connection, __u8 func_name, struct sock_data_args_t
*data_args, bool ssl) {
+ // only send the original socket syscall(not ssl) and the protocol must
been set
+ if (ssl == true || connection->protocol == CONNECTION_PROTOCOL_UNKNOWN) {
+ return;
+ }
+ __u32 kZero = 0;
+ struct socket_detail_t *detail =
bpf_map_lookup_elem(&socket_detail_event_per_cpu_map, &kZero);
+ if (detail == NULL) {
+ return;
+ }
+
+ detail->connection_id = conid;
+ detail->random_id = connection->random_id;
+ detail->data_id = data_args->data_id;
+ detail->func_name = func_name;
+ detail->total_package_size = data_args->total_package_size;
+ detail->ifindex = data_args->ifindex;
+ detail->package_count = data_args->package_count;
+ detail->protocol = connection->protocol;
+
+ if (data_args->rtt_count > 0) {
+ detail->rtt_count = data_args->rtt_count;
+ detail->rtt_time = data_args->rtt_time;
+ }
+
+ bpf_perf_event_output(ctx, &socket_detail_data_queue, BPF_F_CURRENT_CPU,
detail, sizeof(*detail));
+}
\ No newline at end of file
diff --git a/docs/en/setup/configuration/profiling.md
b/docs/en/setup/configuration/profiling.md
index 8a8411c..fe107ee 100644
--- a/docs/en/setup/configuration/profiling.md
+++ b/docs/en/setup/configuration/profiling.md
@@ -113,7 +113,8 @@ Based on the above two data types, the following metrics
are provided.
| status_5xx | TopN | millisecond | The Top N trace(id)s with response
status in 500-599 |
##### Span Attached Event
-| Name | Description
|
-|------------------------|-----------------------------------------------------------------------------------------------|
-| HTTP Request Sampling | Complete information about the HTTP request, it's
only reported when it matches slow traces. |
-| HTTP Response Sampling | Complete information about the HTTP response, it's
only reported when it matches slow traces. |
+| Name | Description
|
+|------------------------|---------------------------------------------------------------------------------------------------------------------------------------------|
+| HTTP Request Sampling | Complete information about the HTTP request, it's
only reported when it matches slow/4xx/5xx traces.
|
+| HTTP Response Sampling | Complete information about the HTTP response, it's
only reported when it matches slow/4xx/5xx traces.
|
+| Syscall xxx | The methods to use when the process invoke with the
network-related syscall method. It's only reported when it matches slow/4xx/5xx
traces. |
diff --git a/pkg/profiling/task/network/analyze/base/enums.go
b/pkg/profiling/task/network/analyze/base/enums.go
index 0cda9d7..ad4aa60 100644
--- a/pkg/profiling/task/network/analyze/base/enums.go
+++ b/pkg/profiling/task/network/analyze/base/enums.go
@@ -17,6 +17,8 @@
package base
+import "fmt"
+
const (
unknown = "unknown"
http = "http"
@@ -134,3 +136,79 @@ func (m SocketMessageType) String() string {
return unknown
}
}
+
+type SocketFunctionName uint8
+
+const (
+ SocketFunctionNameConnect = 1
+ SocketFunctionNameAccept = 2
+ SocketFunctionNameClose = 3
+ SocketFunctionNameSend = 4
+ SocketFunctionNameSendto = 5
+ SocketFunctionNameSendMsg = 6
+ SocketFunctionNameSendMMSg = 7
+ SocketFunctionNameSendFile = 8
+ SocketFunctionNameWrite = 9
+ SocketFunctionNameWritev = 10
+ SocketFunctionNameRead = 11
+ SocketFunctionNameReadv = 12
+ SocketFunctionNameRecv = 13
+ SocketFunctionNameRecvfrom = 14
+ SocketFunctionNameRecvMsg = 15
+ SocketFunctionNameRecvMMsg = 16
+ SocketFunctionNameResent = 17
+ SocketFunctionNameSslWrite = 18
+ SocketFunctionNameSslRead = 19
+ SocketFunctionNameGoTLSWrite = 20
+ SocketFunctionNameGoTLSRead = 21
+)
+
+// nolint
+func (f SocketFunctionName) String() string {
+ switch f {
+ case SocketFunctionNameConnect:
+ return "Connect"
+ case SocketFunctionNameAccept:
+ return "Accept"
+ case SocketFunctionNameClose:
+ return "Close"
+ case SocketFunctionNameSend:
+ return "Send"
+ case SocketFunctionNameSendto:
+ return "Sendto"
+ case SocketFunctionNameSendMsg:
+ return "SendMsg"
+ case SocketFunctionNameSendMMSg:
+ return "SendMMSg"
+ case SocketFunctionNameSendFile:
+ return "SendFile"
+ case SocketFunctionNameWrite:
+ return "Write"
+ case SocketFunctionNameWritev:
+ return "Writev"
+ case SocketFunctionNameRead:
+ return "Read"
+ case SocketFunctionNameReadv:
+ return "Readv"
+ case SocketFunctionNameRecv:
+ return "Recv"
+ case SocketFunctionNameRecvfrom:
+ return "Recvfrom"
+ case SocketFunctionNameRecvMsg:
+ return "RecvMsg"
+ case SocketFunctionNameRecvMMsg:
+ return "RecvMMsg"
+ case SocketFunctionNameResent:
+ return "Resent"
+ case SocketFunctionNameSslWrite:
+ return "SslWrite"
+ case SocketFunctionNameSslRead:
+ return "SslRead"
+ case SocketFunctionNameGoTLSWrite:
+ return "GoTLSWrite"
+ case SocketFunctionNameGoTLSRead:
+ return "GoTLSRead"
+ default:
+ return fmt.Sprintf("Unknown(%d)", f)
+ }
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go
b/pkg/profiling/task/network/analyze/layer7/events.go
index 5590dce..68b6520 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -33,12 +33,21 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize
int, config *profili
}
func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader)
{
- l.socketDataQueue.Start(ctx, bpfLoader,
bpfLoader.SocketDataUploadEventQueue, 1, l.protocolPerCPUBuffer,
- func() interface{} {
- return &base.SocketDataUploadEvent{}
- }, func(data interface{}) string {
- return
data.(*base.SocketDataUploadEvent).GenerateConnectionID()
- })
+ // socket buffer data
+
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadEventQueue,
l.protocolPerCPUBuffer, func() interface{} {
+ return &base.SocketDataUploadEvent{}
+ }, func(data interface{}) string {
+ return data.(*base.SocketDataUploadEvent).GenerateConnectionID()
+ })
+
+ // socket detail
+ l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue,
l.protocolPerCPUBuffer, func() interface{} {
+ return &base.SocketDetailEvent{}
+ }, func(data interface{}) string {
+ return data.(*base.SocketDetailEvent).GenerateConnectionID()
+ })
+
+ l.socketDataQueue.Start(ctx, bpfLoader)
}
func (l *Listener) handleProfilingExtensionConfig(config
*profiling.ExtensionConfig) {
@@ -66,6 +75,10 @@ func (p *SocketDataPartitionContext) Start(ctx
context.Context) {
}
func (p *SocketDataPartitionContext) Consume(data interface{}) {
- event := data.(*base.SocketDataUploadEvent)
- p.analyzer.ReceiveSocketDataEvent(event)
+ switch v := data.(type) {
+ case *base.SocketDetailEvent:
+ p.analyzer.ReceiveSocketDetail(v)
+ case *base.SocketDataUploadEvent:
+ p.analyzer.ReceiveSocketDataEvent(v)
+ }
}
diff --git
a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
index 0abc173..9449116 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
@@ -87,22 +87,25 @@ func (a *ProtocolAnalyzer) Start(ctx context.Context) {
}
}
+func (a *ProtocolAnalyzer) ReceiveSocketDetail(ctx Context, event
*SocketDetailEvent) {
+ connectionID := event.GenerateConnectionID()
+ connection := a.getConnection(ctx, event.ConnectionID, event.RandomID)
+
+ log.Debugf("receive detail from connection: %s, dataid: %d",
connectionID, event.DataID)
+ connection.buffer.appendDetailEvent(event)
+}
+
func (a *ProtocolAnalyzer) ReceiveSocketData(ctx Context, event
*SocketDataUploadEvent) {
connectionID := event.GenerateConnectionID()
- key := connectionKey{connectionID: event.ConnectionID, randomID:
event.RandomID}
- connection := a.connections[key]
- if connection == nil {
- connection = newConnectionInfo(a.protocol, ctx,
key.connectionID, key.randomID)
- a.connections[key] = connection
- }
- connection.checkConnectionMetrics(ctx)
+ connection := a.getConnection(ctx, event.ConnectionID, event.RandomID)
- log.Debugf("receive connection: %s, dataid: %d, sequence: %d, finished:
%d, have reduce after chunk: %t, direction: %s, size: %d, total size: %d",
+ log.Debugf("receive data from connection: %s, dataid: %d, sequence: %d,
finished: %d, have reduce after chunk: %t, "+
+ "direction: %s, size: %d, total size: %d",
connectionID, event.DataID(), event.DataSequence(),
event.Finished, event.HaveReduceDataAfterChunk(),
event.Direction().String(), event.DataLen, event.TotalSize0)
// insert to the event list
- connection.buffer.appendEvent(event)
+ connection.buffer.appendDataEvent(event)
// process the events if reach the receiver counter
a.receiveEventCount++
@@ -112,6 +115,17 @@ func (a *ProtocolAnalyzer) ReceiveSocketData(ctx Context,
event *SocketDataUploa
a.receiveEventCount = 0
}
+func (a *ProtocolAnalyzer) getConnection(ctx Context, connectionID, randomID
uint64) *connectionInfo {
+ key := connectionKey{connectionID: connectionID, randomID: randomID}
+ connection := a.connections[key]
+ if connection == nil {
+ connection = newConnectionInfo(a.protocol, ctx,
key.connectionID, key.randomID)
+ a.connections[key] = connection
+ }
+ connection.checkConnectionMetrics(ctx)
+ return connection
+}
+
// processEvents means analyze the protocol in each connection
func (a *ProtocolAnalyzer) processEvents() {
// it could be triggered by interval or reach counter
@@ -147,7 +161,7 @@ func (a *ProtocolAnalyzer)
processConnectionEvents(connection *connectionInfo) {
for {
// reset the status of reading
if !buffer.prepareForReading() {
- log.Debugf("prepare finsihed: event size: %d",
buffer.events.Len())
+ log.Debugf("prepare finsihed: event size: %d",
buffer.dataEvents.Len())
return
}
@@ -161,7 +175,7 @@ func (a *ProtocolAnalyzer)
processConnectionEvents(connection *connectionInfo) {
}
if finishReading {
- log.Debugf("reading finsihed: event size: %d",
buffer.events.Len())
+ log.Debugf("reading finsihed: event size: %d",
buffer.dataEvents.Len())
break
}
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
index c7b2647..bbcac23 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
@@ -33,8 +33,9 @@ var (
)
type Buffer struct {
- events *list.List
- validated bool // the events list is validated or not
+ dataEvents *list.List
+ detailEvents *list.List
+ validated bool // the events list is validated or not
eventLocker sync.RWMutex
@@ -57,27 +58,69 @@ func (p *BufferPosition) String() string {
func newBuffer() *Buffer {
return &Buffer{
- events: list.New(),
- validated: false,
+ dataEvents: list.New(),
+ detailEvents: list.New(),
+ validated: false,
}
}
+func (r *Buffer) FindFirstDataBuffer(dataID uint64) SocketDataBuffer {
+ for e := r.dataEvents.Front(); e != nil; e = e.Next() {
+ cur := e.Value.(SocketDataBuffer)
+ if cur.DataID() == dataID {
+ return cur
+ }
+ }
+ return nil
+}
+
func (r *Buffer) Position() *BufferPosition {
return r.current.Clone()
}
func (r *Buffer) Slice(validated bool, start, end *BufferPosition) *Buffer {
- events := list.New()
+ dataEvents := list.New()
+ detailEvents := list.New()
+ var firstDetailElement *list.Element
for nextElement := start.element; nextElement != end.element;
nextElement = nextElement.Next() {
- events.PushBack(nextElement.Value)
+ // found first matches detail event
+ if detailEvents.Len() == 0 || firstDetailElement == nil {
+ for e := r.detailEvents.Front(); e != nil; e = e.Next()
{
+ if e.Value.(*SocketDetailEvent).DataID >=
nextElement.Value.(SocketDataBuffer).DataID() {
+ detailEvents.PushBack(e.Value)
+ firstDetailElement = e
+ break
+ }
+ }
+ }
+ dataEvents.PushBack(nextElement.Value)
+ }
+ lastBuffer := end.element.Value.(SocketDataBuffer)
+ dataEvents.PushBack(&SocketDataEventLimited{lastBuffer, 0,
end.bufIndex})
+
+ // if the first detail element been found, append the details until the
last buffer data id
+ if firstDetailElement == nil {
+ for e := r.detailEvents.Front(); e != nil; e = e.Next() {
+ if e.Value.(*SocketDetailEvent).DataID ==
lastBuffer.DataID() {
+ detailEvents.PushBack(e.Value)
+ break
+ }
+ }
+ } else if firstDetailElement != nil &&
firstDetailElement.Value.(*SocketDetailEvent).DataID != lastBuffer.DataID() {
+ for tmp := firstDetailElement.Next(); tmp != nil; tmp =
tmp.Next() {
+ if tmp.Value.(*SocketDetailEvent).DataID >
lastBuffer.DataID() {
+ break
+ }
+ detailEvents.PushBack(tmp.Value)
+ }
}
-
events.PushBack(&SocketDataEventLimited{end.element.Value.(SocketDataBuffer),
0, end.bufIndex})
return &Buffer{
- events: events,
- validated: validated,
- head: &BufferPosition{element: events.Front(), bufIndex:
start.bufIndex},
- current: &BufferPosition{element: events.Front(), bufIndex:
start.bufIndex},
+ dataEvents: dataEvents,
+ detailEvents: detailEvents,
+ validated: validated,
+ head: &BufferPosition{element: dataEvents.Front(),
bufIndex: start.bufIndex},
+ current: &BufferPosition{element: dataEvents.Front(),
bufIndex: start.bufIndex},
}
}
@@ -94,27 +137,31 @@ func (r *Buffer) Len() int {
return result
}
+func (r *Buffer) Details() *list.List {
+ return r.detailEvents
+}
+
func (r *Buffer) FirstSocketBuffer() SocketDataBuffer {
- if r.events.Len() == 0 {
+ if r.dataEvents.Len() == 0 {
return nil
}
- return r.events.Front().Value.(SocketDataBuffer)
+ return r.dataEvents.Front().Value.(SocketDataBuffer)
}
func (r *Buffer) LastSocketBuffer() SocketDataBuffer {
- if r.events.Len() == 0 {
+ if r.dataEvents.Len() == 0 {
return nil
}
- return r.events.Back().Value.(SocketDataBuffer)
+ return r.dataEvents.Back().Value.(SocketDataBuffer)
}
// DetectNotSendingLastPosition detect the buffer contains not sending data:
the BPF limited socket data count
func (r *Buffer) DetectNotSendingLastPosition() *BufferPosition {
- if r.events.Len() == 0 {
+ if r.dataEvents.Len() == 0 {
return nil
}
- for e := r.events.Front(); e != nil; e = e.Next() {
+ for e := r.dataEvents.Front(); e != nil; e = e.Next() {
buf := e.Value.(SocketDataBuffer)
// the buffer is sent finished but still have reduced data not
send
if buf.IsFinished() && buf.HaveReduceDataAfterChunk() {
@@ -131,23 +178,27 @@ func CombineSlices(validated bool, buffers ...*Buffer)
*Buffer {
if len(buffers) == 1 {
return buffers[0]
}
- events := list.New()
+ dataEvents := list.New()
+ detailEvents := list.New()
for _, b := range buffers {
if b.head.bufIndex > 0 {
- headBuffer := b.events.Front().Value.(SocketDataBuffer)
- events.PushBack(&SocketDataEventLimited{headBuffer,
b.head.bufIndex, headBuffer.BufferLen()})
- for next := b.events.Front().Next(); next != nil; next
= next.Next() {
- events.PushBack(next.Value)
+ headBuffer :=
b.dataEvents.Front().Value.(SocketDataBuffer)
+ dataEvents.PushBack(&SocketDataEventLimited{headBuffer,
b.head.bufIndex, headBuffer.BufferLen()})
+ for next := b.dataEvents.Front().Next(); next != nil;
next = next.Next() {
+ dataEvents.PushBack(next.Value)
}
} else {
- events.PushBackList(b.events)
+ dataEvents.PushBackList(b.dataEvents)
}
+ detailEvents.PushBackList(b.detailEvents)
}
+
return &Buffer{
- events: events,
- validated: validated,
- head: &BufferPosition{element: events.Front(), bufIndex:
0},
- current: &BufferPosition{element: events.Front(), bufIndex:
0},
+ dataEvents: dataEvents,
+ detailEvents: detailEvents,
+ validated: validated,
+ head: &BufferPosition{element: dataEvents.Front(),
bufIndex: 0},
+ current: &BufferPosition{element: dataEvents.Front(),
bufIndex: 0},
}
}
@@ -292,14 +343,14 @@ func (r *Buffer) resetForLoopReading() {
}
func (r *Buffer) prepareForReading() bool {
- if r.events.Len() == 0 {
+ if r.dataEvents.Len() == 0 {
return false
}
if r.head == nil || r.head.element == nil {
// read in the first element
r.eventLocker.RLock()
defer r.eventLocker.RUnlock()
- r.head = &BufferPosition{element: r.events.Front(), bufIndex: 0}
+ r.head = &BufferPosition{element: r.dataEvents.Front(),
bufIndex: 0}
r.current = r.head.Clone()
} else {
// make sure we can read from head
@@ -313,6 +364,30 @@ func (r *Buffer) removeReadElements() bool {
r.eventLocker.Lock()
defer r.eventLocker.Unlock()
+ // delete until the last data id
+ if r.head.element != nil && r.current.element != nil {
+ firstDataID := r.head.element.Value.(SocketDataBuffer).DataID()
+ lastDataID :=
r.current.element.Value.(SocketDataBuffer).DataID()
+ startDelete := false
+ for e := r.detailEvents.Front(); e != nil; {
+ event := e.Value.(*SocketDetailEvent)
+ if !startDelete && event.DataID >= firstDataID &&
event.DataID <= lastDataID {
+ startDelete = true
+ } else if startDelete && event.DataID > lastDataID {
+ // out of the data id, just break
+ break
+ }
+
+ if startDelete {
+ tmp := e.Next()
+ r.detailEvents.Remove(e)
+ e = tmp
+ } else {
+ e = e.Next()
+ }
+ }
+ }
+
// delete until to current position
next := r.head.element
for ; next != nil && next != r.current.element; next =
r.removeElement0(next) {
@@ -343,25 +418,54 @@ func (r *Buffer) removeElement0(element *list.Element)
*list.Element {
return nil
}
result := element.Next()
- r.events.Remove(element)
+ r.dataEvents.Remove(element)
return result
}
-// appendEvent insert the event to the event list following the order
-func (r *Buffer) appendEvent(event *SocketDataUploadEvent) {
+func (r *Buffer) appendDetailEvent(event *SocketDetailEvent) {
+ r.eventLocker.Lock()
+ defer r.eventLocker.Unlock()
+
+ if r.detailEvents.Len() == 0 {
+ r.detailEvents.PushFront(event)
+ return
+ }
+ if r.detailEvents.Back().Value.(*SocketDetailEvent).DataID <
event.DataID {
+ r.detailEvents.PushBack(event)
+ return
+ }
+ beenAdded := false
+ for element := r.detailEvents.Front(); element != nil; element =
element.Next() {
+ existEvent := element.Value.(*SocketDetailEvent)
+ if existEvent.DataID > event.DataID {
+ // data id needs order
+ beenAdded = true
+ }
+ if beenAdded {
+ r.detailEvents.InsertBefore(event, element)
+ break
+ }
+ }
+ if !beenAdded {
+ r.detailEvents.PushBack(event)
+ }
+}
+
+// appendDataEvent insert the event to the event list following the order
+func (r *Buffer) appendDataEvent(event *SocketDataUploadEvent) {
r.eventLocker.Lock()
defer r.eventLocker.Unlock()
- if r.events.Len() == 0 {
- r.events.PushFront(event)
+ if r.dataEvents.Len() == 0 {
+ r.dataEvents.PushFront(event)
return
}
- if r.events.Back().Value.(SocketDataBuffer).DataID() < event.DataID() {
- r.events.PushBack(event)
+ if r.dataEvents.Back().Value.(SocketDataBuffer).DataID() <
event.DataID() {
+ r.dataEvents.PushBack(event)
return
}
beenAdded := false
- for element := r.events.Front(); element != nil; element =
element.Next() {
+ for element := r.dataEvents.Front(); element != nil; element =
element.Next() {
existEvent := element.Value.(SocketDataBuffer)
if existEvent.DataID() > event.DataID() {
// data id needs order
@@ -371,12 +475,12 @@ func (r *Buffer) appendEvent(event
*SocketDataUploadEvent) {
beenAdded = true
}
if beenAdded {
- r.events.InsertBefore(event, element)
+ r.dataEvents.InsertBefore(event, element)
break
}
}
if !beenAdded {
- r.events.PushBack(event)
+ r.dataEvents.PushBack(event)
}
}
@@ -386,13 +490,13 @@ func (r *Buffer) deleteExpireEvents(expireDuration
time.Duration) int {
expireTime := time.Now().Add(-expireDuration)
count := 0
- for e := r.events.Front(); e != nil; {
+ for e := r.dataEvents.Front(); e != nil; {
startTime := host.Time(e.Value.(SocketDataBuffer).StartTime())
if expireTime.After(startTime) {
count++
cur := e
e = e.Next()
- r.events.Remove(cur)
+ r.dataEvents.Remove(cur)
} else {
break
}
diff --git
a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer_test.go
b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer_test.go
index 87984b2..6595cbc 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer_test.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer_test.go
@@ -67,7 +67,7 @@ func TestOffsetPosition(t *testing.T) {
for _, test := range tests {
events := list.New()
- buffer := Buffer{events: events}
+ buffer := Buffer{dataEvents: events}
var curElement *list.Element
for i, e := range test.events {
element := events.PushBack(&SocketDataUploadEvent{
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
index 52f886d..debf48c 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
@@ -136,3 +136,20 @@ func (s *SocketDataEventLimited) BufferLen() int {
func (s *SocketDataEventLimited) BufferStartPosition() int {
return s.from
}
+
+type SocketDetailEvent struct {
+ ConnectionID uint64
+ RandomID uint64
+ DataID uint64
+ TotalPackageSize uint64
+ IfIndex uint32
+ PackageCount uint8
+ FuncName base.SocketFunctionName
+ RTTCount uint8
+ Protocol base.ConnectionProtocol
+ RTTTime uint64
+}
+
+func (s *SocketDetailEvent) GenerateConnectionID() string {
+ return fmt.Sprintf("%d_%d", s.ConnectionID, s.RandomID)
+}
diff --git
a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
index 9290d34..27e72b3 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
@@ -223,7 +223,7 @@ func (h *Trace) Flush(duration int64, process
api.ProcessInterface, traffic *bas
metricsBuilder.AppendLogs(process.Entity().ServiceName, logData)
- // append full http content
+ // append full http content and syscall
h.AppendHTTPEvents(process, traffic, metricsBuilder)
}
@@ -232,10 +232,12 @@ func (h *Trace) AppendHTTPEvents(process
api.ProcessInterface, traffic *base.Pro
if h.Settings != nil && h.Settings.RequireCompleteRequest {
events = h.appendHTTPEvent(events, process, traffic,
transportRequest, h.Request.MessageOpt, h.TaskConfig.DefaultRequestEncoding,
h.Settings.MaxRequestSize)
+ events = h.appendSyscallEvents(events, process, traffic,
h.Request.MessageOpt)
}
if h.Settings != nil && h.Settings.RequireCompleteResponse {
events = h.appendHTTPEvent(events, process, traffic,
transportResponse, h.Response.MessageOpt, h.TaskConfig.DefaultResponseEncoding,
h.Settings.MaxResponseSize)
+ events = h.appendSyscallEvents(events, process, traffic,
h.Response.MessageOpt)
}
metricsBuilder.AppendSpanAttachedEvents(events)
@@ -278,6 +280,64 @@ func (h *Trace) appendHTTPEvent(events
[]*v3.SpanAttachedEvent, process api.Proc
return append(events, event)
}
+func (h *Trace) appendSyscallEvents(events []*v3.SpanAttachedEvent, process
api.ProcessInterface, traffic *base.ProcessTraffic,
+ message *reader.MessageOpt) []*v3.SpanAttachedEvent {
+ headerDetails := message.HeaderBuffer().Details()
+ bodyDetails := message.BodyBuffer().Details()
+ dataIDCache := make(map[uint64]bool)
+ for e := headerDetails.Front(); e != nil; e = e.Next() {
+ event := e.Value.(*protocol.SocketDetailEvent)
+ dataIDCache[event.DataID] = true
+ events = h.appendPerDetailEvent(events, process, traffic,
event, message.HeaderBuffer())
+ }
+ for e := bodyDetails.Front(); e != nil; e = e.Next() {
+ event := e.Value.(*protocol.SocketDetailEvent)
+ if dataIDCache[event.DataID] {
+ continue
+ }
+ events = h.appendPerDetailEvent(events, process, traffic,
event, message.BodyBuffer())
+ }
+ return events
+}
+
+func (h *Trace) appendPerDetailEvent(events []*v3.SpanAttachedEvent, process
api.ProcessInterface, _ *base.ProcessTraffic,
+ detail *protocol.SocketDetailEvent, buffer *protocol.Buffer)
[]*v3.SpanAttachedEvent {
+ event := &v3.SpanAttachedEvent{}
+ dataBuffer := buffer.FindFirstDataBuffer(detail.DataID)
+ if dataBuffer == nil {
+ return events
+ }
+ event.StartTime = host.TimeToInstant(dataBuffer.StartTime())
+ event.EndTime = host.TimeToInstant(dataBuffer.EndTime())
+ event.Event = fmt.Sprintf("Syscall %s", detail.FuncName.String())
+ event.Tags = make([]*commonv3.KeyStringValuePair, 0)
+ event.Tags = append(event.Tags,
+ // content data
+ &commonv3.KeyStringValuePair{Key: "package_size", Value:
units.BytesSize(float64(detail.TotalPackageSize))},
+ &commonv3.KeyStringValuePair{Key: "package_count", Value:
fmt.Sprintf("%d", detail.PackageCount)},
+ &commonv3.KeyStringValuePair{Key: "network_name", Value:
host.NetworkName(int(detail.IfIndex))},
+ &commonv3.KeyStringValuePair{Key: "network_index", Value:
fmt.Sprintf("%d", detail.IfIndex)},
+ // entity
+ &commonv3.KeyStringValuePair{Key: "service_name", Value:
process.Entity().ServiceName},
+ &commonv3.KeyStringValuePair{Key: "service_instance_name",
Value: process.Entity().InstanceName},
+ &commonv3.KeyStringValuePair{Key: "process_name", Value:
process.Entity().ProcessName},
+ )
+
+ if detail.RTTTime > 0 {
+ event.Tags = append(event.Tags,
+ &commonv3.KeyStringValuePair{Key: "avg_rtt_time",
Value: fmt.Sprintf("%dns", int(detail.RTTTime)/int(detail.RTTCount))})
+ }
+
+ event.Summary = make([]*commonv3.KeyIntValuePair, 0)
+ event.TraceContext = &v3.SpanAttachedEvent_SpanReference{
+ TraceId: h.Trace.TraceID(),
+ TraceSegmentId: h.Trace.TraceSegmentID(),
+ SpanId: h.Trace.SpanID(),
+ Type: h.Trace.Provider().Type,
+ }
+ return append(events, event)
+}
+
type SamplingTraceLogBody struct {
URI string `json:"uri"`
Reason string `json:"reason"`
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
index d3db212..ad1859c 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
@@ -78,6 +78,16 @@ func (a *Analyzer) ReceiveSocketDataEvent(event
*protocol.SocketDataUploadEvent)
analyzer.ReceiveSocketData(a.ctx, event)
}
+func (a *Analyzer) ReceiveSocketDetail(event *protocol.SocketDetailEvent) {
+ analyzer := a.protocols[event.Protocol]
+ if analyzer == nil {
+ log.Warnf("could not found any protocol to handle socket
detail, connection id: %s, protocol: %s(%d)",
+ event.GenerateConnectionID(), event.Protocol.String(),
event.Protocol)
+ return
+ }
+ analyzer.ReceiveSocketDetail(a.ctx, event)
+}
+
func (a *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
for _, p := range a.protocols {
p.UpdateExtensionConfig(config)
diff --git a/pkg/profiling/task/network/analyze/layer7/queue.go
b/pkg/profiling/task/network/analyze/layer7/queue.go
index 4c6ffa9..a249d68 100644
--- a/pkg/profiling/task/network/analyze/layer7/queue.go
+++ b/pkg/profiling/task/network/analyze/layer7/queue.go
@@ -34,11 +34,19 @@ type PartitionContext interface {
type EventQueue struct {
count int
+ receivers []*mapReceiver
partitions []*partition
startOnce sync.Once
}
+type mapReceiver struct {
+ emap *ebpf.Map
+ perCPUBuffer int
+ dataSupplier func() interface{}
+ router func(data interface{}) string
+}
+
func NewEventQueue(partitionCount, sizePerPartition int, contextGenerator
func() PartitionContext) *EventQueue {
partitions := make([]*partition, 0)
for i := 0; i < partitionCount; i++ {
@@ -47,10 +55,19 @@ func NewEventQueue(partitionCount, sizePerPartition int,
contextGenerator func()
return &EventQueue{count: partitionCount, partitions: partitions}
}
-func (e *EventQueue) Start(ctx context.Context, bpfLoader *bpf.Loader, emap
*ebpf.Map, receiverCount int,
- perCPUBufferSize int, dataSupplier func() interface{}, routeGenerator
func(data interface{}) string) {
+func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize int,
dataSupplier func() interface{},
+ routeGenerator func(data interface{}) string) {
+ e.receivers = append(e.receivers, &mapReceiver{
+ emap: emap,
+ perCPUBuffer: perCPUBufferSize,
+ dataSupplier: dataSupplier,
+ router: routeGenerator,
+ })
+}
+
+func (e *EventQueue) Start(ctx context.Context, bpfLoader *bpf.Loader) {
e.startOnce.Do(func() {
- e.start0(ctx, bpfLoader, emap, receiverCount, perCPUBufferSize,
dataSupplier, routeGenerator)
+ e.start0(ctx, bpfLoader)
})
}
@@ -64,12 +81,13 @@ func (e *EventQueue) Push(key string, data interface{}) {
e.partitions[sum32%e.count].channel <- data
}
-func (e *EventQueue) start0(ctx context.Context, bpfLoader *bpf.Loader, emap
*ebpf.Map, receiverCount int,
- perCPUBufferSize int, dataSupplier func() interface{}, routeGenerator
func(data interface{}) string) {
- for i := 0; i < receiverCount; i++ {
- bpfLoader.ReadEventAsyncWithBufferSize(emap, func(data
interface{}) {
- e.routerTransformer(data, routeGenerator)
- }, perCPUBufferSize, dataSupplier)
+func (e *EventQueue) start0(ctx context.Context, bpfLoader *bpf.Loader) {
+ for _, r := range e.receivers {
+ func(receiver *mapReceiver) {
+ bpfLoader.ReadEventAsyncWithBufferSize(receiver.emap,
func(data interface{}) {
+ e.routerTransformer(data, receiver.router)
+ }, receiver.perCPUBuffer, receiver.dataSupplier)
+ }(r)
}
for i := 0; i < len(e.partitions); i++ {
diff --git a/pkg/profiling/task/network/runner.go
b/pkg/profiling/task/network/runner.go
index 8c41eb3..95c1ef7 100644
--- a/pkg/profiling/task/network/runner.go
+++ b/pkg/profiling/task/network/runner.go
@@ -173,6 +173,9 @@ func (r *Runner) Start(ctx context.Context, task
*base.ProfilingTask, processes
log.Warnf("cannot monitor the tcp drop, ignore it and keep
profiling: %v", e)
}
+ bpfLoader.AddLink(link.Kprobe,
map[string]*ebpf.Program{"ip_finish_output": bpfLoader.IpFinishOutput})
+ bpfLoader.AddLink(link.Kprobe,
map[string]*ebpf.Program{"skb_copy_datagram_iter":
bpfLoader.SkbCopyDatagramIter})
+
if err := bpfLoader.HasError(); err != nil {
_ = bpfLoader.Close()
return err
diff --git a/pkg/tools/host/network.go b/pkg/tools/host/network.go
new file mode 100644
index 0000000..34f24de
--- /dev/null
+++ b/pkg/tools/host/network.go
@@ -0,0 +1,62 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package host
+
+import (
+ "net"
+
+ "github.com/apache/skywalking-rover/pkg/logger"
+)
+
+var interfaces map[int]*net.Interface
+var log = logger.GetLogger("tools", "host")
+
+func init() {
+ if err := refreshInterfaces(); err != nil {
+ panic(err)
+ }
+}
+
+func NetworkName(ifindex int) string {
+ i := interfaces[ifindex]
+ if i != nil {
+ return i.Name
+ }
+ if e := refreshInterfaces(); e != nil {
+ log.Warnf("error to refresh interfaces: %v", e)
+ return ""
+ }
+ i = interfaces[ifindex]
+ if i != nil {
+ return i.Name
+ }
+ return ""
+}
+
+func refreshInterfaces() error {
+ results, err := net.Interfaces()
+ if err != nil {
+ return err
+ }
+ tmp := make(map[int]*net.Interface)
+ for _, interf := range results {
+ tmp[interf.Index] = &interf
+ }
+ interfaces = tmp
+ return nil
+}