This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new b667e31  Enhance the network profiling (#40)
b667e31 is described below

commit b667e31f74fcadec93dfd8821afd3f1e01e27d64
Author: mrproliu <[email protected]>
AuthorDate: Sun Jul 24 17:24:44 2022 +0800

    Enhance the network profiling (#40)
---
 bpf/profiling/network/args.h              |  16 ++-
 bpf/profiling/network/netmonitor.c        | 215 ++++++++++++++++------------
 bpf/profiling/network/protocol_analyze.h  |  27 ++--
 bpf/profiling/network/sock_stats.h        |  24 ++++
 go.mod                                    |   1 +
 go.sum                                    |   2 +
 pkg/process/api/process.go                |   5 +
 pkg/process/finders/base/process.go       |   2 +
 pkg/process/finders/base/template.go      |  12 ++
 pkg/process/finders/context.go            |  11 +-
 pkg/process/finders/kubernetes/process.go |  22 +++
 pkg/process/finders/scanner/process.go    |  14 ++
 pkg/process/finders/storage.go            |   5 +
 pkg/profiling/task/network/analyzer.go    | 193 ++++++++++++++++++++++----
 pkg/profiling/task/network/context.go     | 223 +++++++++++++++++-------------
 pkg/profiling/task/network/delegate.go    |   6 +-
 pkg/profiling/task/network/linker.go      |  40 +++++-
 pkg/profiling/task/network/metrics.go     |   4 +-
 pkg/profiling/task/network/runner.go      |  51 +++----
 pkg/profiling/task/network/tcpresolver.go |   4 +-
 20 files changed, 615 insertions(+), 262 deletions(-)

diff --git a/bpf/profiling/network/args.h b/bpf/profiling/network/args.h
index fed379a..ce65720 100644
--- a/bpf/profiling/network/args.h
+++ b/bpf/profiling/network/args.h
@@ -19,7 +19,7 @@
 
 #pragma once
 
-#define MAX_SOCKET_BUFFER_READ_LENGTH 1024
+#define MAX_SOCKET_BUFFER_READ_LENGTH 4095
 
 // unknown the connection type, not trigger the syscall connect,accept
 #define AF_UNKNOWN 0xff
@@ -41,6 +41,7 @@
 #define SOCKET_OPTS_TYPE_RECVFROM   14
 #define SOCKET_OPTS_TYPE_RECVMSG    15
 #define SOCKET_OPTS_TYPE_RECVMMSG   16
+#define SOCKET_OPTS_TYPE_RESENT   17
 
 // tracepoint enter
 struct trace_event_raw_sys_enter {
@@ -106,7 +107,7 @@ struct sock_data_args_t {
     // current read/write is calls on the sockets.
     __u32 is_sock_event;
     size_t iovlen;
-    struct mmsghdr *mmsg;
+    unsigned int* msg_len;
     __u64 start_nacs;
     // rtt
     __u64 rtt_count;
@@ -147,13 +148,18 @@ struct {
     __uint(max_entries, 1);
 } socket_buffer_reader_map SEC(".maps");
 static __inline struct socket_buffer_reader_t* read_socket_data(void *buf, 
__u32 data_bytes) {
+    __u64 size;
     __u32 kZero = 0;
     struct socket_buffer_reader_t* reader = 
bpf_map_lookup_elem(&socket_buffer_reader_map, &kZero);
     if (reader == NULL) {
         return NULL;
     }
-    __u32 max_len = data_bytes < MAX_SOCKET_BUFFER_READ_LENGTH ? (data_bytes & 
MAX_SOCKET_BUFFER_READ_LENGTH - 1) : MAX_SOCKET_BUFFER_READ_LENGTH;
-    reader->data_len = bpf_probe_read_str(&reader->buffer, max_len, buf);
-//    reader->data_len = max_len;
+    size = data_bytes;
+    if (size > MAX_SOCKET_BUFFER_READ_LENGTH) {
+        size = MAX_SOCKET_BUFFER_READ_LENGTH;
+    }
+    asm volatile("%[size] &= 0xfff;\n" ::[size] "+r"(size) :);
+    bpf_probe_read(&reader->buffer, size, buf);
+    reader->data_len = size;
     return reader;
 }
\ No newline at end of file
diff --git a/bpf/profiling/network/netmonitor.c 
b/bpf/profiling/network/netmonitor.c
index 1628a3a..7296606 100644
--- a/bpf/profiling/network/netmonitor.c
+++ b/bpf/profiling/network/netmonitor.c
@@ -81,7 +81,10 @@ static __always_inline void submit_new_connection(struct 
pt_regs* ctx, __u32 fun
     } else {
         con.role = CONNECTION_ROLE_TYPE_UNKNOWN;
     }
+    __u16 port;
+    __u32 need_complete_addr = 1;
     if (socket != NULL) {
+        need_complete_addr = 0;
         // only get from accept function(server side)
         struct sock* s;
         BPF_CORE_READ_INTO(&s, socket, sk);
@@ -89,32 +92,55 @@ static __always_inline void submit_new_connection(struct 
pt_regs* ctx, __u32 fun
         short unsigned int skc_family;
         BPF_CORE_READ_INTO(&skc_family, s, __sk_common.skc_family);
         con.socket_family = skc_family;
+
+        if (con.socket_family == AF_INET) {
+            BPF_CORE_READ_INTO(&port, s, __sk_common.skc_num);
+            con.local_port = port;
+            BPF_CORE_READ_INTO(&con.local_addr_v4, s, 
__sk_common.skc_rcv_saddr);
+            BPF_CORE_READ_INTO(&port, s, __sk_common.skc_dport);
+            con.remote_port = bpf_ntohs(port);
+            BPF_CORE_READ_INTO(&con.remote_addr_v4, s, __sk_common.skc_daddr);
+        } else if (con.socket_family == AF_INET6) {
+            BPF_CORE_READ_INTO(&port, s, __sk_common.skc_num);
+            con.local_port = port;
+            BPF_CORE_READ_INTO(&con.local_addr_v6, s, 
__sk_common.skc_v6_rcv_saddr.in6_u.u6_addr8);
+            BPF_CORE_READ_INTO(&port, s, __sk_common.skc_dport);
+            con.remote_port = bpf_ntohs(port);
+            BPF_CORE_READ_INTO(&con.remote_addr_v6, s, 
__sk_common.skc_v6_daddr.in6_u.u6_addr8);
+       }
     } else if (addr != NULL) {
         con.socket_family = _(addr->sa_family);
+        if (con.socket_family == AF_INET) {
+            struct sockaddr_in *daddr = (struct sockaddr_in *)addr;
+            bpf_probe_read(&con.remote_addr_v4, sizeof(con.remote_addr_v4), 
&daddr->sin_addr.s_addr);
+            bpf_probe_read(&port, sizeof(port), &daddr->sin_port);
+            con.remote_port = bpf_ntohs(port);
+        } else if (con.socket_family == AF_INET6) {
+            struct sockaddr_in6 *daddr = (struct sockaddr_in6 *)addr;
+            bpf_probe_read(&con.remote_addr_v6, sizeof(con.remote_addr_v6), 
&daddr->sin6_addr.s6_addr);
+            bpf_probe_read(&port, sizeof(port), &daddr->sin6_port);
+            con.remote_port = bpf_ntohs(port);
+        }
     } else {
         con.socket_family = AF_UNKNOWN;
     }
 
     // save to the active connection map
     __u64 conid = gen_tgid_fd(tgid, fd);
-    bpf_map_update_elem(&active_connection_map, &conid, &con, 0);
-
+    struct socket_connect_event_t *event = create_socket_connect_event();
     // only trace ipv4, v6, or unknown
-    if (family_should_trace(con.socket_family) == false) {
-        return;
-    }
-
-    // pid is contains
-    if (tgid_should_trace(tgid) == false) {
+    // pid is not contained (process is not monitored)
+    // cannot create connect event object
+    if (family_should_trace(con.socket_family) == false || 
tgid_should_trace(tgid) == false || !event) {
+        con.connect_event_send = false;
+        bpf_map_update_elem(&active_connection_map, &conid, &con, 0);
         return;
     }
+    // mark the connect event as sent by default
+    con.connect_event_send = true;
+    bpf_map_update_elem(&active_connection_map, &conid, &con, 0);
 
     // send to user-space that have connection activated
-    struct socket_connect_event_t *event = create_socket_connect_event();
-    if (!event) {
-//        bpf_printk("cannot create the socket connect event");
-        return;
-    }
     event->conid = conid;
     event->random_id = con.random_id;
     event->func_name = func_name;
@@ -127,63 +153,36 @@ static __always_inline void submit_new_connection(struct 
pt_regs* ctx, __u32 fun
     // fill the connection
     event->role = con.role;
     event->socket_family = con.socket_family;
-    __u16 port;
-    if (socket != NULL) {
-        event->need_complete_addr = 0;
-        // only get from accept function(server side)
-        struct sock* s;
-        BPF_CORE_READ_INTO(&s, socket, sk);
-
-        if (con.socket_family == AF_INET) {
-            BPF_CORE_READ_INTO(&port, s, __sk_common.skc_num);
-            event->local_port = port;
-            BPF_CORE_READ_INTO(&event->local_addr_v4, s, 
__sk_common.skc_rcv_saddr);
-            BPF_CORE_READ_INTO(&port, s, __sk_common.skc_dport);
-            event->remote_port = bpf_ntohs(port);
-            BPF_CORE_READ_INTO(&event->remote_addr_v4, s, 
__sk_common.skc_daddr);
-        } else if (con.socket_family == AF_INET6) {
-            BPF_CORE_READ_INTO(&port, s, __sk_common.skc_num);
-            event->local_port = port;
-            BPF_CORE_READ_INTO(&event->local_addr_v6, s, 
__sk_common.skc_v6_rcv_saddr.in6_u.u6_addr8);
-            BPF_CORE_READ_INTO(&port, s, __sk_common.skc_dport);
-            event->remote_port = bpf_ntohs(port);
-            BPF_CORE_READ_INTO(&event->remote_addr_v6, s, 
__sk_common.skc_v6_daddr.in6_u.u6_addr8);
-       } else {
-            event->local_port = 0;
-            event->remote_port = 0;
-       }
-    } else if (addr != NULL) {
-        event->need_complete_addr = 1;
-        if (con.socket_family == AF_INET) {
-            struct sockaddr_in *daddr = (struct sockaddr_in *)addr;
-            bpf_probe_read(&event->remote_addr_v4, 
sizeof(event->remote_addr_v4), &daddr->sin_addr.s_addr);
-            bpf_probe_read(&port, sizeof(port), &daddr->sin_port);
-            event->remote_port = bpf_ntohs(port);
-        } else if (con.socket_family == AF_INET6) {
-            struct sockaddr_in6 *daddr = (struct sockaddr_in6 *)addr;
-            bpf_probe_read(&event->remote_addr_v6, 
sizeof(event->remote_addr_v6), &daddr->sin6_addr.s6_addr);
-            bpf_probe_read(&port, sizeof(port), &daddr->sin6_port);
-            event->remote_port = bpf_ntohs(port);
-        } else {
-            event->remote_port = 0;
+    event->need_complete_addr = need_complete_addr;
+    event->local_addr_v4 = con.local_addr_v4;
+    __builtin_memcpy(&event->local_addr_v6, &con.local_addr_v4, 
16*sizeof(__u8));
+    event->local_port = con.local_port;
+    event->remote_addr_v4 = con.remote_addr_v4;
+    __builtin_memcpy(&event->remote_addr_v6, &con.remote_addr_v6, 
16*sizeof(__u8));
+    event->remote_port = con.remote_port;
+
+    __u32 ret = bpf_perf_event_output(ctx, &socket_connection_event_queue, 
BPF_F_CURRENT_CPU, event, sizeof(*event));
+    // if the event was not sent successfully, then mark the event as not sent
+    if (ret < 0) {
+        struct active_connection_t *con = 
bpf_map_lookup_elem(&active_connection_map, &conid);
+        if (con != NULL) {
+            con->connect_event_send = false;
+            bpf_map_update_elem(&active_connection_map, &conid, con, 0);
         }
-    } else {
-        event->need_complete_addr = 1;
-        // clean the cache in LRU(remote port is enough)
-        event->remote_port = 0;
     }
-
-    bpf_perf_event_output(ctx, &socket_connection_event_queue, 
BPF_F_CURRENT_CPU, event, sizeof(*event));
 }
 
 static __inline void notify_close_connection(struct pt_regs* ctx, __u64 conid, 
struct active_connection_t* con, __u64 start_time, __u64 end_time) {
-    // only trace ipv4, v6, or unknown
-    if (family_should_trace(con->socket_family) == false) {
-        return;
-    }
-    // pid is contains
-    if (tgid_should_trace(con->pid) == false) {
-        return;
+    // if the connect event was not sent, then check the pid or socket family
+    if (con->connect_event_send == false) {
+        // only trace ipv4, v6, or unknown
+        if (family_should_trace(con->socket_family) == false) {
+            return;
+        }
+        // ignore sending the close event if the current process should not be traced
+        if (tgid_should_trace(con->pid) == false) {
+            return;
+        }
     }
 
     __u64 exe_time = (__u64)(end_time - start_time);
@@ -196,6 +195,14 @@ static __inline void notify_close_connection(struct 
pt_regs* ctx, __u64 conid, s
     close_event.sockfd = con->sockfd;
     close_event.role = con->role;
 
+    close_event.socket_family = con->socket_family;
+    close_event.local_addr_v4 = con->local_addr_v4;
+    __builtin_memcpy(&close_event.local_addr_v6, &con->local_addr_v4, 
16*sizeof(__u8));
+    close_event.local_port = con->local_port;
+    close_event.remote_addr_v4 = con->remote_addr_v4;
+    __builtin_memcpy(&close_event.remote_addr_v6, &con->remote_addr_v6, 
16*sizeof(__u8));
+    close_event.remote_port = con->remote_port;
+
     close_event.write_bytes = con->write_bytes;
     close_event.write_count = con->write_count;
     close_event.write_exe_time = con->write_exe_time;
@@ -206,7 +213,6 @@ static __inline void notify_close_connection(struct 
pt_regs* ctx, __u64 conid, s
     close_event.write_rtt_time = con->write_rtt_time;
 
     bpf_perf_event_output(ctx, &socket_close_event_queue, BPF_F_CURRENT_CPU, 
&close_event, sizeof(close_event));
-//    bpf_printk("submit new close: conid: %lld, write bytes: %lld, write 
count: %lld\n", conid, con->write_bytes, con->write_count);
 }
 
 static __inline void submit_close_connection(struct pt_regs* ctx, __u32 tgid, 
__u32 fd, __u64 start_nacs) {
@@ -214,7 +220,6 @@ static __inline void submit_close_connection(struct 
pt_regs* ctx, __u32 tgid, __
     __u64 conid = gen_tgid_fd(tgid, fd);
     struct active_connection_t* con = 
bpf_map_lookup_elem(&active_connection_map, &conid);
     if (con == NULL) {
-//        bpf_printk("connection id not exists: tgid: %d, fd: %d -> %lld", 
tgid, fd, conid);
         return;
     }
     notify_close_connection(ctx, conid, con, start_nacs, curr_nacs);
@@ -247,7 +252,28 @@ static __inline void 
submit_connection_when_not_exists(struct pt_regs *ctx, __u6
     submit_new_connection(ctx, func_name, tgid, connect_args->fd, 
connect_args->start_nacs, connect_args->addr, NULL);
 }
 
-static __always_inline void process_write_data(void *ctx, __u64 id, struct 
sock_data_args_t *args, ssize_t bytes_count,
+static __always_inline void resent_connect_event(struct pt_regs *ctx, __u32 
tgid, __u32 fd, __u64 conid, struct active_connection_t *con) {
+    struct socket_connect_event_t *event = create_socket_connect_event();
+    if (!event) {
+        return;
+    }
+    event->conid = conid;
+    event->random_id = con->random_id;
+    event->func_name = SOCKET_OPTS_TYPE_RESENT;
+    event->pid = tgid;
+    event->sockfd = fd;
+    event->role = con->role;
+    event->socket_family = con->socket_family;
+    event->need_complete_addr = 1;
+    event->remote_port = 0;
+    __u32 ret = bpf_perf_event_output(ctx, &socket_connection_event_queue, 
BPF_F_CURRENT_CPU, event, sizeof(*event));
+    if (ret >= 0) {
+        con->connect_event_send = true;
+        bpf_map_update_elem(&active_connection_map, &conid, con, 0);
+    }
+}
+
+static __always_inline void process_write_data(struct pt_regs *ctx, __u64 id, 
struct sock_data_args_t *args, ssize_t bytes_count,
                                         __u32 data_direction, const bool vecs, 
__u32 func_name) {
     __u64 curr_nacs = bpf_ktime_get_ns();
     __u32 tgid = (__u32)(id >> 32);
@@ -273,26 +299,35 @@ static __always_inline void process_write_data(void *ctx, 
__u64 id, struct sock_
         return;
     }
 
-    // pid is contains
-    if (tgid_should_trace(tgid) == false) {
-        return;
+    // if connect event is not sent
+    if (conn->connect_event_send == false) {
+        // if the connection should trace, double check
+        if (tgid_should_trace(tgid) == false) {
+            return;
+        }
+        // resent the connection event
+        resent_connect_event(ctx, tgid, args->fd, conid, conn);
     }
 
     // unknown connection role, then try to use protocol analyzer to analyze 
request or response
     if (conn->role == CONNECTION_ROLE_TYPE_UNKNOWN) {
-//        bpf_printk("connection role is unknown, buf exists: %d, ioves 
exists: %d, func: %d\n", args->buf != NULL ? 1 : 0, args->iovec != NULL ? 1 : 
0, func_name);
         struct socket_buffer_reader_t *buf_reader = NULL;
         if (args->buf != NULL) {
             buf_reader = read_socket_data(args->buf, bytes_count);
         } else if (args->iovec != NULL) {
-            struct iovec *iovec = _(args->iovec);
-            char *buff = _(iovec->iov_base);
-            buf_reader = read_socket_data(buff, bytes_count);
+            struct iovec iov;
+            int err = bpf_probe_read(&iov, sizeof(iov), args->iovec);
+            if (err >= 0) {
+                __u64 size = iov.iov_len;
+                if (size > bytes_count) {
+                    size = bytes_count;
+                }
+                buf_reader = read_socket_data((char *)iov.iov_base, size);
+            }
         }
 
         if (buf_reader != NULL) {
             enum message_type_t msg_type = 
analyze_protocol(buf_reader->buffer, buf_reader->data_len, conn);
-//            bpf_printk("connection: %lld, message type: %d\n", conid, 
msg_type);
             // if send request data to remote address or receive response data 
from remote address
             // then, recognized current connection is client
             if ((msg_type == kRequest && data_direction == 
SOCK_DATA_DIRECTION_EGRESS) ||
@@ -301,8 +336,8 @@ static __always_inline void process_write_data(void *ctx, 
__u64 id, struct sock_
 
             // if send response data to remote address or receive request data 
from remote address
             // then, recognized current connection is server
-            } else if ((msg_type == kResponse && data_direction == 
SOCK_DATA_DIRECTION_INGRESS) ||
-                       (msg_type == kRequest && data_direction == 
SOCK_DATA_DIRECTION_EGRESS)) {
+            } else if ((msg_type == kResponse && data_direction == 
SOCK_DATA_DIRECTION_EGRESS) ||
+                       (msg_type == kRequest && data_direction == 
SOCK_DATA_DIRECTION_INGRESS)) {
                 conn->role = CONNECTION_ROLE_TYPE_SERVER;
             }
         }
@@ -650,8 +685,11 @@ int sys_sendmmsg(struct pt_regs* ctx) {
 
     struct sock_data_args_t data_args = {};
     data_args.fd = fd;
-    data_args.mmsg = mmsghdr;
-    data_args.iovec = _(mmsghdr->msg_hdr.msg_iov);
+    struct iovec *msg_iov = _(mmsghdr->msg_hdr.msg_iov);
+    data_args.iovec = msg_iov;
+    size_t msg_iovlen = _(mmsghdr->msg_hdr.msg_iovlen);
+    data_args.iovlen = msg_iovlen;
+    data_args.msg_len = &mmsghdr->msg_hdr.msg_iovlen;
     data_args.start_nacs = bpf_ktime_get_ns();
     bpf_map_update_elem(&socket_data_args, &id, &data_args, 0);
     return 0;
@@ -671,9 +709,8 @@ int sys_sendmmsg_ret(struct pt_regs* ctx) {
     // socket data
     struct sock_data_args_t *data_args = 
bpf_map_lookup_elem(&socket_data_args, &id);
     if (data_args) {
-        struct mmsghdr *mmsg = data_args->mmsg;
-        __u32 bytes_count = _(mmsg->msg_len);
-        data_args->iovlen = _(mmsg->msg_hdr.msg_iovlen);
+        __u32 bytes_count;
+        BPF_PROBE_READ_VAR1(bytes_count, data_args->msg_len);
         process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_EGRESS, true, SOCKET_OPTS_TYPE_SENDMMSG);
     }
     bpf_map_delete_elem(&socket_data_args, &id);
@@ -937,8 +974,11 @@ int sys_recvmmsg(struct pt_regs* ctx) {
 
     struct sock_data_args_t data_args = {};
     data_args.fd = fd;
-    data_args.mmsg = mmsghdr;
-    data_args.iovec = _(mmsghdr->msg_hdr.msg_iov);
+    struct iovec *msg_iov = _(mmsghdr->msg_hdr.msg_iov);
+    data_args.iovec = msg_iov;
+    size_t msg_iovlen = _(mmsghdr->msg_hdr.msg_iovlen);
+    data_args.iovlen = msg_iovlen;
+    data_args.msg_len = &mmsghdr->msg_hdr.msg_iovlen;
     data_args.start_nacs = bpf_ktime_get_ns();
     bpf_map_update_elem(&socket_data_args, &id, &data_args, 0);
     return 0;
@@ -958,9 +998,8 @@ int sys_recvmmsg_ret(struct pt_regs* ctx) {
     // socket data
     struct sock_data_args_t *data_args = 
bpf_map_lookup_elem(&socket_data_args, &id);
     if (data_args) {
-        struct mmsghdr *mmsg = data_args->mmsg;
-        __u32 bytes_count = _(mmsg->msg_len);
-        data_args->iovlen = _(mmsg->msg_hdr.msg_iovlen);
+        __u32 bytes_count;
+        BPF_PROBE_READ_VAR1(bytes_count, data_args->msg_len);
         process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_INGRESS, true, SOCKET_OPTS_TYPE_RECVMMSG);
     }
     bpf_map_delete_elem(&socket_data_args, &id);
diff --git a/bpf/profiling/network/protocol_analyze.h 
b/bpf/profiling/network/protocol_analyze.h
index f233c96..16f122c 100644
--- a/bpf/profiling/network/protocol_analyze.h
+++ b/bpf/profiling/network/protocol_analyze.h
@@ -45,17 +45,17 @@ struct protocol_message_t {
   __u32 type;
 };
 
-#define BPF_PROBE_READ_VAR(value, ptr) bpf_probe_read(&value, sizeof(value), 
ptr)
+#define BPF_PROBE_READ_VAR1(value, ptr) bpf_probe_read(&value, sizeof(value), 
ptr)
 
 static __inline int32_t read_big_endian_int32(const char* buf) {
   int32_t length;
-  BPF_PROBE_READ_VAR(length, buf);
+  BPF_PROBE_READ_VAR1(length, buf);
   return bpf_ntohl(length);
 }
 
 static __inline int32_t read_big_endian_int16(const char* buf) {
   int16_t val;
-  BPF_PROBE_READ_VAR(val, buf);
+  BPF_PROBE_READ_VAR1(val, buf);
   return bpf_ntohl(val);
 }
 
@@ -299,9 +299,12 @@ if (len < kMinPayloadLen || len > kMaxPayloadLen) {
 return kUnknown;
 }
 // If the input includes a whole message (1 byte tag + length), check the last 
character.
-//if ((len + 1 <= (int)count) && (buf[count-1] != '\0')) {
-//return kUnknown;
-//}
+if (count > MAX_SOCKET_BUFFER_READ_LENGTH) {
+    count = MAX_SOCKET_BUFFER_READ_LENGTH;
+}
+if ((len + 1 <= (int)count) && (buf[count-1] != '\0')) {
+return kUnknown;
+}
 return kRequest;
 }
 
@@ -346,7 +349,7 @@ static const uint8_t kComStmtExecute = 0x17;
 static const uint8_t kComStmtClose = 0x19;
 
 // Second statement checks whether suspected header matches the length of 
current packet.
-bool use_prev_buf = (conn_info->prev_count == 4) && 
(*((uint32_t*)conn_info->prev_buf) == count);
+bool use_prev_buf = (conn_info->prev_count == 4) && 
((size_t)read_big_endian_int32(conn_info->prev_buf) == count);
 
 if (use_prev_buf) {
 // Check the header_state to find out if the header has been read. MySQL 
server tends to
@@ -588,6 +591,9 @@ default:
   return kUnknown;
 }
 if (mux_type == kRerr || mux_type == kRerrOld) {
+if (length > MAX_SOCKET_BUFFER_READ_LENGTH) {
+    length = MAX_SOCKET_BUFFER_READ_LENGTH;
+}
 if (buf[length - 5] != 'c' || buf[length - 4] != 'h' || buf[length - 3] != 'e' 
||
     buf[length - 2] != 'c' || buf[length - 1] != 'k')
   return kUnknown;
@@ -619,6 +625,9 @@ static __inline enum message_type_t 
infer_nats_message(const char* buf, size_t c
 if (count < 3) {
 return kUnknown;
 }
+if (count > MAX_SOCKET_BUFFER_READ_LENGTH) {
+    count = MAX_SOCKET_BUFFER_READ_LENGTH;
+}
 // The last two chars are \r\n, the terminal sequence of all NATS messages.
 if (buf[count - 2] != '\r') {
 return kUnknown;
@@ -673,8 +682,8 @@ static __inline enum message_type_t analyze_protocol(char 
*buf, __u32 count, str
         inferred_message.protocol = kProtocolCQL;
     } else if ((inferred_message.type = infer_mongo_message(buf, count)) != 
kUnknown) {
         inferred_message.protocol = kProtocolMongo;
-    } else if ((inferred_message.type = infer_pgsql_message(buf, count)) != 
kUnknown) {
-        inferred_message.protocol = kProtocolPGSQL;
+//    } else if ((inferred_message.type = infer_pgsql_message(buf, count)) != 
kUnknown) {
+//        inferred_message.protocol = kProtocolPGSQL;
     } else if ((inferred_message.type = infer_mysql_message(buf, count, 
conn_info)) != kUnknown) {
         inferred_message.protocol = kProtocolMySQL;
 //    } else if ((inferred_message.type = infer_mux_message(buf, count)) != 
kUnknown) {
diff --git a/bpf/profiling/network/sock_stats.h 
b/bpf/profiling/network/sock_stats.h
index 8c1156e..a597116 100644
--- a/bpf/profiling/network/sock_stats.h
+++ b/bpf/profiling/network/sock_stats.h
@@ -33,6 +33,15 @@ struct active_connection_t {
     // socket type
     __u32 socket_family;
 
+    // remote address
+    __u32 remote_addr_v4;
+    __u8 remote_addr_v6[16];
+    __u32 remote_port;
+    // local address
+    __u32 local_addr_v4;
+    __u8 local_addr_v6[16];
+    __u16 local_port;
+
     // basic stats(bytes, avg(exe_time/count))
     __u64 write_bytes;
     __u64 write_count;
@@ -50,6 +59,9 @@ struct active_connection_t {
     __u64 prev_count;
     char prev_buf[4];
     __u32 prepend_length_header;
+
+    // connect event already send
+    __u32 connect_event_send;
 };
 struct {
        __uint(type, BPF_MAP_TYPE_HASH);
@@ -126,6 +138,18 @@ struct socket_close_event_t {
     __u32 role;
     __u32 fix;
 
+    // socket type
+    __u32 socket_family;
+    // upstream
+    __u32 remote_addr_v4;
+    __u8 remote_addr_v6[16];
+    __u32 remote_port;
+    // downstream
+    __u32 local_addr_v4;
+    __u8 local_addr_v6[16];
+    __u16 local_port;
+    __u32 fix1;
+
     // basic stats(bytes, avg(exe_time/count))
     __u64 write_bytes;
     __u64 write_count;
diff --git a/go.mod b/go.mod
index 03653a4..5c74485 100644
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@ require (
        github.com/hashicorp/go-multierror v1.1.1
        github.com/hashicorp/golang-lru v0.5.4
        github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639
+       github.com/orcaman/concurrent-map v1.0.0
        github.com/shirou/gopsutil v3.21.11+incompatible
        github.com/sirupsen/logrus v1.8.1
        github.com/spf13/cobra v1.3.0
diff --git a/go.sum b/go.sum
index 6e64dd0..0d475d3 100644
--- a/go.sum
+++ b/go.sum
@@ -356,6 +356,8 @@ github.com/onsi/gomega 
v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV
 github.com/onsi/gomega v1.7.1/go.mod 
h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
 github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
 github.com/onsi/gomega v1.10.1/go.mod 
h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/orcaman/concurrent-map v1.0.0 
h1:I/2A2XPCb4IuQWcQhBhSwGfiuybl/J0ev9HDbW65HOY=
+github.com/orcaman/concurrent-map v1.0.0/go.mod 
h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod 
h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pascaldekloe/goe v0.1.0/go.mod 
h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pelletier/go-toml v1.9.4 
h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
diff --git a/pkg/process/api/process.go b/pkg/process/api/process.go
index 53f98ba..bde8bda 100644
--- a/pkg/process/api/process.go
+++ b/pkg/process/api/process.go
@@ -54,6 +54,11 @@ type ProcessInterface interface {
        ProfilingStat() *profiling.Info
        // ExeName get execute file name
        ExeName() (string, error)
+
+       // PortIsExpose check the port is exposed
+       PortIsExpose(port int) bool
+       // DetectNewExposePort add a new detected expose port
+       DetectNewExposePort(port int)
 }
 
 // ProcessEntity is related to backend entity concept
diff --git a/pkg/process/finders/base/process.go 
b/pkg/process/finders/base/process.go
index 49246fe..fc3b46f 100644
--- a/pkg/process/finders/base/process.go
+++ b/pkg/process/finders/base/process.go
@@ -36,4 +36,6 @@ type DetectedProcess interface {
        DetectType() api.ProcessDetectType
        // ProfilingStat of process
        ProfilingStat() *profiling.Info
+       // ExposePorts define which ports are exposed
+       ExposePorts() []int
 }
diff --git a/pkg/process/finders/base/template.go 
b/pkg/process/finders/base/template.go
index aa4b7da..2aee7c4 100644
--- a/pkg/process/finders/base/template.go
+++ b/pkg/process/finders/base/template.go
@@ -123,3 +123,15 @@ func (p *TemplateProcess) Pid() int32 {
 func (p *TemplateProcess) WorkDir() (string, error) {
        return p.Cwd()
 }
+
+// ExeNameInCommandLine means the executed file name in the command line string
+func (p *TemplateProcess) ExeNameInCommandLine() (string, error) {
+       cmdline, err := p.CmdlineSlice()
+       if err != nil {
+               return "", err
+       }
+       if len(cmdline) == 0 {
+               return "", fmt.Errorf("cannot found the command line")
+       }
+       return cmdline[0], nil
+}
diff --git a/pkg/process/finders/context.go b/pkg/process/finders/context.go
index 8139985..d544e17 100644
--- a/pkg/process/finders/context.go
+++ b/pkg/process/finders/context.go
@@ -47,7 +47,8 @@ type ProcessContext struct {
        detectType    api.ProcessDetectType
 
        // cache
-       exeName string
+       exeName      string
+       exposedPorts map[int]bool
 }
 
 func (p *ProcessContext) ID() string {
@@ -80,3 +81,11 @@ func (p *ProcessContext) ExeName() (string, error) {
        }
        return p.exeName, nil
 }
+
+func (p *ProcessContext) PortIsExpose(port int) bool {
+       return p.exposedPorts[port]
+}
+
+func (p *ProcessContext) DetectNewExposePort(port int) {
+       p.exposedPorts[port] = true
+}
diff --git a/pkg/process/finders/kubernetes/process.go 
b/pkg/process/finders/kubernetes/process.go
index 547e2ae..45b2511 100644
--- a/pkg/process/finders/kubernetes/process.go
+++ b/pkg/process/finders/kubernetes/process.go
@@ -69,3 +69,25 @@ func (p *Process) DetectType() api.ProcessDetectType {
 func (p *Process) ProfilingStat() *profiling.Info {
        return p.profiling
 }
+
+func (p *Process) ExposePorts() []int {
+       result := make([]int, 0)
+       for _, cp := range p.podContainer.ContainerSpec.Ports {
+               result = append(result, int(cp.ContainerPort))
+               if cp.HostPort > 0 {
+                       result = append(result, int(cp.HostPort))
+               }
+       }
+       connections, err := p.original.Connections()
+       if err != nil {
+               log.Warnf("query the process connection error: pid: %d, error: 
%v", p.pid, err)
+               return result
+       }
+       for _, c := range connections {
+               if c.Status == "LISTEN" {
+                       result = append(result, int(c.Laddr.Port))
+               }
+       }
+
+       return result
+}
diff --git a/pkg/process/finders/scanner/process.go 
b/pkg/process/finders/scanner/process.go
index d13d77d..ae5ebed 100644
--- a/pkg/process/finders/scanner/process.go
+++ b/pkg/process/finders/scanner/process.go
@@ -109,6 +109,20 @@ func (p *Process) BuildIdentity() string {
                p.entity.InstanceName, p.entity.ProcessName)
 }
 
+func (p *Process) ExposePorts() []int {
+       connections, err := p.original.Connections()
+       if err != nil {
+               log.Warnf("error getting the process connections, pid: %d, 
error: %v", p.pid, err)
+       }
+       ports := make([]int, 0)
+       for _, con := range connections {
+               if con.Status == "LISTEN" {
+                       ports = append(ports, int(con.Laddr.Port))
+               }
+       }
+       return ports
+}
+
 func requiredNotNull(err error, key, value string) error {
        if err != nil {
                return err
diff --git a/pkg/process/finders/storage.go b/pkg/process/finders/storage.go
index 1c10e7f..13e28a1 100644
--- a/pkg/process/finders/storage.go
+++ b/pkg/process/finders/storage.go
@@ -282,10 +282,15 @@ func (s *ProcessStorage) SyncAllProcessInFinder(finder 
api.ProcessDetectType, pr
 }
 
 func (s *ProcessStorage) constructNewProcessContext(finder 
api.ProcessDetectType, process base.DetectedProcess) *ProcessContext {
+       exporsedPorts := make(map[int]bool)
+       for _, p := range process.ExposePorts() {
+               exporsedPorts[p] = true
+       }
        return &ProcessContext{
                syncStatus:    NotReport,
                detectProcess: process,
                detectType:    finder,
+               exposedPorts:  exporsedPorts,
        }
 }
 
diff --git a/pkg/profiling/task/network/analyzer.go 
b/pkg/profiling/task/network/analyzer.go
index 7ce28de..e92a574 100644
--- a/pkg/profiling/task/network/analyzer.go
+++ b/pkg/profiling/task/network/analyzer.go
@@ -19,9 +19,17 @@ package network
 
 import (
        "github.com/apache/skywalking-rover/pkg/process/api"
+       "github.com/apache/skywalking-rover/pkg/tools"
+)
+
+const (
+       layerMeshDP  = "MESH_DP"
+       layerMeshApp = "MESH"
+       processEnvoy = "envoy"
 )
 
 type TrafficAnalyzer struct {
+       existingProcesses map[int32][]api.ProcessInterface
        // used to find local same with remote address
        // the connect request(local:a -> remote:b) same with accept 
address(remote:a -> local:b)
        // key: localIP:port+RemoteIP+port
@@ -31,8 +39,8 @@ type TrafficAnalyzer struct {
        // used to find only have the remote address connection
        // the connect request(local:unknown -> remote:b), server side 
accept(local:b)
        // key: RemoteIP:port
-       // value: remotePid
-       peerAddressCache map[PeerAddress]uint32
+       // value: remotePid list
+       peerAddressCache map[PeerAddress][]uint32
 
        // used to find the envoy client, service connect to the 
service(outbound), but envoy accept the request through iptables
        // the connect request(local:a -> remote:b(upstream service)), envoy 
side accept(local:c -> remote:a)
@@ -44,14 +52,21 @@ type TrafficAnalyzer struct {
        // key: pid
        // value: process entities
        processData map[uint32][]api.ProcessInterface
+
+       // all local addresses(host only)
+       // key: ip
+       // value: [entity layer]process
+       localAddresses map[string]map[string]api.ProcessInterface
 }
 
-func NewTrafficAnalyzer() *TrafficAnalyzer {
+func NewTrafficAnalyzer(processes map[int32][]api.ProcessInterface) 
*TrafficAnalyzer {
        return &TrafficAnalyzer{
+               existingProcesses:             processes,
                localWithPeerCache:            
make(map[LocalWithPeerAddress]*PidWithRole),
-               peerAddressCache:              make(map[PeerAddress]uint32),
+               peerAddressCache:              make(map[PeerAddress][]uint32),
                envoyAcceptClientAddressCache: 
make(map[PeerAddress]*AddressWithPid),
                processData:                   
make(map[uint32][]api.ProcessInterface),
+               localAddresses:                
make(map[string]map[string]api.ProcessInterface),
        }
 }
 
@@ -86,6 +101,7 @@ func (t *TrafficAnalyzer) 
CombineConnectionToTraffics(connections []*ConnectionC
                        continue
                }
 
+               t.tryingToGenerateTheRoleWhenRemotePidCannotFound(con)
                var pidToRemoteKey PidToRemoteTrafficKey
                pidToRemoteKey.LocalPid = con.LocalPid
                pidToRemoteKey.RemoteIP = con.RemoteIP
@@ -122,9 +138,33 @@ func (t *TrafficAnalyzer) 
CombineConnectionToTraffics(connections []*ConnectionC
        return result
 }
 
+func (t *TrafficAnalyzer) tryingToGenerateTheRoleWhenRemotePidCannotFound(con 
*ConnectionContext) {
+       if con.Role != ConnectionRoleUnknown {
+               return
+       }
+       // if the local address or the local process cannot be found, then the role cannot be analyzed
+       if con.LocalPort == 0 || len(con.LocalProcesses) == 0 {
+               return
+       }
+       var role ConnectionRole
+       // if the port is exposed, and the remote address is not a local pid,
+       // then the role of the connection is usually the server side
+       if con.LocalProcesses[0].PortIsExpose(int(con.LocalPort)) {
+               role = ConnectionRoleServer
+       } else {
+               role = ConnectionRoleClient
+       }
+
+       con.Role = role
+       log.Debugf("found current connection role is unknown, analyzed role is 
%s through local port. %s:%d(%d)->%s:%d",
+               role.String(), con.LocalIP, con.LocalPort, con.LocalPid, 
con.RemoteIP, con.RemotePort)
+}
+
 func (t *TrafficAnalyzer) generateOrCombineTraffic(traffic *ProcessTraffic, 
con *ConnectionContext, remotePid uint32) *ProcessTraffic {
        if traffic == nil {
                traffic = &ProcessTraffic{
+                       analyzer: t,
+
                        LocalPid:       con.LocalPid,
                        LocalProcesses: con.LocalProcesses,
                        LocalIP:        con.LocalIP,
@@ -181,6 +221,10 @@ func (t *TrafficAnalyzer) generateOrCombineTraffic(traffic 
*ProcessTraffic, con
        return traffic
 }
 
+func (t *TrafficAnalyzer) IsLocalAddressInCache(ip string) bool {
+       return len(t.localAddresses[ip]) > 0
+}
+
 func (t *TrafficAnalyzer) buildCache(connections []*ConnectionContext) {
        for _, con := range connections {
                if t.ipNotEmpty(con.LocalIP, con.LocalPort) && 
t.ipNotEmpty(con.RemoteIP, con.RemotePort) {
@@ -193,14 +237,32 @@ func (t *TrafficAnalyzer) buildCache(connections 
[]*ConnectionContext) {
                                Pid:  con.LocalPid,
                                Role: con.Role,
                        }
+               }
+               if t.ipNotEmpty(con.LocalIP, con.LocalPort) {
+                       peerAddress := PeerAddress{
+                               RemoteIP:   con.LocalIP,
+                               RemotePort: con.LocalPort,
+                       }
+                       t.peerAddressCache[peerAddress] = 
append(t.peerAddressCache[peerAddress], con.LocalPid)
 
+                       if len(con.LocalProcesses) > 0 {
+                               localAddressProcesses := 
t.localAddresses[con.LocalIP]
+                               if len(localAddressProcesses) == 0 {
+                                       localAddressProcesses = 
make(map[string]api.ProcessInterface)
+                                       t.localAddresses[con.LocalIP] = 
localAddressProcesses
+                               }
+                               for _, p := range con.LocalProcesses {
+                                       localAddressProcesses[p.Entity().Layer] 
= p
+                               }
+                       }
+               } else if t.ipNotEmpty(con.RemoteIP, con.RemotePort) {
                        // if server side is envoy
                        if con.Role == ConnectionRoleServer && 
len(con.LocalProcesses) > 0 {
                                name, err := con.LocalProcesses[0].ExeName()
                                if err != nil {
                                        log.Warnf("get process exe name 
failure, pid: %d, error: %v", con.LocalPid, err)
                                }
-                               if name == "envoy" {
+                               if name == processEnvoy {
                                        
t.envoyAcceptClientAddressCache[PeerAddress{
                                                RemoteIP:   con.RemoteIP,
                                                RemotePort: con.RemotePort,
@@ -212,27 +274,45 @@ func (t *TrafficAnalyzer) buildCache(connections 
[]*ConnectionContext) {
                                }
                        }
                }
-               if t.ipNotEmpty(con.LocalIP, con.LocalPort) {
-                       t.peerAddressCache[PeerAddress{
-                               RemoteIP:   con.LocalIP,
-                               RemotePort: con.LocalPort,
-                       }] = con.LocalPid
-               }
 
                if len(t.processData[con.LocalPid]) == 0 {
                        t.processData[con.LocalPid] = con.LocalProcesses
                }
+               t.processExportPortAnalyze(con)
+       }
+}
+
+func (t *TrafficAnalyzer) processExportPortAnalyze(con *ConnectionContext) {
+       // if the process exists, the role of the connection is server mode, and the local port exists,
+       // add the detected port into the processes
+       if len(con.LocalProcesses) > 0 && con.Role == ConnectionRoleServer && 
con.LocalPort > 0 {
+               for _, p := range con.LocalProcesses {
+                       p.DetectNewExposePort(int(con.LocalPort))
+               }
        }
 }
 
 func (t *TrafficAnalyzer) findRemotePid(con *ConnectionContext) uint32 {
-       if !t.ipNotEmpty(con.RemoteIP, con.RemotePort) {
-               return 0
+       // full address
+       if pid := t.findRemotePidWhenContainsFullAddress(con); pid > 0 {
+               return pid
        }
 
-       // try to find the target pid first
+       // only remote address
+       if pid := t.findRemotePidWhenContainsRemoteAddress(con); pid > 0 {
+               return pid
+       }
+
+       // mesh environment
+       if pid := t.findRemotePidWhenMeshEnvironment(con); pid > 0 {
+               return pid
+       }
+       return 0
+}
+
+func (t *TrafficAnalyzer) findRemotePidWhenContainsFullAddress(con 
*ConnectionContext) uint32 {
        // match to localWithPeerCache
-       if t.ipNotEmpty(con.LocalIP, con.LocalPort) {
+       if t.ipNotEmpty(con.LocalIP, con.LocalPort) && 
t.ipNotEmpty(con.RemoteIP, con.RemotePort) {
                data := t.localWithPeerCache[LocalWithPeerAddress{
                        LocalIP:    con.RemoteIP,
                        LocalPort:  con.RemotePort,
@@ -258,8 +338,10 @@ func (t *TrafficAnalyzer) findRemotePid(con 
*ConnectionContext) uint32 {
                                RemotePort: con.LocalPort,
                        }]
                        if addr != nil {
-                               con.RemoteIP = addr.RemoteIP
-                               con.RemotePort = addr.RemotePort
+                               if t.ipNotEmpty(addr.RemoteIP, addr.RemotePort) 
{
+                                       con.RemoteIP = addr.RemoteIP
+                                       con.RemotePort = addr.RemotePort
+                               }
                                log.Debugf("found envoy connection: 
%s:%d->%s:%d", con.LocalIP, con.LocalPort, con.RemoteIP, con.RemotePort)
                                return addr.Pid
                        }
@@ -267,24 +349,79 @@ func (t *TrafficAnalyzer) findRemotePid(con 
*ConnectionContext) uint32 {
                }
        }
 
+       return 0
+}
+
+func (t *TrafficAnalyzer) findRemotePidWhenContainsRemoteAddress(con 
*ConnectionContext) uint32 {
+       if !t.ipNotEmpty(con.RemoteIP, con.RemotePort) {
+               return 0
+       }
        // use non-strict verification, don't verify the client role, ensure 
that pid is available to be greatest extent
        // because the information of role maybe missing when not trigger the 
connect/accept
-       pid := t.peerAddressCache[PeerAddress{
+       pidCaches := t.peerAddressCache[PeerAddress{
                RemoteIP:   con.RemoteIP,
                RemotePort: con.RemotePort,
        }]
-       if pid != 0 {
-               return pid
+       if len(pidCaches) > 0 {
+               result := pidCaches[0]
+               // when the remote peer address maps to multiple pids,
+               // the target process is usually not the local process itself
+               for _, pid := range pidCaches {
+                       if pid != con.LocalPid {
+                               result = pid
+                       }
+               }
+               log.Debugf("found remote address by peer address cache: %s:%d 
-> %d", con.RemoteIP, con.RemotePort, result)
+               return result
+       }
+       return 0
+}
+
+func (t *TrafficAnalyzer) findRemotePidWhenMeshEnvironment(con 
*ConnectionContext) uint32 {
+       // special handling for mesh applications, when the process could not be matched through the address
+       if len(con.LocalProcesses) == 0 || !t.ipNotEmpty(con.RemoteIP, 
con.RemotePort) {
+               return 0
+       }
+       for _, localProcess := range con.LocalProcesses {
+               // match when the MESH data plane could not find the MESH application
+               if localProcess.Entity().Layer == layerMeshDP {
+                       addresses := t.localAddresses[con.RemoteIP]
+                       if len(addresses) == 0 {
+                               continue
+                       }
+                       if p := addresses[layerMeshApp]; p != nil {
+                               log.Debugf("found in the mesh application, 
remote ip: %s", con.RemoteIP)
+                               return uint32(p.Pid())
+                       }
+                       continue
+               }
+               // if the current process is a mesh application, and the remote address is neither local nor DNS, then it must be sent to the MESH_DP
+               if localProcess.Entity().Layer == layerMeshApp && 
con.RemotePort != 53 &&
+                       len(t.localAddresses[con.RemoteIP]) == 0 && 
!tools.IsLocalHostAddress(con.RemoteIP) {
+                       if envoyPid := 
t.findSameInstanceMeshDP(localProcess.Entity()); envoyPid != 0 {
+                               log.Debugf("found in the mesh data plane, 
remote ip: %s", con.RemoteIP)
+                               return envoyPid
+                       }
+               }
        }
-       // trying to parse the address with revert the network port
-       // pid = t.peerAddressCache[PeerAddress{
-       //      RemoteIP:   con.RemoteIP,
-       //      RemotePort: parsePort(con.RemotePort),
-       // }]
-       // if pid != 0 {
-       //      return pid
-       //}
+       return 0
+}
 
+func (t *TrafficAnalyzer) findSameInstanceMeshDP(entity *api.ProcessEntity) 
uint32 {
+       for _, psList := range t.existingProcesses {
+               for _, p := range psList {
+                       if p.Entity().Layer == layerMeshDP && 
p.Entity().ServiceName == entity.ServiceName && p.Entity().InstanceName == 
entity.InstanceName {
+                               name, err := p.ExeName()
+                               if err != nil {
+                                       log.Warnf("query the process execute 
file name failure: %d, error: %v", p.Pid(), err)
+                                       continue
+                               }
+                               if name == processEnvoy {
+                                       return uint32(p.Pid())
+                               }
+                       }
+               }
+       }
        return 0
 }
 
diff --git a/pkg/profiling/task/network/context.go 
b/pkg/profiling/task/network/context.go
index c918e46..451496f 100644
--- a/pkg/profiling/task/network/context.go
+++ b/pkg/profiling/task/network/context.go
@@ -18,6 +18,7 @@
 package network
 
 import (
+       "context"
        "encoding/json"
        "errors"
        "fmt"
@@ -25,6 +26,8 @@ import (
        "sync"
        "unsafe"
 
+       cmap "github.com/orcaman/concurrent-map"
+
        "github.com/sirupsen/logrus"
 
        "github.com/hashicorp/go-multierror"
@@ -42,12 +45,10 @@ type Context struct {
        bpf *bpfObjects // current bpf programs
 
        // standard syscall connections
-       activeConnections map[string]*ConnectionContext // current 
activeConnections connections
-       closedConnections []*ConnectionContext          // closed connections'
-       flushClosedEvents chan *SocketCloseEventWrapper // connection have been 
closed, it is a queue to cache unknown active connections
-       connectionLock    sync.Mutex                    // make sure read write 
closedConnections is synchronized
-       // if the socket close event not handled when flushing, then cache to 
this array to prevent dead-lock with flushClosedEvents
-       secondCloseEventCache []*SocketCloseEventWrapper
+       activeConnections cmap.ConcurrentMap      // current activeConnections 
connections
+       closedConnections []*ConnectionContext    // closed connections'
+       flushClosedEvents chan *SocketCloseEvent  // connection have been 
closed, it is a queue to cache unknown active connections
+       sockParseQueue    chan *ConnectionContext // socket address parse queue
 
        // socket retransmit/drop
        socketExceptionStatics       map[SocketBasicKey]*SocketExceptionValue
@@ -114,10 +115,10 @@ type ConnectionContext struct {
 
 func NewContext() *Context {
        return &Context{
-               activeConnections:      make(map[string]*ConnectionContext),
+               activeConnections:      cmap.New(),
                closedConnections:      make([]*ConnectionContext, 0),
-               flushClosedEvents:      make(chan *SocketCloseEventWrapper, 
5000),
-               secondCloseEventCache:  make([]*SocketCloseEventWrapper, 0),
+               flushClosedEvents:      make(chan *SocketCloseEvent, 5000),
+               sockParseQueue:         make(chan *ConnectionContext, 5000),
                processes:              make(map[int32][]api.ProcessInterface),
                socketExceptionStatics: 
make(map[SocketBasicKey]*SocketExceptionValue),
        }
@@ -162,6 +163,34 @@ func (c *Context) FlushAllConnection() 
([]*ConnectionContext, error) {
        return allContexts, nil
 }
 
+func (c *Context) StartSocketAddressParser(ctx context.Context) {
+       for i := 0; i < 2; i++ {
+               go c.handleSocketParseQueue(ctx)
+       }
+}
+
+func (c *Context) handleSocketParseQueue(ctx context.Context) {
+       for {
+               select {
+               case cc := <-c.sockParseQueue:
+                       socket, err := ParseSocket(cc.LocalPid, cc.SocketFD)
+                       if err != nil {
+                               // if the remote port of the connection is empty, then this connection is basically not available
+                               if cc.RemotePort == 0 {
+                                       log.Warnf("complete the socket error, 
pid: %d, fd: %d, error: %v", cc.LocalPid, cc.SocketFD, err)
+                               }
+                               continue
+                       }
+                       cc.LocalIP = socket.SrcIP
+                       cc.LocalPort = socket.SrcPort
+                       cc.RemoteIP = socket.DestIP
+                       cc.RemotePort = socket.DestPort
+               case <-ctx.Done():
+                       return
+               }
+       }
+}
+
 func (c *Context) combineExceptionToConnections(ccs []*ConnectionContext, exps 
map[SocketBasicKey]*SocketExceptionValue) {
        for key, value := range exps {
                var remotePort, localPort = uint16(key.RemotePort), 
uint16(key.LocalPort)
@@ -213,13 +242,10 @@ func (c *Context) cleanAndGetAllExceptionContexts() 
map[SocketBasicKey]*SocketEx
 }
 
 func (c *Context) getAllConnectionWithContext() []*ConnectionContext {
-       c.connectionLock.Lock()
-       defer c.connectionLock.Unlock()
-
        result := make([]*ConnectionContext, 0)
        result = append(result, c.closedConnections...)
-       for _, con := range c.activeConnections {
-               result = append(result, con)
+       for _, con := range c.activeConnections.Items() {
+               result = append(result, con.(*ConnectionContext))
        }
 
        c.closedConnections = make([]*ConnectionContext, 0)
@@ -233,6 +259,13 @@ type ActiveConnectionInBPF struct {
        Role         ConnectionRole
        SocketFamily uint32
 
+       RemoteAddrV4   uint32
+       RemoteAddrV6   [16]uint8
+       RemoteAddrPort uint32
+       LocalAddrV4    uint32
+       LocalAddrV6    [16]uint8
+       LocalAddrPort  uint32
+
        WriteBytes   uint64
        WriteCount   uint64
        WriteExeTime uint64
@@ -248,6 +281,9 @@ type ActiveConnectionInBPF struct {
        ProtocolPrevCount     uint64
        ProtocolPrevBuf       [4]byte
        ProtocolPrependHeader uint32
+
+       // flag indicating whether the connect event has already been sent
+       ConnectEventIsSent uint32
 }
 
 type HistogramDataKey struct {
@@ -289,7 +325,6 @@ func (c *Context) fillConnectionMetrics(ccs 
[]*ConnectionContext) {
                        }
 
                        // update the role
-                       cc.Role = activeConnection.Role
                        
cc.WriteCounter.UpdateToCurrent(activeConnection.WriteBytes, 
activeConnection.WriteCount, activeConnection.WriteExeTime)
                        
cc.ReadCounter.UpdateToCurrent(activeConnection.ReadBytes, 
activeConnection.ReadCount, activeConnection.ReadExeTime)
                        cc.WriteRTTCounter.UpdateToCurrent(0, 
activeConnection.WriteRTTCount, activeConnection.WriteRTTExeTime)
@@ -371,29 +406,10 @@ func (c *Context) handleSocketConnectEvent(data 
interface{}) {
        }
 
        // build active connection information
-       con := &ConnectionContext{
-               // metadata
-               ConnectionID:     event.ConID,
-               RandomID:         event.RandomID,
-               LocalPid:         event.Pid,
-               SocketFD:         event.FD,
-               LocalProcesses:   processes,
-               ConnectionClosed: false,
-
-               // metrics
-               WriteCounter:          NewSocketDataCounterWithHistory(),
-               ReadCounter:           NewSocketDataCounterWithHistory(),
-               WriteRTTCounter:       NewSocketDataCounterWithHistory(),
-               WriteRTTHistogram:     NewSocketDataHistogramWithHistory(),
-               WriteExeTimeHistogram: NewSocketDataHistogramWithHistory(),
-               ReadExeTimeHistogram:  NewSocketDataHistogramWithHistory(),
-               ConnectExecuteTime:    event.ExeTime,
-       }
-
+       con := c.newConnectionContext(event.ConID, event.RandomID, event.Pid, 
event.FD, processes, false)
+       con.ConnectExecuteTime = event.ExeTime
        con.Role = event.Role
-       var trace string
        if event.NeedComplete == 0 {
-               trace = "0"
                con.RemotePort = uint16(event.RemoteAddrPort)
                con.LocalPort = uint16(event.LocalAddrPort)
                if event.SocketFamily == unix.AF_INET {
@@ -404,27 +420,16 @@ func (c *Context) handleSocketConnectEvent(data 
interface{}) {
                        con.RemoteIP = parseAddressV6(event.RemoteAddrV6)
                }
        } else {
-               socket, err := ParseSocket(event.Pid, event.FD)
-               if err != nil {
-                       trace = "1-1"
-                       // if the remote address exists then setting it
-                       if event.RemoteAddrPort == 0 {
-                               log.Debugf("complete the socket error, pid: %d, 
fd: %d, error: %v", event.Pid, event.FD, err)
+               // if the remote address exists then setting it
+               if event.RemoteAddrPort != 0 {
+                       con.RemotePort = uint16(event.RemoteAddrPort)
+                       if event.SocketFamily == unix.AF_INET {
+                               con.RemoteIP = 
parseAddressV4(event.RemoteAddrV4)
                        } else {
-                               con.RemotePort = uint16(event.RemoteAddrPort)
-                               if event.SocketFamily == unix.AF_INET {
-                                       con.RemoteIP = 
parseAddressV4(event.RemoteAddrV4)
-                               } else {
-                                       con.RemoteIP = 
parseAddressV6(event.RemoteAddrV6)
-                               }
+                               con.RemoteIP = 
parseAddressV6(event.RemoteAddrV6)
                        }
-               } else {
-                       trace = "1-2"
-                       con.LocalIP = socket.SrcIP
-                       con.LocalPort = socket.SrcPort
-                       con.RemoteIP = socket.DestIP
-                       con.RemotePort = socket.DestPort
                }
+               c.sockParseQueue <- con
        }
 
        // add to the context
@@ -432,15 +437,13 @@ func (c *Context) handleSocketConnectEvent(data 
interface{}) {
 
        if log.Enable(logrus.DebugLevel) {
                marshal, _ := json.Marshal(event)
-               log.Debugf("found connect event(%s): role: %s, %s:%d:%d -> 
%s:%d, json: %s", trace, con.Role.String(),
+               log.Debugf("found connect event: role: %s, %s:%d:%d -> %s:%d, 
json: %s", con.Role.String(),
                        con.LocalIP, con.LocalPort, con.LocalPid, con.RemoteIP, 
con.RemotePort, string(marshal))
        }
 }
 
 func (c *Context) saveActiveConnection(con *ConnectionContext) {
-       c.connectionLock.Lock()
-       defer c.connectionLock.Unlock()
-       c.activeConnections[c.generateConnectionKey(con.ConnectionID, 
con.RandomID)] = con
+       c.activeConnections.Set(c.generateConnectionKey(con.ConnectionID, 
con.RandomID), con)
 }
 
 type SocketCloseEvent struct {
@@ -452,6 +455,15 @@ type SocketCloseEvent struct {
        Role     ConnectionRole
        Fix      uint32
 
+       SocketFamily   uint32
+       RemoteAddrV4   uint32
+       RemoteAddrV6   [16]uint8
+       RemoteAddrPort uint32
+       LocalAddrV4    uint32
+       LocalAddrV6    [16]uint8
+       LocalAddrPort  uint32
+       Fix1           uint32
+
        WriteBytes   uint64
        WriteCount   uint64
        WriteExeTime uint64
@@ -463,51 +475,68 @@ type SocketCloseEvent struct {
        WriteRTTExeTime uint64
 }
 
-type SocketCloseEventWrapper struct {
-       *SocketCloseEvent
-       NotExistsCount int
-}
-
 // batch to re-process all cached closed event
-// if the event re-processes once, then just ignore it
 func (c *Context) batchReProcessCachedCloseEvent() {
-       // handling the second close event cache first, if it's could not be 
handle, then ignored
-       if len(c.secondCloseEventCache) > 0 {
-               for _, wrapper := range c.secondCloseEventCache {
-                       event := wrapper.SocketCloseEvent
-                       if !c.socketClosedEvent0(event) {
-                               log.Warnf("found close connection event, but 
current connection is not in active cache: pid: %d, "+
-                                       "socket fd: %d", event.Pid, 
event.SocketFD)
+       for len(c.flushClosedEvents) > 0 {
+               event := <-c.flushClosedEvents
+               if !c.socketClosedEvent0(event) {
+                       // if the active connection cannot be found, then just create a new closed connection context
+                       processes := c.processes[int32(event.Pid)]
+                       if len(processes) == 0 {
+                               continue
                        }
+                       cc := c.newConnectionContext(event.ConID, 
event.RandomID, event.Pid, event.SocketFD, processes, true)
+                       if event.SocketFamily == unix.AF_INET {
+                               cc.RemoteIP = parseAddressV4(event.RemoteAddrV4)
+                               cc.LocalIP = parseAddressV4(event.LocalAddrV4)
+                       } else if event.SocketFamily == unix.AF_INET6 {
+                               cc.RemoteIP = parseAddressV6(event.RemoteAddrV6)
+                               cc.LocalIP = parseAddressV6(event.LocalAddrV6)
+                       } else {
+                               continue
+                       }
+
+                       // append to the closed connection
+                       c.closedConnections = append(c.closedConnections, 
c.combineClosedConnection(cc, event))
                }
-               c.secondCloseEventCache = make([]*SocketCloseEventWrapper, 0)
        }
+}
 
-       for len(c.flushClosedEvents) > 0 {
-               wrapper := <-c.flushClosedEvents
-               if c.socketClosedEvent0(wrapper.SocketCloseEvent) {
-                       continue
-               }
-               wrapper.NotExistsCount++
-               // try to add the flush queue to re-process when next flush all 
connections
-               c.secondCloseEventCache = append(c.secondCloseEventCache, 
wrapper)
+func (c *Context) newConnectionContext(conID, randomID uint64, pid, fd uint32, 
processes []api.ProcessInterface, conClosed bool) *ConnectionContext {
+       return &ConnectionContext{
+               // metadata
+               ConnectionID:     conID,
+               RandomID:         randomID,
+               LocalPid:         pid,
+               SocketFD:         fd,
+               LocalProcesses:   processes,
+               ConnectionClosed: conClosed,
+
+               // metrics
+               WriteCounter:          NewSocketDataCounterWithHistory(),
+               ReadCounter:           NewSocketDataCounterWithHistory(),
+               WriteRTTCounter:       NewSocketDataCounterWithHistory(),
+               WriteRTTHistogram:     NewSocketDataHistogramWithHistory(),
+               WriteExeTimeHistogram: NewSocketDataHistogramWithHistory(),
+               ReadExeTimeHistogram:  NewSocketDataHistogramWithHistory(),
        }
 }
 
 func (c *Context) handleSocketCloseEvent(data interface{}) {
        event := data.(*SocketCloseEvent)
 
+       if log.Enable(logrus.DebugLevel) {
+               marshal, _ := json.Marshal(event)
+               log.Debugf("found close event: %s", string(marshal))
+       }
+
        // try to handle the socket close event
        if !c.socketClosedEvent0(event) {
                // is not in active connection, maybe it's not have been added 
to activate first
                // just add to the close queue, wait for the flush connection 
with interval
-               c.flushClosedEvents <- 
&SocketCloseEventWrapper{SocketCloseEvent: event, NotExistsCount: 1}
+               c.flushClosedEvents <- event
                return
        }
-       if log.Enable(logrus.DebugLevel) {
-               marshal, _ := json.Marshal(event)
-               log.Debugf("found close event: %s", string(marshal))
-       }
 }
 
 // SocketExceptionOperationEvent Socket have been retransmitted/drop the 
package event
@@ -554,28 +583,28 @@ func (c *Context) 
handleSocketExceptionOperationEvent(data interface{}) {
 }
 
 func (c *Context) socketClosedEvent0(event *SocketCloseEvent) bool {
-       c.connectionLock.Lock()
-       defer c.connectionLock.Unlock()
-
-       conKey := c.generateConnectionKey(event.ConID, event.RandomID)
-       activeCon := c.activeConnections[conKey]
+       activeCon := c.foundAndDeleteConnection(event)
        if activeCon == nil {
                return false
        }
 
-       // delete the active connection
-       delete(c.activeConnections, conKey)
-
        // combine the connection data
        c.closedConnections = append(c.closedConnections, 
c.combineClosedConnection(activeCon, event))
        return true
 }
 
+func (c *Context) foundAndDeleteConnection(event *SocketCloseEvent) 
*ConnectionContext {
+       conKey := c.generateConnectionKey(event.ConID, event.RandomID)
+       val, exists := c.activeConnections.Pop(conKey)
+       if !exists {
+               return nil
+       }
+       return val.(*ConnectionContext)
+}
+
 func (c *Context) deleteConnectionOnly(ccs []string) {
-       c.connectionLock.Lock()
-       defer c.connectionLock.Unlock()
        for _, cc := range ccs {
-               delete(c.activeConnections, cc)
+               c.activeConnections.Remove(cc)
        }
 }
 
@@ -620,6 +649,7 @@ func (c *Context) AddProcesses(processes 
[]api.ProcessInterface) error {
                if err1 := c.bpf.ProcessMonitorControl.Update(uint32(pid), 
uint32(1), ebpf.UpdateAny); err1 != nil {
                        err = multierror.Append(err, err1)
                }
+               log.Debugf("add monitor process, pid: %d", pid)
        }
        return err
 }
@@ -650,6 +680,7 @@ func (c *Context) DeleteProcesses(processes 
[]api.ProcessInterface) (emptyProces
                        if err1 := 
c.bpf.ProcessMonitorControl.Delete(uint32(pid)); err1 != nil {
                                err = multierror.Append(err, err1)
                        }
+                       log.Debugf("delete monitor process: %d", pid)
                        delete(c.processes, pid)
                        continue
                }
diff --git a/pkg/profiling/task/network/delegate.go 
b/pkg/profiling/task/network/delegate.go
index 1b1510c..ef0bf8f 100644
--- a/pkg/profiling/task/network/delegate.go
+++ b/pkg/profiling/task/network/delegate.go
@@ -62,11 +62,7 @@ func (r *DelegateRunner) Init(task *base.ProfilingTask, 
processes []api.ProcessI
 
 func (r *DelegateRunner) Run(ctx context.Context, notify 
base.ProfilingRunningSuccessNotify) error {
        r.ctx, r.cancel = context.WithCancel(ctx)
-       if err := realRunner.Start(ctx); err != nil {
-               return err
-       }
-       // add processes
-       if err := realRunner.AddProcess(r.processes); err != nil {
+       if err := realRunner.Start(ctx, r.processes); err != nil {
                return err
        }
        notify()
diff --git a/pkg/profiling/task/network/linker.go 
b/pkg/profiling/task/network/linker.go
index 278bc09..859930a 100644
--- a/pkg/profiling/task/network/linker.go
+++ b/pkg/profiling/task/network/linker.go
@@ -26,6 +26,8 @@ import (
        "os"
        "sync"
 
+       "github.com/apache/skywalking-rover/pkg/tools"
+
        "github.com/cilium/ebpf"
        "github.com/cilium/ebpf/link"
        "github.com/cilium/ebpf/perf"
@@ -33,9 +35,42 @@ import (
        "github.com/hashicorp/go-multierror"
 )
 
+const defaultSymbolPrefix = "sys_"
+
 type LinkFunc func(symbol string, prog *ebpf.Program) (link.Link, error)
 type RingBufferReader func(data interface{})
 
+var syscallPrefix string
+
+func init() {
+       stat, err := tools.KernelFileProfilingStat()
+       if err != nil {
+               syscallPrefix = defaultSymbolPrefix
+               return
+       }
+       var possiblePrefixes = []string{
+               defaultSymbolPrefix,
+               "__x64_sys_",
+               "__x32_compat_sys_",
+               "__ia32_compat_sys_",
+               "__arm64_sys_",
+               "__s390x_sys_",
+               "__s390_sys_",
+       }
+
+       found := false
+       for _, p := range possiblePrefixes {
+               if stat.FindSymbolAddress(fmt.Sprintf("%sbpf", p)) != 0 {
+                       found = true
+                       syscallPrefix = p
+                       break
+               }
+       }
+       if !found {
+               syscallPrefix = "sys_"
+       }
+}
+
 type Linker struct {
        closers   []io.Closer
        errors    error
@@ -64,11 +99,12 @@ func (m *Linker) AddSysCall(call string, enter, exit 
*ebpf.Program) {
 }
 
 func (m *Linker) AddSysCallWithKProbe(call string, linkK LinkFunc, p 
*ebpf.Program) {
-       kprobe, err := linkK("sys_"+call, p)
+       kprobe, err := linkK(syscallPrefix+call, p)
 
        if err != nil {
                m.errors = multierror.Append(m.errors, fmt.Errorf("could not 
attach syscall with %s: %v", "sys_"+call, err))
        } else {
+               log.Debugf("attach to the syscall: %s", syscallPrefix+call)
                m.closers = append(m.closers, kprobe)
        }
 }
@@ -108,7 +144,7 @@ func (m *Linker) ReadEventAsync(emap *ebpf.Map, reader 
RingBufferReader, dataSup
 
                        data := dataSupplier()
                        if err := 
binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, data); err 
!= nil {
-                               log.Warnf("parsing data from %s ringbuffer 
error: %v", emap.String(), err)
+                               log.Warnf("parsing data from %s, raw size: %d, 
ringbuffer error: %v", emap.String(), len(record.RawSample), err)
                                continue
                        }
 
diff --git a/pkg/profiling/task/network/metrics.go 
b/pkg/profiling/task/network/metrics.go
index 91fca81..191a0b8 100644
--- a/pkg/profiling/task/network/metrics.go
+++ b/pkg/profiling/task/network/metrics.go
@@ -181,6 +181,8 @@ func subtractionValue(v1, v2 uint64) uint64 {
 }
 
 type ProcessTraffic struct {
+       analyzer *TrafficAnalyzer
+
        // local process information
        LocalPid       uint32
        LocalProcesses []api.ProcessInterface
@@ -350,7 +352,7 @@ func (r *ProcessTraffic) appendRemoteAddrssInfo(labels 
[]*v3.Label, prefix strin
                }
        }
 
-       if tools.IsLocalHostAddress(r.RemoteIP) {
+       if tools.IsLocalHostAddress(r.RemoteIP) || 
r.analyzer.IsLocalAddressInCache(r.RemoteIP) {
                return r.appendMeterValue(labels, prefix+"_local", "true")
        }
 
diff --git a/pkg/profiling/task/network/runner.go 
b/pkg/profiling/task/network/runner.go
index a294095..8821903 100644
--- a/pkg/profiling/task/network/runner.go
+++ b/pkg/profiling/task/network/runner.go
@@ -48,12 +48,12 @@ var log = logger.GetLogger("profiling", "task", "network", 
"topology")
 
 type Runner struct {
        initOnce       sync.Once
-       startOnce      sync.Once
+       startLock      sync.Mutex
+       stopOnce       sync.Once
        meterClient    v3.MeterReportServiceClient
        reportInterval time.Duration
        meterPrefix    string
 
-       processes  map[int32][]api.ProcessInterface
        bpf        *bpfObjects
        linker     *Linker
        bpfContext *Context
@@ -64,7 +64,6 @@ type Runner struct {
 
 func NewGlobalRunnerContext() *Runner {
        return &Runner{
-               processes:  make(map[int32][]api.ProcessInterface),
                bpfContext: NewContext(),
                linker:     &Linker{},
        }
@@ -78,24 +77,18 @@ func (r *Runner) init(config *base.TaskConfig, moduleMgr 
*module.Manager) error
        return err
 }
 
-func (r *Runner) AddProcess(processes []api.ProcessInterface) error {
-       // adding processes
-       return r.bpfContext.AddProcesses(processes)
-}
-
 func (r *Runner) DeleteProcesses(processes []api.ProcessInterface) (bool, 
error) {
        return r.bpfContext.DeleteProcesses(processes)
 }
 
-func (r *Runner) Start(ctx context.Context) error {
-       var err error
-       r.startOnce.Do(func() {
-               err = r.start0(ctx)
-       })
-       return err
-}
+func (r *Runner) Start(ctx context.Context, processes []api.ProcessInterface) 
error {
+       r.startLock.Lock()
+       defer r.startLock.Unlock()
+       // if already start, then just adding the processes
+       if r.bpf != nil {
+               return r.bpfContext.AddProcesses(processes)
+       }
 
-func (r *Runner) start0(ctx context.Context) error {
        r.ctx, r.cancel = context.WithCancel(ctx)
        // load bpf program
        objs := bpfObjects{}
@@ -105,6 +98,14 @@ func (r *Runner) start0(ctx context.Context) error {
        r.bpf = &objs
        r.bpfContext.Init(&objs)
 
+       if err := r.bpfContext.AddProcesses(processes); err != nil {
+               return err
+       }
+
+       // register all handlers
+       r.bpfContext.RegisterAllHandlers(r.linker)
+       r.bpfContext.StartSocketAddressParser(r.ctx)
+
        // sock opts
        r.linker.AddSysCall("connect", objs.SysConnect, objs.SysConnectRet)
        r.linker.AddSysCall("accept", objs.SysAccept, objs.SysAcceptRet)
@@ -138,9 +139,6 @@ func (r *Runner) start0(ctx context.Context) error {
        // close socket
        r.linker.AddSysCall("close", objs.SysClose, objs.SysCloseRet)
 
-       // register all handlers
-       r.bpfContext.RegisterAllHandlers(r.linker)
-
        if err := r.linker.HasError(); err != nil {
                _ = r.linker.Close()
                return err
@@ -180,13 +178,14 @@ func (r *Runner) flushMetrics() error {
 
        if log.Enable(logrus.DebugLevel) {
                for _, con := range connections {
-                       log.Debugf("found connection: %s relation: %s:%d(%d) -> 
%s:%d, read: %d bytes/%d, write: %d bytes/%d", con.Role.String(),
+                       log.Debugf("found connection: %d, %s relation: 
%s:%d(%d) -> %s:%d, read: %d bytes/%d, write: %d bytes/%d",
+                               con.ConnectionID, con.Role.String(),
                                con.LocalIP, con.LocalPort, con.LocalPid, 
con.RemoteIP, con.RemotePort, con.WriteCounter.Cur.Bytes, 
con.WriteCounter.Cur.Count,
                                con.ReadCounter.Cur.Bytes, 
con.ReadCounter.Cur.Count)
                }
        }
        // combine all connection
-       analyzer := NewTrafficAnalyzer()
+       analyzer := NewTrafficAnalyzer(r.bpfContext.processes)
        traffics := analyzer.CombineConnectionToTraffics(connections)
        if len(traffics) == 0 {
                return nil
@@ -246,10 +245,14 @@ func (r *Runner) logTheMetricsConnections(traffices 
[]*ProcessTraffic) {
 }
 
 func (r *Runner) Stop() error {
-       r.cancel()
+       if r.cancel != nil {
+               r.cancel()
+       }
        var result error
-       result = r.closeWhenExists(result, r.linker)
-       result = r.closeWhenExists(result, r.bpf)
+       r.stopOnce.Do(func() {
+               result = r.closeWhenExists(result, r.linker)
+               result = r.closeWhenExists(result, r.bpf)
+       })
        return result
 }
 
diff --git a/pkg/profiling/task/network/tcpresolver.go 
b/pkg/profiling/task/network/tcpresolver.go
index 2184705..92d2326 100644
--- a/pkg/profiling/task/network/tcpresolver.go
+++ b/pkg/profiling/task/network/tcpresolver.go
@@ -33,7 +33,7 @@ import (
 )
 
 var (
-       notBlankRegex = regexp.MustCompile(`\\s+`)
+       notBlankRegex = regexp.MustCompile(`\s+`)
        ipv4StrLen    = 8
        ipv6StrLen    = 32
 )
@@ -59,8 +59,6 @@ func ParseSocket(pid, sockfd uint32) (*SocketPair, error) {
        var s *SocketPair
        s, err = foundAddressByFile(s, err, 
fmt.Sprintf(host.GetFileInHost("/proc/%d/net/tcp"), pid), inode)
        s, err = foundAddressByFile(s, err, 
fmt.Sprintf(host.GetFileInHost("/proc/%d/net/tcp6"), pid), inode)
-       s, err = foundAddressByFile(s, err, 
host.GetFileInHost("/proc/net/tcp"), inode)
-       s, err = foundAddressByFile(s, err, 
host.GetFileInHost("/proc/net/tcp6"), inode)
        return s, err
 }
 

Reply via email to