This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new b667e31 Enhance the network profiling (#40)
b667e31 is described below
commit b667e31f74fcadec93dfd8821afd3f1e01e27d64
Author: mrproliu <[email protected]>
AuthorDate: Sun Jul 24 17:24:44 2022 +0800
Enhance the network profiling (#40)
---
bpf/profiling/network/args.h | 16 ++-
bpf/profiling/network/netmonitor.c | 215 ++++++++++++++++------------
bpf/profiling/network/protocol_analyze.h | 27 ++--
bpf/profiling/network/sock_stats.h | 24 ++++
go.mod | 1 +
go.sum | 2 +
pkg/process/api/process.go | 5 +
pkg/process/finders/base/process.go | 2 +
pkg/process/finders/base/template.go | 12 ++
pkg/process/finders/context.go | 11 +-
pkg/process/finders/kubernetes/process.go | 22 +++
pkg/process/finders/scanner/process.go | 14 ++
pkg/process/finders/storage.go | 5 +
pkg/profiling/task/network/analyzer.go | 193 ++++++++++++++++++++++----
pkg/profiling/task/network/context.go | 223 +++++++++++++++++-------------
pkg/profiling/task/network/delegate.go | 6 +-
pkg/profiling/task/network/linker.go | 40 +++++-
pkg/profiling/task/network/metrics.go | 4 +-
pkg/profiling/task/network/runner.go | 51 +++----
pkg/profiling/task/network/tcpresolver.go | 4 +-
20 files changed, 615 insertions(+), 262 deletions(-)
diff --git a/bpf/profiling/network/args.h b/bpf/profiling/network/args.h
index fed379a..ce65720 100644
--- a/bpf/profiling/network/args.h
+++ b/bpf/profiling/network/args.h
@@ -19,7 +19,7 @@
#pragma once
-#define MAX_SOCKET_BUFFER_READ_LENGTH 1024
+#define MAX_SOCKET_BUFFER_READ_LENGTH 4095
// unknown the connection type, not trigger the syscall connect,accept
#define AF_UNKNOWN 0xff
@@ -41,6 +41,7 @@
#define SOCKET_OPTS_TYPE_RECVFROM 14
#define SOCKET_OPTS_TYPE_RECVMSG 15
#define SOCKET_OPTS_TYPE_RECVMMSG 16
+#define SOCKET_OPTS_TYPE_RESENT 17
// tracepoint enter
struct trace_event_raw_sys_enter {
@@ -106,7 +107,7 @@ struct sock_data_args_t {
// current read/write is calls on the sockets.
__u32 is_sock_event;
size_t iovlen;
- struct mmsghdr *mmsg;
+ unsigned int* msg_len;
__u64 start_nacs;
// rtt
__u64 rtt_count;
@@ -147,13 +148,18 @@ struct {
__uint(max_entries, 1);
} socket_buffer_reader_map SEC(".maps");
static __inline struct socket_buffer_reader_t* read_socket_data(void *buf,
__u32 data_bytes) {
+ __u64 size;
__u32 kZero = 0;
struct socket_buffer_reader_t* reader =
bpf_map_lookup_elem(&socket_buffer_reader_map, &kZero);
if (reader == NULL) {
return NULL;
}
- __u32 max_len = data_bytes < MAX_SOCKET_BUFFER_READ_LENGTH ? (data_bytes &
MAX_SOCKET_BUFFER_READ_LENGTH - 1) : MAX_SOCKET_BUFFER_READ_LENGTH;
- reader->data_len = bpf_probe_read_str(&reader->buffer, max_len, buf);
-// reader->data_len = max_len;
+ size = data_bytes;
+ if (size > MAX_SOCKET_BUFFER_READ_LENGTH) {
+ size = MAX_SOCKET_BUFFER_READ_LENGTH;
+ }
+ asm volatile("%[size] &= 0xfff;\n" ::[size] "+r"(size) :);
+ bpf_probe_read(&reader->buffer, size, buf);
+ reader->data_len = size;
return reader;
}
\ No newline at end of file
diff --git a/bpf/profiling/network/netmonitor.c
b/bpf/profiling/network/netmonitor.c
index 1628a3a..7296606 100644
--- a/bpf/profiling/network/netmonitor.c
+++ b/bpf/profiling/network/netmonitor.c
@@ -81,7 +81,10 @@ static __always_inline void submit_new_connection(struct
pt_regs* ctx, __u32 fun
} else {
con.role = CONNECTION_ROLE_TYPE_UNKNOWN;
}
+ __u16 port;
+ __u32 need_complete_addr = 1;
if (socket != NULL) {
+ need_complete_addr = 0;
// only get from accept function(server side)
struct sock* s;
BPF_CORE_READ_INTO(&s, socket, sk);
@@ -89,32 +92,55 @@ static __always_inline void submit_new_connection(struct
pt_regs* ctx, __u32 fun
short unsigned int skc_family;
BPF_CORE_READ_INTO(&skc_family, s, __sk_common.skc_family);
con.socket_family = skc_family;
+
+ if (con.socket_family == AF_INET) {
+ BPF_CORE_READ_INTO(&port, s, __sk_common.skc_num);
+ con.local_port = port;
+ BPF_CORE_READ_INTO(&con.local_addr_v4, s,
__sk_common.skc_rcv_saddr);
+ BPF_CORE_READ_INTO(&port, s, __sk_common.skc_dport);
+ con.remote_port = bpf_ntohs(port);
+ BPF_CORE_READ_INTO(&con.remote_addr_v4, s, __sk_common.skc_daddr);
+ } else if (con.socket_family == AF_INET6) {
+ BPF_CORE_READ_INTO(&port, s, __sk_common.skc_num);
+ con.local_port = port;
+ BPF_CORE_READ_INTO(&con.local_addr_v6, s,
__sk_common.skc_v6_rcv_saddr.in6_u.u6_addr8);
+ BPF_CORE_READ_INTO(&port, s, __sk_common.skc_dport);
+ con.remote_port = bpf_ntohs(port);
+ BPF_CORE_READ_INTO(&con.remote_addr_v6, s,
__sk_common.skc_v6_daddr.in6_u.u6_addr8);
+ }
} else if (addr != NULL) {
con.socket_family = _(addr->sa_family);
+ if (con.socket_family == AF_INET) {
+ struct sockaddr_in *daddr = (struct sockaddr_in *)addr;
+ bpf_probe_read(&con.remote_addr_v4, sizeof(con.remote_addr_v4),
&daddr->sin_addr.s_addr);
+ bpf_probe_read(&port, sizeof(port), &daddr->sin_port);
+ con.remote_port = bpf_ntohs(port);
+ } else if (con.socket_family == AF_INET6) {
+ struct sockaddr_in6 *daddr = (struct sockaddr_in6 *)addr;
+ bpf_probe_read(&con.remote_addr_v6, sizeof(con.remote_addr_v6),
&daddr->sin6_addr.s6_addr);
+ bpf_probe_read(&port, sizeof(port), &daddr->sin6_port);
+ con.remote_port = bpf_ntohs(port);
+ }
} else {
con.socket_family = AF_UNKNOWN;
}
// save to the active connection map
__u64 conid = gen_tgid_fd(tgid, fd);
- bpf_map_update_elem(&active_connection_map, &conid, &con, 0);
-
+ struct socket_connect_event_t *event = create_socket_connect_event();
// only trace ipv4, v6, or unknown
- if (family_should_trace(con.socket_family) == false) {
- return;
- }
-
- // pid is contains
- if (tgid_should_trace(tgid) == false) {
+ // pid is not contained (monitored)
+ // cannot create connect event object
+ if (family_should_trace(con.socket_family) == false ||
tgid_should_trace(tgid) == false || !event) {
+ con.connect_event_send = false;
+ bpf_map_update_elem(&active_connection_map, &conid, &con, 0);
return;
}
+ // default setting as sent
+ con.connect_event_send = true;
+ bpf_map_update_elem(&active_connection_map, &conid, &con, 0);
// send to user-space that have connection activated
- struct socket_connect_event_t *event = create_socket_connect_event();
- if (!event) {
-// bpf_printk("cannot create the socket connect event");
- return;
- }
event->conid = conid;
event->random_id = con.random_id;
event->func_name = func_name;
@@ -127,63 +153,36 @@ static __always_inline void submit_new_connection(struct
pt_regs* ctx, __u32 fun
// fill the connection
event->role = con.role;
event->socket_family = con.socket_family;
- __u16 port;
- if (socket != NULL) {
- event->need_complete_addr = 0;
- // only get from accept function(server side)
- struct sock* s;
- BPF_CORE_READ_INTO(&s, socket, sk);
-
- if (con.socket_family == AF_INET) {
- BPF_CORE_READ_INTO(&port, s, __sk_common.skc_num);
- event->local_port = port;
- BPF_CORE_READ_INTO(&event->local_addr_v4, s,
__sk_common.skc_rcv_saddr);
- BPF_CORE_READ_INTO(&port, s, __sk_common.skc_dport);
- event->remote_port = bpf_ntohs(port);
- BPF_CORE_READ_INTO(&event->remote_addr_v4, s,
__sk_common.skc_daddr);
- } else if (con.socket_family == AF_INET6) {
- BPF_CORE_READ_INTO(&port, s, __sk_common.skc_num);
- event->local_port = port;
- BPF_CORE_READ_INTO(&event->local_addr_v6, s,
__sk_common.skc_v6_rcv_saddr.in6_u.u6_addr8);
- BPF_CORE_READ_INTO(&port, s, __sk_common.skc_dport);
- event->remote_port = bpf_ntohs(port);
- BPF_CORE_READ_INTO(&event->remote_addr_v6, s,
__sk_common.skc_v6_daddr.in6_u.u6_addr8);
- } else {
- event->local_port = 0;
- event->remote_port = 0;
- }
- } else if (addr != NULL) {
- event->need_complete_addr = 1;
- if (con.socket_family == AF_INET) {
- struct sockaddr_in *daddr = (struct sockaddr_in *)addr;
- bpf_probe_read(&event->remote_addr_v4,
sizeof(event->remote_addr_v4), &daddr->sin_addr.s_addr);
- bpf_probe_read(&port, sizeof(port), &daddr->sin_port);
- event->remote_port = bpf_ntohs(port);
- } else if (con.socket_family == AF_INET6) {
- struct sockaddr_in6 *daddr = (struct sockaddr_in6 *)addr;
- bpf_probe_read(&event->remote_addr_v6,
sizeof(event->remote_addr_v6), &daddr->sin6_addr.s6_addr);
- bpf_probe_read(&port, sizeof(port), &daddr->sin6_port);
- event->remote_port = bpf_ntohs(port);
- } else {
- event->remote_port = 0;
+ event->need_complete_addr = need_complete_addr;
+ event->local_addr_v4 = con.local_addr_v4;
+ __builtin_memcpy(&event->local_addr_v6, &con.local_addr_v4,
16*sizeof(__u8));
+ event->local_port = con.local_port;
+ event->remote_addr_v4 = con.remote_addr_v4;
+ __builtin_memcpy(&event->remote_addr_v6, &con.remote_addr_v6,
16*sizeof(__u8));
+ event->remote_port = con.remote_port;
+
+ __u32 ret = bpf_perf_event_output(ctx, &socket_connection_event_queue,
BPF_F_CURRENT_CPU, event, sizeof(*event));
+ // if the event was not sent successfully, then mark it as not sent
+ if (ret < 0) {
+ struct active_connection_t *con =
bpf_map_lookup_elem(&active_connection_map, &conid);
+ if (con != NULL) {
+ con->connect_event_send = false;
+ bpf_map_update_elem(&active_connection_map, &conid, con, 0);
}
- } else {
- event->need_complete_addr = 1;
- // clean the cache in LRU(remote port is enough)
- event->remote_port = 0;
}
-
- bpf_perf_event_output(ctx, &socket_connection_event_queue,
BPF_F_CURRENT_CPU, event, sizeof(*event));
}
static __inline void notify_close_connection(struct pt_regs* ctx, __u64 conid,
struct active_connection_t* con, __u64 start_time, __u64 end_time) {
- // only trace ipv4, v6, or unknown
- if (family_should_trace(con->socket_family) == false) {
- return;
- }
- // pid is contains
- if (tgid_should_trace(con->pid) == false) {
- return;
+ // if the connect event was not sent, then check the pid or socket family
+ if (con->connect_event_send == false) {
+ // only trace ipv4, v6, or unknown
+ if (family_should_trace(con->socket_family) == false) {
+ return;
+ }
+ // ignore send close event if current process should not trace
+ if (tgid_should_trace(con->pid) == false) {
+ return;
+ }
}
__u64 exe_time = (__u64)(end_time - start_time);
@@ -196,6 +195,14 @@ static __inline void notify_close_connection(struct
pt_regs* ctx, __u64 conid, s
close_event.sockfd = con->sockfd;
close_event.role = con->role;
+ close_event.socket_family = con->socket_family;
+ close_event.local_addr_v4 = con->local_addr_v4;
+ __builtin_memcpy(&close_event.local_addr_v6, &con->local_addr_v4,
16*sizeof(__u8));
+ close_event.local_port = con->local_port;
+ close_event.remote_addr_v4 = con->remote_addr_v4;
+ __builtin_memcpy(&close_event.remote_addr_v6, &con->remote_addr_v6,
16*sizeof(__u8));
+ close_event.remote_port = con->remote_port;
+
close_event.write_bytes = con->write_bytes;
close_event.write_count = con->write_count;
close_event.write_exe_time = con->write_exe_time;
@@ -206,7 +213,6 @@ static __inline void notify_close_connection(struct
pt_regs* ctx, __u64 conid, s
close_event.write_rtt_time = con->write_rtt_time;
bpf_perf_event_output(ctx, &socket_close_event_queue, BPF_F_CURRENT_CPU,
&close_event, sizeof(close_event));
-// bpf_printk("submit new close: conid: %lld, write bytes: %lld, write
count: %lld\n", conid, con->write_bytes, con->write_count);
}
static __inline void submit_close_connection(struct pt_regs* ctx, __u32 tgid,
__u32 fd, __u64 start_nacs) {
@@ -214,7 +220,6 @@ static __inline void submit_close_connection(struct
pt_regs* ctx, __u32 tgid, __
__u64 conid = gen_tgid_fd(tgid, fd);
struct active_connection_t* con =
bpf_map_lookup_elem(&active_connection_map, &conid);
if (con == NULL) {
-// bpf_printk("connection id not exists: tgid: %d, fd: %d -> %lld",
tgid, fd, conid);
return;
}
notify_close_connection(ctx, conid, con, start_nacs, curr_nacs);
@@ -247,7 +252,28 @@ static __inline void
submit_connection_when_not_exists(struct pt_regs *ctx, __u6
submit_new_connection(ctx, func_name, tgid, connect_args->fd,
connect_args->start_nacs, connect_args->addr, NULL);
}
-static __always_inline void process_write_data(void *ctx, __u64 id, struct
sock_data_args_t *args, ssize_t bytes_count,
+static __always_inline void resent_connect_event(struct pt_regs *ctx, __u32
tgid, __u32 fd, __u64 conid, struct active_connection_t *con) {
+ struct socket_connect_event_t *event = create_socket_connect_event();
+ if (!event) {
+ return;
+ }
+ event->conid = conid;
+ event->random_id = con->random_id;
+ event->func_name = SOCKET_OPTS_TYPE_RESENT;
+ event->pid = tgid;
+ event->sockfd = fd;
+ event->role = con->role;
+ event->socket_family = con->socket_family;
+ event->need_complete_addr = 1;
+ event->remote_port = 0;
+ __u32 ret = bpf_perf_event_output(ctx, &socket_connection_event_queue,
BPF_F_CURRENT_CPU, event, sizeof(*event));
+ if (ret >= 0) {
+ con->connect_event_send = true;
+ bpf_map_update_elem(&active_connection_map, &conid, con, 0);
+ }
+}
+
+static __always_inline void process_write_data(struct pt_regs *ctx, __u64 id,
struct sock_data_args_t *args, ssize_t bytes_count,
__u32 data_direction, const bool vecs,
__u32 func_name) {
__u64 curr_nacs = bpf_ktime_get_ns();
__u32 tgid = (__u32)(id >> 32);
@@ -273,26 +299,35 @@ static __always_inline void process_write_data(void *ctx,
__u64 id, struct sock_
return;
}
- // pid is contains
- if (tgid_should_trace(tgid) == false) {
- return;
+ // if connect event is not sent
+ if (conn->connect_event_send == false) {
+ // if the connection should trace, double check
+ if (tgid_should_trace(tgid) == false) {
+ return;
+ }
+ // resend the connection event
+ resent_connect_event(ctx, tgid, args->fd, conid, conn);
}
// unknown connection role, then try to use procotol analyzer to analyze
request or response
if (conn->role == CONNECTION_ROLE_TYPE_UNKNOWN) {
-// bpf_printk("connection role is unknown, buf exists: %d, ioves
exists: %d, func: %d\n", args->buf != NULL ? 1 : 0, args->iovec != NULL ? 1 :
0, func_name);
struct socket_buffer_reader_t *buf_reader = NULL;
if (args->buf != NULL) {
buf_reader = read_socket_data(args->buf, bytes_count);
} else if (args->iovec != NULL) {
- struct iovec *iovec = _(args->iovec);
- char *buff = _(iovec->iov_base);
- buf_reader = read_socket_data(buff, bytes_count);
+ struct iovec iov;
+ int err = bpf_probe_read(&iov, sizeof(iov), args->iovec);
+ if (err >= 0) {
+ __u64 size = iov.iov_len;
+ if (size > bytes_count) {
+ size = bytes_count;
+ }
+ buf_reader = read_socket_data((char *)iov.iov_base, size);
+ }
}
if (buf_reader != NULL) {
enum message_type_t msg_type =
analyze_protocol(buf_reader->buffer, buf_reader->data_len, conn);
-// bpf_printk("connection: %lld, message type: %d\n", conid,
msg_type);
// if send request data to remote address or receive response data
from remote address
// then, recognized current connection is client
if ((msg_type == kRequest && data_direction ==
SOCK_DATA_DIRECTION_EGRESS) ||
@@ -301,8 +336,8 @@ static __always_inline void process_write_data(void *ctx,
__u64 id, struct sock_
// if send response data to remote address or receive request data
from remote address
// then, recognized current connection is server
- } else if ((msg_type == kResponse && data_direction ==
SOCK_DATA_DIRECTION_INGRESS) ||
- (msg_type == kRequest && data_direction ==
SOCK_DATA_DIRECTION_EGRESS)) {
+ } else if ((msg_type == kResponse && data_direction ==
SOCK_DATA_DIRECTION_EGRESS) ||
+ (msg_type == kRequest && data_direction ==
SOCK_DATA_DIRECTION_INGRESS)) {
conn->role = CONNECTION_ROLE_TYPE_SERVER;
}
}
@@ -650,8 +685,11 @@ int sys_sendmmsg(struct pt_regs* ctx) {
struct sock_data_args_t data_args = {};
data_args.fd = fd;
- data_args.mmsg = mmsghdr;
- data_args.iovec = _(mmsghdr->msg_hdr.msg_iov);
+ struct iovec *msg_iov = _(mmsghdr->msg_hdr.msg_iov);
+ data_args.iovec = msg_iov;
+ size_t msg_iovlen = _(mmsghdr->msg_hdr.msg_iovlen);
+ data_args.iovlen = msg_iovlen;
+ data_args.msg_len = &mmsghdr->msg_hdr.msg_iovlen;
data_args.start_nacs = bpf_ktime_get_ns();
bpf_map_update_elem(&socket_data_args, &id, &data_args, 0);
return 0;
@@ -671,9 +709,8 @@ int sys_sendmmsg_ret(struct pt_regs* ctx) {
// socket data
struct sock_data_args_t *data_args =
bpf_map_lookup_elem(&socket_data_args, &id);
if (data_args) {
- struct mmsghdr *mmsg = data_args->mmsg;
- __u32 bytes_count = _(mmsg->msg_len);
- data_args->iovlen = _(mmsg->msg_hdr.msg_iovlen);
+ __u32 bytes_count;
+ BPF_PROBE_READ_VAR1(bytes_count, data_args->msg_len);
process_write_data(ctx, id, data_args, bytes_count,
SOCK_DATA_DIRECTION_EGRESS, true, SOCKET_OPTS_TYPE_SENDMMSG);
}
bpf_map_delete_elem(&socket_data_args, &id);
@@ -937,8 +974,11 @@ int sys_recvmmsg(struct pt_regs* ctx) {
struct sock_data_args_t data_args = {};
data_args.fd = fd;
- data_args.mmsg = mmsghdr;
- data_args.iovec = _(mmsghdr->msg_hdr.msg_iov);
+ struct iovec *msg_iov = _(mmsghdr->msg_hdr.msg_iov);
+ data_args.iovec = msg_iov;
+ size_t msg_iovlen = _(mmsghdr->msg_hdr.msg_iovlen);
+ data_args.iovlen = msg_iovlen;
+ data_args.msg_len = &mmsghdr->msg_hdr.msg_iovlen;
data_args.start_nacs = bpf_ktime_get_ns();
bpf_map_update_elem(&socket_data_args, &id, &data_args, 0);
return 0;
@@ -958,9 +998,8 @@ int sys_recvmmsg_ret(struct pt_regs* ctx) {
// socket data
struct sock_data_args_t *data_args =
bpf_map_lookup_elem(&socket_data_args, &id);
if (data_args) {
- struct mmsghdr *mmsg = data_args->mmsg;
- __u32 bytes_count = _(mmsg->msg_len);
- data_args->iovlen = _(mmsg->msg_hdr.msg_iovlen);
+ __u32 bytes_count;
+ BPF_PROBE_READ_VAR1(bytes_count, data_args->msg_len);
process_write_data(ctx, id, data_args, bytes_count,
SOCK_DATA_DIRECTION_INGRESS, true, SOCKET_OPTS_TYPE_RECVMMSG);
}
bpf_map_delete_elem(&socket_data_args, &id);
diff --git a/bpf/profiling/network/protocol_analyze.h
b/bpf/profiling/network/protocol_analyze.h
index f233c96..16f122c 100644
--- a/bpf/profiling/network/protocol_analyze.h
+++ b/bpf/profiling/network/protocol_analyze.h
@@ -45,17 +45,17 @@ struct protocol_message_t {
__u32 type;
};
-#define BPF_PROBE_READ_VAR(value, ptr) bpf_probe_read(&value, sizeof(value),
ptr)
+#define BPF_PROBE_READ_VAR1(value, ptr) bpf_probe_read(&value, sizeof(value),
ptr)
static __inline int32_t read_big_endian_int32(const char* buf) {
int32_t length;
- BPF_PROBE_READ_VAR(length, buf);
+ BPF_PROBE_READ_VAR1(length, buf);
return bpf_ntohl(length);
}
static __inline int32_t read_big_endian_int16(const char* buf) {
int16_t val;
- BPF_PROBE_READ_VAR(val, buf);
+ BPF_PROBE_READ_VAR1(val, buf);
return bpf_ntohl(val);
}
@@ -299,9 +299,12 @@ if (len < kMinPayloadLen || len > kMaxPayloadLen) {
return kUnknown;
}
// If the input includes a whole message (1 byte tag + length), check the last
character.
-//if ((len + 1 <= (int)count) && (buf[count-1] != '\0')) {
-//return kUnknown;
-//}
+if (count > MAX_SOCKET_BUFFER_READ_LENGTH) {
+ count = MAX_SOCKET_BUFFER_READ_LENGTH;
+}
+if ((len + 1 <= (int)count) && (buf[count-1] != '\0')) {
+return kUnknown;
+}
return kRequest;
}
@@ -346,7 +349,7 @@ static const uint8_t kComStmtExecute = 0x17;
static const uint8_t kComStmtClose = 0x19;
// Second statement checks whether suspected header matches the length of
current packet.
-bool use_prev_buf = (conn_info->prev_count == 4) &&
(*((uint32_t*)conn_info->prev_buf) == count);
+bool use_prev_buf = (conn_info->prev_count == 4) &&
((size_t)read_big_endian_int32(conn_info->prev_buf) == count);
if (use_prev_buf) {
// Check the header_state to find out if the header has been read. MySQL
server tends to
@@ -588,6 +591,9 @@ default:
return kUnknown;
}
if (mux_type == kRerr || mux_type == kRerrOld) {
+if (length > MAX_SOCKET_BUFFER_READ_LENGTH) {
+ length = MAX_SOCKET_BUFFER_READ_LENGTH;
+}
if (buf[length - 5] != 'c' || buf[length - 4] != 'h' || buf[length - 3] != 'e'
||
buf[length - 2] != 'c' || buf[length - 1] != 'k')
return kUnknown;
@@ -619,6 +625,9 @@ static __inline enum message_type_t
infer_nats_message(const char* buf, size_t c
if (count < 3) {
return kUnknown;
}
+if (count > MAX_SOCKET_BUFFER_READ_LENGTH) {
+ count = MAX_SOCKET_BUFFER_READ_LENGTH;
+}
// The last two chars are \r\n, the terminal sequence of all NATS messages.
if (buf[count - 2] != '\r') {
return kUnknown;
@@ -673,8 +682,8 @@ static __inline enum message_type_t analyze_protocol(char
*buf, __u32 count, str
inferred_message.protocol = kProtocolCQL;
} else if ((inferred_message.type = infer_mongo_message(buf, count)) !=
kUnknown) {
inferred_message.protocol = kProtocolMongo;
- } else if ((inferred_message.type = infer_pgsql_message(buf, count)) !=
kUnknown) {
- inferred_message.protocol = kProtocolPGSQL;
+// } else if ((inferred_message.type = infer_pgsql_message(buf, count)) !=
kUnknown) {
+// inferred_message.protocol = kProtocolPGSQL;
} else if ((inferred_message.type = infer_mysql_message(buf, count,
conn_info)) != kUnknown) {
inferred_message.protocol = kProtocolMySQL;
// } else if ((inferred_message.type = infer_mux_message(buf, count)) !=
kUnknown) {
diff --git a/bpf/profiling/network/sock_stats.h
b/bpf/profiling/network/sock_stats.h
index 8c1156e..a597116 100644
--- a/bpf/profiling/network/sock_stats.h
+++ b/bpf/profiling/network/sock_stats.h
@@ -33,6 +33,15 @@ struct active_connection_t {
// socket type
__u32 socket_family;
+ // remote address
+ __u32 remote_addr_v4;
+ __u8 remote_addr_v6[16];
+ __u32 remote_port;
+ // local address
+ __u32 local_addr_v4;
+ __u8 local_addr_v6[16];
+ __u16 local_port;
+
// basic stats(bytes, avg(exe_time/count))
__u64 write_bytes;
__u64 write_count;
@@ -50,6 +59,9 @@ struct active_connection_t {
__u64 prev_count;
char prev_buf[4];
__u32 prepend_length_header;
+
+ // connect event already sent
+ __u32 connect_event_send;
};
struct {
__uint(type, BPF_MAP_TYPE_HASH);
@@ -126,6 +138,18 @@ struct socket_close_event_t {
__u32 role;
__u32 fix;
+ // socket type
+ __u32 socket_family;
+ // upstream
+ __u32 remote_addr_v4;
+ __u8 remote_addr_v6[16];
+ __u32 remote_port;
+ // downstream
+ __u32 local_addr_v4;
+ __u8 local_addr_v6[16];
+ __u16 local_port;
+ __u32 fix1;
+
// basic stats(bytes, avg(exe_time/count))
__u64 write_bytes;
__u64 write_count;
diff --git a/go.mod b/go.mod
index 03653a4..5c74485 100644
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/golang-lru v0.5.4
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639
+ github.com/orcaman/concurrent-map v1.0.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/sirupsen/logrus v1.8.1
github.com/spf13/cobra v1.3.0
diff --git a/go.sum b/go.sum
index 6e64dd0..0d475d3 100644
--- a/go.sum
+++ b/go.sum
@@ -356,6 +356,8 @@ github.com/onsi/gomega
v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV
github.com/onsi/gomega v1.7.1/go.mod
h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod
h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/orcaman/concurrent-map v1.0.0
h1:I/2A2XPCb4IuQWcQhBhSwGfiuybl/J0ev9HDbW65HOY=
+github.com/orcaman/concurrent-map v1.0.0/go.mod
h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod
h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod
h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.9.4
h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
diff --git a/pkg/process/api/process.go b/pkg/process/api/process.go
index 53f98ba..bde8bda 100644
--- a/pkg/process/api/process.go
+++ b/pkg/process/api/process.go
@@ -54,6 +54,11 @@ type ProcessInterface interface {
ProfilingStat() *profiling.Info
// ExeName get execute file name
ExeName() (string, error)
+
+ // PortIsExpose checks whether the port is exposed
+ PortIsExpose(port int) bool
+ // DetectNewExposePort adds a newly detected exposed port
+ DetectNewExposePort(port int)
}
// ProcessEntity is related to backend entity concept
diff --git a/pkg/process/finders/base/process.go
b/pkg/process/finders/base/process.go
index 49246fe..fc3b46f 100644
--- a/pkg/process/finders/base/process.go
+++ b/pkg/process/finders/base/process.go
@@ -36,4 +36,6 @@ type DetectedProcess interface {
DetectType() api.ProcessDetectType
// ProfilingStat of process
ProfilingStat() *profiling.Info
+ // ExposePorts defines which ports are exposed
+ ExposePorts() []int
}
diff --git a/pkg/process/finders/base/template.go
b/pkg/process/finders/base/template.go
index aa4b7da..2aee7c4 100644
--- a/pkg/process/finders/base/template.go
+++ b/pkg/process/finders/base/template.go
@@ -123,3 +123,15 @@ func (p *TemplateProcess) Pid() int32 {
func (p *TemplateProcess) WorkDir() (string, error) {
return p.Cwd()
}
+
+// ExeNameInCommandLine means the executed file name in the command line string
+func (p *TemplateProcess) ExeNameInCommandLine() (string, error) {
+ cmdline, err := p.CmdlineSlice()
+ if err != nil {
+ return "", err
+ }
+ if len(cmdline) == 0 {
+ return "", fmt.Errorf("cannot found the command line")
+ }
+ return cmdline[0], nil
+}
diff --git a/pkg/process/finders/context.go b/pkg/process/finders/context.go
index 8139985..d544e17 100644
--- a/pkg/process/finders/context.go
+++ b/pkg/process/finders/context.go
@@ -47,7 +47,8 @@ type ProcessContext struct {
detectType api.ProcessDetectType
// cache
- exeName string
+ exeName string
+ exposedPorts map[int]bool
}
func (p *ProcessContext) ID() string {
@@ -80,3 +81,11 @@ func (p *ProcessContext) ExeName() (string, error) {
}
return p.exeName, nil
}
+
+func (p *ProcessContext) PortIsExpose(port int) bool {
+ return p.exposedPorts[port]
+}
+
+func (p *ProcessContext) DetectNewExposePort(port int) {
+ p.exposedPorts[port] = true
+}
diff --git a/pkg/process/finders/kubernetes/process.go
b/pkg/process/finders/kubernetes/process.go
index 547e2ae..45b2511 100644
--- a/pkg/process/finders/kubernetes/process.go
+++ b/pkg/process/finders/kubernetes/process.go
@@ -69,3 +69,25 @@ func (p *Process) DetectType() api.ProcessDetectType {
func (p *Process) ProfilingStat() *profiling.Info {
return p.profiling
}
+
+func (p *Process) ExposePorts() []int {
+ result := make([]int, 0)
+ for _, cp := range p.podContainer.ContainerSpec.Ports {
+ result = append(result, int(cp.ContainerPort))
+ if cp.HostPort > 0 {
+ result = append(result, int(cp.HostPort))
+ }
+ }
+ connections, err := p.original.Connections()
+ if err != nil {
+ log.Warnf("query the process connection error: pid: %d, error:
%v", p.pid, err)
+ return result
+ }
+ for _, c := range connections {
+ if c.Status == "LISTEN" {
+ result = append(result, int(c.Laddr.Port))
+ }
+ }
+
+ return result
+}
diff --git a/pkg/process/finders/scanner/process.go
b/pkg/process/finders/scanner/process.go
index d13d77d..ae5ebed 100644
--- a/pkg/process/finders/scanner/process.go
+++ b/pkg/process/finders/scanner/process.go
@@ -109,6 +109,20 @@ func (p *Process) BuildIdentity() string {
p.entity.InstanceName, p.entity.ProcessName)
}
+func (p *Process) ExposePorts() []int {
+ connections, err := p.original.Connections()
+ if err != nil {
+ log.Warnf("error getting the process connections, pid: %d,
error: %v", p.pid, err)
+ }
+ ports := make([]int, 0)
+ for _, con := range connections {
+ if con.Status == "LISTEN" {
+ ports = append(ports, int(con.Laddr.Port))
+ }
+ }
+ return ports
+}
+
func requiredNotNull(err error, key, value string) error {
if err != nil {
return err
diff --git a/pkg/process/finders/storage.go b/pkg/process/finders/storage.go
index 1c10e7f..13e28a1 100644
--- a/pkg/process/finders/storage.go
+++ b/pkg/process/finders/storage.go
@@ -282,10 +282,15 @@ func (s *ProcessStorage) SyncAllProcessInFinder(finder
api.ProcessDetectType, pr
}
func (s *ProcessStorage) constructNewProcessContext(finder
api.ProcessDetectType, process base.DetectedProcess) *ProcessContext {
+ exporsedPorts := make(map[int]bool)
+ for _, p := range process.ExposePorts() {
+ exporsedPorts[p] = true
+ }
return &ProcessContext{
syncStatus: NotReport,
detectProcess: process,
detectType: finder,
+ exposedPorts: exporsedPorts,
}
}
diff --git a/pkg/profiling/task/network/analyzer.go
b/pkg/profiling/task/network/analyzer.go
index 7ce28de..e92a574 100644
--- a/pkg/profiling/task/network/analyzer.go
+++ b/pkg/profiling/task/network/analyzer.go
@@ -19,9 +19,17 @@ package network
import (
"github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/tools"
+)
+
+const (
+ layerMeshDP = "MESH_DP"
+ layerMeshApp = "MESH"
+ processEnvoy = "envoy"
)
type TrafficAnalyzer struct {
+ existingProcesses map[int32][]api.ProcessInterface
// used to find local same with remote address
// the connect request(local:a -> remote:b) same with accept
address(remote:a -> local:b)
// key: localIP:port+RemoteIP+port
@@ -31,8 +39,8 @@ type TrafficAnalyzer struct {
// used to find only have the remote address connection
// the connect request(local:unknown -> remote:b), server side
accept(local:b)
// key: RemoteIP:port
- // value: remotePid
- peerAddressCache map[PeerAddress]uint32
+ // value: remotePid list
+ peerAddressCache map[PeerAddress][]uint32
// used to find the envoy client, service connect to the
service(outbound), but envoy accept the request through iptables
// the connect request(local:a -> remote:b(upstream service)), envoy
side accept(local:c -> remote:a)
@@ -44,14 +52,21 @@ type TrafficAnalyzer struct {
// key: pid
// value: process entities
processData map[uint32][]api.ProcessInterface
+
+ // all local addresses(host only)
+ // key: ip
+ // value: [entity layer]process
+ localAddresses map[string]map[string]api.ProcessInterface
}
-func NewTrafficAnalyzer() *TrafficAnalyzer {
+func NewTrafficAnalyzer(processes map[int32][]api.ProcessInterface)
*TrafficAnalyzer {
return &TrafficAnalyzer{
+ existingProcesses: processes,
localWithPeerCache:
make(map[LocalWithPeerAddress]*PidWithRole),
- peerAddressCache: make(map[PeerAddress]uint32),
+ peerAddressCache: make(map[PeerAddress][]uint32),
envoyAcceptClientAddressCache:
make(map[PeerAddress]*AddressWithPid),
processData:
make(map[uint32][]api.ProcessInterface),
+ localAddresses:
make(map[string]map[string]api.ProcessInterface),
}
}
@@ -86,6 +101,7 @@ func (t *TrafficAnalyzer)
CombineConnectionToTraffics(connections []*ConnectionC
continue
}
+ t.tryingToGenerateTheRoleWhenRemotePidCannotFound(con)
var pidToRemoteKey PidToRemoteTrafficKey
pidToRemoteKey.LocalPid = con.LocalPid
pidToRemoteKey.RemoteIP = con.RemoteIP
@@ -122,9 +138,33 @@ func (t *TrafficAnalyzer)
CombineConnectionToTraffics(connections []*ConnectionC
return result
}
+func (t *TrafficAnalyzer) tryingToGenerateTheRoleWhenRemotePidCannotFound(con
*ConnectionContext) {
+ if con.Role != ConnectionRoleUnknown {
+ return
+ }
+ // if the local process address or process cannot be found, the role cannot be analyzed
+ if con.LocalPort == 0 || len(con.LocalProcesses) == 0 {
+ return
+ }
+ var role ConnectionRole
+ // if the port is exposed, and the remote address is not a local pid
+ // then the role of connection is server side usually
+ if con.LocalProcesses[0].PortIsExpose(int(con.LocalPort)) {
+ role = ConnectionRoleServer
+ } else {
+ role = ConnectionRoleClient
+ }
+
+ con.Role = role
+ log.Debugf("found current connection role is unknown, analyzed role is
%s through local port. %s:%d(%d)->%s:%d",
+ role.String(), con.LocalIP, con.LocalPort, con.LocalPid,
con.RemoteIP, con.RemotePort)
+}
+
func (t *TrafficAnalyzer) generateOrCombineTraffic(traffic *ProcessTraffic,
con *ConnectionContext, remotePid uint32) *ProcessTraffic {
if traffic == nil {
traffic = &ProcessTraffic{
+ analyzer: t,
+
LocalPid: con.LocalPid,
LocalProcesses: con.LocalProcesses,
LocalIP: con.LocalIP,
@@ -181,6 +221,10 @@ func (t *TrafficAnalyzer) generateOrCombineTraffic(traffic
*ProcessTraffic, con
return traffic
}
+func (t *TrafficAnalyzer) IsLocalAddressInCache(ip string) bool {
+ return len(t.localAddresses[ip]) > 0
+}
+
func (t *TrafficAnalyzer) buildCache(connections []*ConnectionContext) {
for _, con := range connections {
if t.ipNotEmpty(con.LocalIP, con.LocalPort) &&
t.ipNotEmpty(con.RemoteIP, con.RemotePort) {
@@ -193,14 +237,32 @@ func (t *TrafficAnalyzer) buildCache(connections
[]*ConnectionContext) {
Pid: con.LocalPid,
Role: con.Role,
}
+ }
+ if t.ipNotEmpty(con.LocalIP, con.LocalPort) {
+ peerAddress := PeerAddress{
+ RemoteIP: con.LocalIP,
+ RemotePort: con.LocalPort,
+ }
+ t.peerAddressCache[peerAddress] =
append(t.peerAddressCache[peerAddress], con.LocalPid)
+ if len(con.LocalProcesses) > 0 {
+ localAddressProcesses :=
t.localAddresses[con.LocalIP]
+ if len(localAddressProcesses) == 0 {
+ localAddressProcesses =
make(map[string]api.ProcessInterface)
+ t.localAddresses[con.LocalIP] =
localAddressProcesses
+ }
+ for _, p := range con.LocalProcesses {
+ localAddressProcesses[p.Entity().Layer]
= p
+ }
+ }
+ } else if t.ipNotEmpty(con.RemoteIP, con.RemotePort) {
// if server side is envoy
if con.Role == ConnectionRoleServer &&
len(con.LocalProcesses) > 0 {
name, err := con.LocalProcesses[0].ExeName()
if err != nil {
log.Warnf("get process exe name
failure, pid: %d, error: %v", con.LocalPid, err)
}
- if name == "envoy" {
+ if name == processEnvoy {
t.envoyAcceptClientAddressCache[PeerAddress{
RemoteIP: con.RemoteIP,
RemotePort: con.RemotePort,
@@ -212,27 +274,45 @@ func (t *TrafficAnalyzer) buildCache(connections
[]*ConnectionContext) {
}
}
}
- if t.ipNotEmpty(con.LocalIP, con.LocalPort) {
- t.peerAddressCache[PeerAddress{
- RemoteIP: con.LocalIP,
- RemotePort: con.LocalPort,
- }] = con.LocalPid
- }
if len(t.processData[con.LocalPid]) == 0 {
t.processData[con.LocalPid] = con.LocalProcesses
}
+ t.processExportPortAnalyze(con)
+ }
+}
+
+func (t *TrafficAnalyzer) processExportPortAnalyze(con *ConnectionContext) {
+ // if the process exists, role of connection is server mode and local
port is exists1
+ // add the detected port into the processes
+ if len(con.LocalProcesses) > 0 && con.Role == ConnectionRoleServer &&
con.LocalPort > 0 {
+ for _, p := range con.LocalProcesses {
+ p.DetectNewExposePort(int(con.LocalPort))
+ }
}
}
func (t *TrafficAnalyzer) findRemotePid(con *ConnectionContext) uint32 {
- if !t.ipNotEmpty(con.RemoteIP, con.RemotePort) {
- return 0
+ // full address
+ if pid := t.findRemotePidWhenContainsFullAddress(con); pid > 0 {
+ return pid
}
- // try to find the target pid first
+ // only remote address
+ if pid := t.findRemotePidWhenContainsRemoteAddress(con); pid > 0 {
+ return pid
+ }
+
+ // mesh environment
+ if pid := t.findRemotePidWhenMeshEnvironment(con); pid > 0 {
+ return pid
+ }
+ return 0
+}
+
+func (t *TrafficAnalyzer) findRemotePidWhenContainsFullAddress(con
*ConnectionContext) uint32 {
// match to localWithPeerCache
- if t.ipNotEmpty(con.LocalIP, con.LocalPort) {
+ if t.ipNotEmpty(con.LocalIP, con.LocalPort) &&
t.ipNotEmpty(con.RemoteIP, con.RemotePort) {
data := t.localWithPeerCache[LocalWithPeerAddress{
LocalIP: con.RemoteIP,
LocalPort: con.RemotePort,
@@ -258,8 +338,10 @@ func (t *TrafficAnalyzer) findRemotePid(con
*ConnectionContext) uint32 {
RemotePort: con.LocalPort,
}]
if addr != nil {
- con.RemoteIP = addr.RemoteIP
- con.RemotePort = addr.RemotePort
+ if t.ipNotEmpty(addr.RemoteIP, addr.RemotePort)
{
+ con.RemoteIP = addr.RemoteIP
+ con.RemotePort = addr.RemotePort
+ }
log.Debugf("found envoy connection:
%s:%d->%s:%d", con.LocalIP, con.LocalPort, con.RemoteIP, con.RemotePort)
return addr.Pid
}
@@ -267,24 +349,79 @@ func (t *TrafficAnalyzer) findRemotePid(con
*ConnectionContext) uint32 {
}
}
+ return 0
+}
+
+func (t *TrafficAnalyzer) findRemotePidWhenContainsRemoteAddress(con
*ConnectionContext) uint32 {
+ if !t.ipNotEmpty(con.RemoteIP, con.RemotePort) {
+ return 0
+ }
// use non-strict verification, don't verify the client role, ensure
that pid is available to be greatest extent
// because the information of role maybe missing when not trigger the
connect/accept
- pid := t.peerAddressCache[PeerAddress{
+ pidCaches := t.peerAddressCache[PeerAddress{
RemoteIP: con.RemoteIP,
RemotePort: con.RemotePort,
}]
- if pid != 0 {
- return pid
+ if len(pidCaches) > 0 {
+ result := pidCaches[0]
+ // when the remote peer address contains multiple pid
+ // the process usually not self
+ for _, pid := range pidCaches {
+ if pid != con.LocalPid {
+ result = pid
+ }
+ }
+ log.Debugf("found remote address by peer address cache: %s:%d
-> %d", con.RemoteIP, con.RemotePort, result)
+ return result
+ }
+ return 0
+}
+
+func (t *TrafficAnalyzer) findRemotePidWhenMeshEnvironment(con
*ConnectionContext) uint32 {
+ // special handle for mesh application, when it could not match the
process through address
+ if len(con.LocalProcesses) == 0 || !t.ipNotEmpty(con.RemoteIP,
con.RemotePort) {
+ return 0
+ }
+ for _, localProcess := range con.LocalProcesses {
+ // match when the MESH data plane not found the MESH application
+ if localProcess.Entity().Layer == layerMeshDP {
+ addresses := t.localAddresses[con.RemoteIP]
+ if len(addresses) == 0 {
+ continue
+ }
+ if p := addresses[layerMeshApp]; p != nil {
+ log.Debugf("found in the mesh application,
remote ip: %s", con.RemoteIP)
+ return uint32(p.Pid())
+ }
+ continue
+ }
+ // if current is mesh application, and remote address is not
local and dns, them it's must be sent to the MESH_DP
+ if localProcess.Entity().Layer == layerMeshApp &&
con.RemotePort != 53 &&
+ len(t.localAddresses[con.RemoteIP]) == 0 &&
!tools.IsLocalHostAddress(con.RemoteIP) {
+ if envoyPid :=
t.findSameInstanceMeshDP(localProcess.Entity()); envoyPid != 0 {
+ log.Debugf("found in the mesh data plane,
remote ip: %s", con.RemoteIP)
+ return envoyPid
+ }
+ }
}
- // trying to parse the address with revert the network port
- // pid = t.peerAddressCache[PeerAddress{
- // RemoteIP: con.RemoteIP,
- // RemotePort: parsePort(con.RemotePort),
- // }]
- // if pid != 0 {
- // return pid
- //}
+ return 0
+}
+func (t *TrafficAnalyzer) findSameInstanceMeshDP(entity *api.ProcessEntity)
uint32 {
+ for _, psList := range t.existingProcesses {
+ for _, p := range psList {
+ if p.Entity().Layer == layerMeshDP &&
p.Entity().ServiceName == entity.ServiceName && p.Entity().InstanceName ==
entity.InstanceName {
+ name, err := p.ExeName()
+ if err != nil {
+ log.Warnf("query the process execute
file name failure: %d, error: %v", p.Pid(), err)
+ continue
+ }
+ if name == processEnvoy {
+ return uint32(p.Pid())
+ }
+ }
+ }
+ }
return 0
}
diff --git a/pkg/profiling/task/network/context.go
b/pkg/profiling/task/network/context.go
index c918e46..451496f 100644
--- a/pkg/profiling/task/network/context.go
+++ b/pkg/profiling/task/network/context.go
@@ -18,6 +18,7 @@
package network
import (
+ "context"
"encoding/json"
"errors"
"fmt"
@@ -25,6 +26,8 @@ import (
"sync"
"unsafe"
+ cmap "github.com/orcaman/concurrent-map"
+
"github.com/sirupsen/logrus"
"github.com/hashicorp/go-multierror"
@@ -42,12 +45,10 @@ type Context struct {
bpf *bpfObjects // current bpf programs
// standard syscall connections
- activeConnections map[string]*ConnectionContext // current
activeConnections connections
- closedConnections []*ConnectionContext // closed connections'
- flushClosedEvents chan *SocketCloseEventWrapper // connection have been
closed, it is a queue to cache unknown active connections
- connectionLock sync.Mutex // make sure read write
closedConnections is synchronized
- // if the socket close event not handled when flushing, then cache to
this array to prevent dead-lock with flushClosedEvents
- secondCloseEventCache []*SocketCloseEventWrapper
+ activeConnections cmap.ConcurrentMap // current activeConnections
connections
+ closedConnections []*ConnectionContext // closed connections'
+ flushClosedEvents chan *SocketCloseEvent // connection have been
closed, it is a queue to cache unknown active connections
+ sockParseQueue chan *ConnectionContext // socket address parse queue
// socket retransmit/drop
socketExceptionStatics map[SocketBasicKey]*SocketExceptionValue
@@ -114,10 +115,10 @@ type ConnectionContext struct {
func NewContext() *Context {
return &Context{
- activeConnections: make(map[string]*ConnectionContext),
+ activeConnections: cmap.New(),
closedConnections: make([]*ConnectionContext, 0),
- flushClosedEvents: make(chan *SocketCloseEventWrapper,
5000),
- secondCloseEventCache: make([]*SocketCloseEventWrapper, 0),
+ flushClosedEvents: make(chan *SocketCloseEvent, 5000),
+ sockParseQueue: make(chan *ConnectionContext, 5000),
processes: make(map[int32][]api.ProcessInterface),
socketExceptionStatics:
make(map[SocketBasicKey]*SocketExceptionValue),
}
@@ -162,6 +163,34 @@ func (c *Context) FlushAllConnection()
([]*ConnectionContext, error) {
return allContexts, nil
}
+func (c *Context) StartSocketAddressParser(ctx context.Context) {
+ for i := 0; i < 2; i++ {
+ go c.handleSocketParseQueue(ctx)
+ }
+}
+
+func (c *Context) handleSocketParseQueue(ctx context.Context) {
+ for {
+ select {
+ case cc := <-c.sockParseQueue:
+ socket, err := ParseSocket(cc.LocalPid, cc.SocketFD)
+ if err != nil {
+ // if the remote port of connection is empty,
then this connection not available basically
+ if cc.RemotePort == 0 {
+ log.Warnf("complete the socket error,
pid: %d, fd: %d, error: %v", cc.LocalPid, cc.SocketFD, err)
+ }
+ continue
+ }
+ cc.LocalIP = socket.SrcIP
+ cc.LocalPort = socket.SrcPort
+ cc.RemoteIP = socket.DestIP
+ cc.RemotePort = socket.DestPort
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
func (c *Context) combineExceptionToConnections(ccs []*ConnectionContext, exps
map[SocketBasicKey]*SocketExceptionValue) {
for key, value := range exps {
var remotePort, localPort = uint16(key.RemotePort),
uint16(key.LocalPort)
@@ -213,13 +242,10 @@ func (c *Context) cleanAndGetAllExceptionContexts()
map[SocketBasicKey]*SocketEx
}
func (c *Context) getAllConnectionWithContext() []*ConnectionContext {
- c.connectionLock.Lock()
- defer c.connectionLock.Unlock()
-
result := make([]*ConnectionContext, 0)
result = append(result, c.closedConnections...)
- for _, con := range c.activeConnections {
- result = append(result, con)
+ for _, con := range c.activeConnections.Items() {
+ result = append(result, con.(*ConnectionContext))
}
c.closedConnections = make([]*ConnectionContext, 0)
@@ -233,6 +259,13 @@ type ActiveConnectionInBPF struct {
Role ConnectionRole
SocketFamily uint32
+ RemoteAddrV4 uint32
+ RemoteAddrV6 [16]uint8
+ RemoteAddrPort uint32
+ LocalAddrV4 uint32
+ LocalAddrV6 [16]uint8
+ LocalAddrPort uint32
+
WriteBytes uint64
WriteCount uint64
WriteExeTime uint64
@@ -248,6 +281,9 @@ type ActiveConnectionInBPF struct {
ProtocolPrevCount uint64
ProtocolPrevBuf [4]byte
ProtocolPrependHeader uint32
+
+ // the connect event is already sent
+ ConnectEventIsSent uint32
}
type HistogramDataKey struct {
@@ -289,7 +325,6 @@ func (c *Context) fillConnectionMetrics(ccs
[]*ConnectionContext) {
}
// update the role
- cc.Role = activeConnection.Role
cc.WriteCounter.UpdateToCurrent(activeConnection.WriteBytes,
activeConnection.WriteCount, activeConnection.WriteExeTime)
cc.ReadCounter.UpdateToCurrent(activeConnection.ReadBytes,
activeConnection.ReadCount, activeConnection.ReadExeTime)
cc.WriteRTTCounter.UpdateToCurrent(0,
activeConnection.WriteRTTCount, activeConnection.WriteRTTExeTime)
@@ -371,29 +406,10 @@ func (c *Context) handleSocketConnectEvent(data
interface{}) {
}
// build active connection information
- con := &ConnectionContext{
- // metadata
- ConnectionID: event.ConID,
- RandomID: event.RandomID,
- LocalPid: event.Pid,
- SocketFD: event.FD,
- LocalProcesses: processes,
- ConnectionClosed: false,
-
- // metrics
- WriteCounter: NewSocketDataCounterWithHistory(),
- ReadCounter: NewSocketDataCounterWithHistory(),
- WriteRTTCounter: NewSocketDataCounterWithHistory(),
- WriteRTTHistogram: NewSocketDataHistogramWithHistory(),
- WriteExeTimeHistogram: NewSocketDataHistogramWithHistory(),
- ReadExeTimeHistogram: NewSocketDataHistogramWithHistory(),
- ConnectExecuteTime: event.ExeTime,
- }
-
+ con := c.newConnectionContext(event.ConID, event.RandomID, event.Pid,
event.FD, processes, false)
+ con.ConnectExecuteTime = event.ExeTime
con.Role = event.Role
- var trace string
if event.NeedComplete == 0 {
- trace = "0"
con.RemotePort = uint16(event.RemoteAddrPort)
con.LocalPort = uint16(event.LocalAddrPort)
if event.SocketFamily == unix.AF_INET {
@@ -404,27 +420,16 @@ func (c *Context) handleSocketConnectEvent(data
interface{}) {
con.RemoteIP = parseAddressV6(event.RemoteAddrV6)
}
} else {
- socket, err := ParseSocket(event.Pid, event.FD)
- if err != nil {
- trace = "1-1"
- // if the remote address exists then setting it
- if event.RemoteAddrPort == 0 {
- log.Debugf("complete the socket error, pid: %d,
fd: %d, error: %v", event.Pid, event.FD, err)
+ // if the remote address exists then setting it
+ if event.RemoteAddrPort != 0 {
+ con.RemotePort = uint16(event.RemoteAddrPort)
+ if event.SocketFamily == unix.AF_INET {
+ con.RemoteIP =
parseAddressV4(event.RemoteAddrV4)
} else {
- con.RemotePort = uint16(event.RemoteAddrPort)
- if event.SocketFamily == unix.AF_INET {
- con.RemoteIP =
parseAddressV4(event.RemoteAddrV4)
- } else {
- con.RemoteIP =
parseAddressV6(event.RemoteAddrV6)
- }
+ con.RemoteIP =
parseAddressV6(event.RemoteAddrV6)
}
- } else {
- trace = "1-2"
- con.LocalIP = socket.SrcIP
- con.LocalPort = socket.SrcPort
- con.RemoteIP = socket.DestIP
- con.RemotePort = socket.DestPort
}
+ c.sockParseQueue <- con
}
// add to the context
@@ -432,15 +437,13 @@ func (c *Context) handleSocketConnectEvent(data
interface{}) {
if log.Enable(logrus.DebugLevel) {
marshal, _ := json.Marshal(event)
- log.Debugf("found connect event(%s): role: %s, %s:%d:%d ->
%s:%d, json: %s", trace, con.Role.String(),
+ log.Debugf("found connect event: role: %s, %s:%d:%d -> %s:%d,
json: %s", con.Role.String(),
con.LocalIP, con.LocalPort, con.LocalPid, con.RemoteIP,
con.RemotePort, string(marshal))
}
}
func (c *Context) saveActiveConnection(con *ConnectionContext) {
- c.connectionLock.Lock()
- defer c.connectionLock.Unlock()
- c.activeConnections[c.generateConnectionKey(con.ConnectionID,
con.RandomID)] = con
+ c.activeConnections.Set(c.generateConnectionKey(con.ConnectionID,
con.RandomID), con)
}
type SocketCloseEvent struct {
@@ -452,6 +455,15 @@ type SocketCloseEvent struct {
Role ConnectionRole
Fix uint32
+ SocketFamily uint32
+ RemoteAddrV4 uint32
+ RemoteAddrV6 [16]uint8
+ RemoteAddrPort uint32
+ LocalAddrV4 uint32
+ LocalAddrV6 [16]uint8
+ LocalAddrPort uint32
+ Fix1 uint32
+
WriteBytes uint64
WriteCount uint64
WriteExeTime uint64
@@ -463,51 +475,68 @@ type SocketCloseEvent struct {
WriteRTTExeTime uint64
}
-type SocketCloseEventWrapper struct {
- *SocketCloseEvent
- NotExistsCount int
-}
-
// batch to re-process all cached closed event
-// if the event re-processes once, then just ignore it
func (c *Context) batchReProcessCachedCloseEvent() {
- // handling the second close event cache first, if it's could not be
handle, then ignored
- if len(c.secondCloseEventCache) > 0 {
- for _, wrapper := range c.secondCloseEventCache {
- event := wrapper.SocketCloseEvent
- if !c.socketClosedEvent0(event) {
- log.Warnf("found close connection event, but
current connection is not in active cache: pid: %d, "+
- "socket fd: %d", event.Pid,
event.SocketFD)
+ for len(c.flushClosedEvents) > 0 {
+ event := <-c.flushClosedEvents
+ if !c.socketClosedEvent0(event) {
+ // if cannot the found the active connection, then just
create a new closed connection context
+ processes := c.processes[int32(event.Pid)]
+ if len(processes) == 0 {
+ continue
}
+ cc := c.newConnectionContext(event.ConID,
event.RandomID, event.Pid, event.SocketFD, processes, true)
+ if event.SocketFamily == unix.AF_INET {
+ cc.RemoteIP = parseAddressV4(event.RemoteAddrV4)
+ cc.LocalIP = parseAddressV4(event.LocalAddrV4)
+ } else if event.SocketFamily == unix.AF_INET6 {
+ cc.RemoteIP = parseAddressV6(event.RemoteAddrV6)
+ cc.LocalIP = parseAddressV6(event.LocalAddrV6)
+ } else {
+ continue
+ }
+
+ // append to the closed connection
+ c.closedConnections = append(c.closedConnections,
c.combineClosedConnection(cc, event))
}
- c.secondCloseEventCache = make([]*SocketCloseEventWrapper, 0)
}
+}
- for len(c.flushClosedEvents) > 0 {
- wrapper := <-c.flushClosedEvents
- if c.socketClosedEvent0(wrapper.SocketCloseEvent) {
- continue
- }
- wrapper.NotExistsCount++
- // try to add the flush queue to re-process when next flush all
connections
- c.secondCloseEventCache = append(c.secondCloseEventCache,
wrapper)
+func (c *Context) newConnectionContext(conID, randomID uint64, pid, fd uint32,
processes []api.ProcessInterface, conClosed bool) *ConnectionContext {
+ return &ConnectionContext{
+ // metadata
+ ConnectionID: conID,
+ RandomID: randomID,
+ LocalPid: pid,
+ SocketFD: fd,
+ LocalProcesses: processes,
+ ConnectionClosed: conClosed,
+
+ // metrics
+ WriteCounter: NewSocketDataCounterWithHistory(),
+ ReadCounter: NewSocketDataCounterWithHistory(),
+ WriteRTTCounter: NewSocketDataCounterWithHistory(),
+ WriteRTTHistogram: NewSocketDataHistogramWithHistory(),
+ WriteExeTimeHistogram: NewSocketDataHistogramWithHistory(),
+ ReadExeTimeHistogram: NewSocketDataHistogramWithHistory(),
}
}
func (c *Context) handleSocketCloseEvent(data interface{}) {
event := data.(*SocketCloseEvent)
+ if log.Enable(logrus.DebugLevel) {
+ marshal, _ := json.Marshal(event)
+ log.Debugf("found close event: %s", string(marshal))
+ }
+
// try to handle the socket close event
if !c.socketClosedEvent0(event) {
// is not in active connection, maybe it's not have been added
to activate first
// just add to the close queue, wait for the flush connection
with interval
- c.flushClosedEvents <-
&SocketCloseEventWrapper{SocketCloseEvent: event, NotExistsCount: 1}
+ c.flushClosedEvents <- event
return
}
- if log.Enable(logrus.DebugLevel) {
- marshal, _ := json.Marshal(event)
- log.Debugf("found close event: %s", string(marshal))
- }
}
// SocketExceptionOperationEvent Socket have been retransmitted/drop the
package event
@@ -554,28 +583,28 @@ func (c *Context)
handleSocketExceptionOperationEvent(data interface{}) {
}
func (c *Context) socketClosedEvent0(event *SocketCloseEvent) bool {
- c.connectionLock.Lock()
- defer c.connectionLock.Unlock()
-
- conKey := c.generateConnectionKey(event.ConID, event.RandomID)
- activeCon := c.activeConnections[conKey]
+ activeCon := c.foundAndDeleteConnection(event)
if activeCon == nil {
return false
}
- // delete the active connection
- delete(c.activeConnections, conKey)
-
// combine the connection data
c.closedConnections = append(c.closedConnections,
c.combineClosedConnection(activeCon, event))
return true
}
+func (c *Context) foundAndDeleteConnection(event *SocketCloseEvent)
*ConnectionContext {
+ conKey := c.generateConnectionKey(event.ConID, event.RandomID)
+ val, exists := c.activeConnections.Pop(conKey)
+ if !exists {
+ return nil
+ }
+ return val.(*ConnectionContext)
+}
+
func (c *Context) deleteConnectionOnly(ccs []string) {
- c.connectionLock.Lock()
- defer c.connectionLock.Unlock()
for _, cc := range ccs {
- delete(c.activeConnections, cc)
+ c.activeConnections.Remove(cc)
}
}
@@ -620,6 +649,7 @@ func (c *Context) AddProcesses(processes
[]api.ProcessInterface) error {
if err1 := c.bpf.ProcessMonitorControl.Update(uint32(pid),
uint32(1), ebpf.UpdateAny); err1 != nil {
err = multierror.Append(err, err1)
}
+ log.Debugf("add monitor process, pid: %d", pid)
}
return err
}
@@ -650,6 +680,7 @@ func (c *Context) DeleteProcesses(processes
[]api.ProcessInterface) (emptyProces
if err1 :=
c.bpf.ProcessMonitorControl.Delete(uint32(pid)); err1 != nil {
err = multierror.Append(err, err1)
}
+ log.Debugf("delete monitor process: %d", pid)
delete(c.processes, pid)
continue
}
diff --git a/pkg/profiling/task/network/delegate.go
b/pkg/profiling/task/network/delegate.go
index 1b1510c..ef0bf8f 100644
--- a/pkg/profiling/task/network/delegate.go
+++ b/pkg/profiling/task/network/delegate.go
@@ -62,11 +62,7 @@ func (r *DelegateRunner) Init(task *base.ProfilingTask,
processes []api.ProcessI
func (r *DelegateRunner) Run(ctx context.Context, notify
base.ProfilingRunningSuccessNotify) error {
r.ctx, r.cancel = context.WithCancel(ctx)
- if err := realRunner.Start(ctx); err != nil {
- return err
- }
- // add processes
- if err := realRunner.AddProcess(r.processes); err != nil {
+ if err := realRunner.Start(ctx, r.processes); err != nil {
return err
}
notify()
diff --git a/pkg/profiling/task/network/linker.go
b/pkg/profiling/task/network/linker.go
index 278bc09..859930a 100644
--- a/pkg/profiling/task/network/linker.go
+++ b/pkg/profiling/task/network/linker.go
@@ -26,6 +26,8 @@ import (
"os"
"sync"
+ "github.com/apache/skywalking-rover/pkg/tools"
+
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/perf"
@@ -33,9 +35,42 @@ import (
"github.com/hashicorp/go-multierror"
)
+const defaultSymbolPrefix = "sys_"
+
type LinkFunc func(symbol string, prog *ebpf.Program) (link.Link, error)
type RingBufferReader func(data interface{})
+var syscallPrefix string
+
+func init() {
+ stat, err := tools.KernelFileProfilingStat()
+ if err != nil {
+ syscallPrefix = defaultSymbolPrefix
+ return
+ }
+ var possiblePrefixes = []string{
+ defaultSymbolPrefix,
+ "__x64_sys_",
+ "__x32_compat_sys_",
+ "__ia32_compat_sys_",
+ "__arm64_sys_",
+ "__s390x_sys_",
+ "__s390_sys_",
+ }
+
+ found := false
+ for _, p := range possiblePrefixes {
+ if stat.FindSymbolAddress(fmt.Sprintf("%sbpf", p)) != 0 {
+ found = true
+ syscallPrefix = p
+ break
+ }
+ }
+ if !found {
+ syscallPrefix = "sys_"
+ }
+}
+
type Linker struct {
closers []io.Closer
errors error
@@ -64,11 +99,12 @@ func (m *Linker) AddSysCall(call string, enter, exit
*ebpf.Program) {
}
func (m *Linker) AddSysCallWithKProbe(call string, linkK LinkFunc, p
*ebpf.Program) {
- kprobe, err := linkK("sys_"+call, p)
+ kprobe, err := linkK(syscallPrefix+call, p)
if err != nil {
m.errors = multierror.Append(m.errors, fmt.Errorf("could not
attach syscall with %s: %v", "sys_"+call, err))
} else {
+ log.Debugf("attach to the syscall: %s", syscallPrefix+call)
m.closers = append(m.closers, kprobe)
}
}
@@ -108,7 +144,7 @@ func (m *Linker) ReadEventAsync(emap *ebpf.Map, reader
RingBufferReader, dataSup
data := dataSupplier()
if err :=
binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, data); err
!= nil {
- log.Warnf("parsing data from %s ringbuffer
error: %v", emap.String(), err)
+ log.Warnf("parsing data from %s, raw size: %d,
ringbuffer error: %v", emap.String(), len(record.RawSample), err)
continue
}
diff --git a/pkg/profiling/task/network/metrics.go
b/pkg/profiling/task/network/metrics.go
index 91fca81..191a0b8 100644
--- a/pkg/profiling/task/network/metrics.go
+++ b/pkg/profiling/task/network/metrics.go
@@ -181,6 +181,8 @@ func subtractionValue(v1, v2 uint64) uint64 {
}
type ProcessTraffic struct {
+ analyzer *TrafficAnalyzer
+
// local process information
LocalPid uint32
LocalProcesses []api.ProcessInterface
@@ -350,7 +352,7 @@ func (r *ProcessTraffic) appendRemoteAddrssInfo(labels
[]*v3.Label, prefix strin
}
}
- if tools.IsLocalHostAddress(r.RemoteIP) {
+ if tools.IsLocalHostAddress(r.RemoteIP) ||
r.analyzer.IsLocalAddressInCache(r.RemoteIP) {
return r.appendMeterValue(labels, prefix+"_local", "true")
}
diff --git a/pkg/profiling/task/network/runner.go
b/pkg/profiling/task/network/runner.go
index a294095..8821903 100644
--- a/pkg/profiling/task/network/runner.go
+++ b/pkg/profiling/task/network/runner.go
@@ -48,12 +48,12 @@ var log = logger.GetLogger("profiling", "task", "network",
"topology")
type Runner struct {
initOnce sync.Once
- startOnce sync.Once
+ startLock sync.Mutex
+ stopOnce sync.Once
meterClient v3.MeterReportServiceClient
reportInterval time.Duration
meterPrefix string
- processes map[int32][]api.ProcessInterface
bpf *bpfObjects
linker *Linker
bpfContext *Context
@@ -64,7 +64,6 @@ type Runner struct {
func NewGlobalRunnerContext() *Runner {
return &Runner{
- processes: make(map[int32][]api.ProcessInterface),
bpfContext: NewContext(),
linker: &Linker{},
}
@@ -78,24 +77,18 @@ func (r *Runner) init(config *base.TaskConfig, moduleMgr
*module.Manager) error
return err
}
-func (r *Runner) AddProcess(processes []api.ProcessInterface) error {
- // adding processes
- return r.bpfContext.AddProcesses(processes)
-}
-
func (r *Runner) DeleteProcesses(processes []api.ProcessInterface) (bool,
error) {
return r.bpfContext.DeleteProcesses(processes)
}
-func (r *Runner) Start(ctx context.Context) error {
- var err error
- r.startOnce.Do(func() {
- err = r.start0(ctx)
- })
- return err
-}
+func (r *Runner) Start(ctx context.Context, processes []api.ProcessInterface)
error {
+ r.startLock.Lock()
+ defer r.startLock.Unlock()
+ // if already start, then just adding the processes
+ if r.bpf != nil {
+ return r.bpfContext.AddProcesses(processes)
+ }
-func (r *Runner) start0(ctx context.Context) error {
r.ctx, r.cancel = context.WithCancel(ctx)
// load bpf program
objs := bpfObjects{}
@@ -105,6 +98,14 @@ func (r *Runner) start0(ctx context.Context) error {
r.bpf = &objs
r.bpfContext.Init(&objs)
+ if err := r.bpfContext.AddProcesses(processes); err != nil {
+ return err
+ }
+
+ // register all handlers
+ r.bpfContext.RegisterAllHandlers(r.linker)
+ r.bpfContext.StartSocketAddressParser(r.ctx)
+
// sock opts
r.linker.AddSysCall("connect", objs.SysConnect, objs.SysConnectRet)
r.linker.AddSysCall("accept", objs.SysAccept, objs.SysAcceptRet)
@@ -138,9 +139,6 @@ func (r *Runner) start0(ctx context.Context) error {
// close socket
r.linker.AddSysCall("close", objs.SysClose, objs.SysCloseRet)
- // register all handlers
- r.bpfContext.RegisterAllHandlers(r.linker)
-
if err := r.linker.HasError(); err != nil {
_ = r.linker.Close()
return err
@@ -180,13 +178,14 @@ func (r *Runner) flushMetrics() error {
if log.Enable(logrus.DebugLevel) {
for _, con := range connections {
- log.Debugf("found connection: %s relation: %s:%d(%d) ->
%s:%d, read: %d bytes/%d, write: %d bytes/%d", con.Role.String(),
+ log.Debugf("found connection: %d, %s relation:
%s:%d(%d) -> %s:%d, read: %d bytes/%d, write: %d bytes/%d",
+ con.ConnectionID, con.Role.String(),
con.LocalIP, con.LocalPort, con.LocalPid,
con.RemoteIP, con.RemotePort, con.WriteCounter.Cur.Bytes,
con.WriteCounter.Cur.Count,
con.ReadCounter.Cur.Bytes,
con.ReadCounter.Cur.Count)
}
}
// combine all connection
- analyzer := NewTrafficAnalyzer()
+ analyzer := NewTrafficAnalyzer(r.bpfContext.processes)
traffics := analyzer.CombineConnectionToTraffics(connections)
if len(traffics) == 0 {
return nil
@@ -246,10 +245,14 @@ func (r *Runner) logTheMetricsConnections(traffices
[]*ProcessTraffic) {
}
func (r *Runner) Stop() error {
- r.cancel()
+ if r.cancel != nil {
+ r.cancel()
+ }
var result error
- result = r.closeWhenExists(result, r.linker)
- result = r.closeWhenExists(result, r.bpf)
+ r.stopOnce.Do(func() {
+ result = r.closeWhenExists(result, r.linker)
+ result = r.closeWhenExists(result, r.bpf)
+ })
return result
}
diff --git a/pkg/profiling/task/network/tcpresolver.go
b/pkg/profiling/task/network/tcpresolver.go
index 2184705..92d2326 100644
--- a/pkg/profiling/task/network/tcpresolver.go
+++ b/pkg/profiling/task/network/tcpresolver.go
@@ -33,7 +33,7 @@ import (
)
var (
- notBlankRegex = regexp.MustCompile(`\\s+`)
+ notBlankRegex = regexp.MustCompile(`\s+`)
ipv4StrLen = 8
ipv6StrLen = 32
)
@@ -59,8 +59,6 @@ func ParseSocket(pid, sockfd uint32) (*SocketPair, error) {
var s *SocketPair
s, err = foundAddressByFile(s, err,
fmt.Sprintf(host.GetFileInHost("/proc/%d/net/tcp"), pid), inode)
s, err = foundAddressByFile(s, err,
fmt.Sprintf(host.GetFileInHost("/proc/%d/net/tcp6"), pid), inode)
- s, err = foundAddressByFile(s, err,
host.GetFileInHost("/proc/net/tcp"), inode)
- s, err = foundAddressByFile(s, err,
host.GetFileInHost("/proc/net/tcp6"), inode)
return s, err
}