This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new 87f2337  Support OpenSSL library and add protocol and IsSSL labels in 
metrics (#43)
87f2337 is described below

commit 87f23377feb901ba6b7cd8b476c448aa4e5d1ed0
Author: mrproliu <[email protected]>
AuthorDate: Tue Aug 2 20:20:30 2022 +0800

    Support OpenSSL library and add protocol and IsSSL labels in metrics (#43)
---
 CHANGES.md                             |   1 +
 bpf/profiling/network/args.h           |   4 +-
 bpf/profiling/network/netmonitor.c     |  46 ++++++----
 bpf/profiling/network/openssl.c        | 116 ++++++++++++++++++++++++
 bpf/profiling/network/openssl.h        |  43 +++++++++
 bpf/profiling/network/sock_stats.h     |   6 ++
 pkg/profiling/task/network/analyzer.go |   6 ++
 pkg/profiling/task/network/context.go  |  40 +++++++--
 pkg/profiling/task/network/enums.go    |  55 +++++++++++-
 pkg/profiling/task/network/linker.go   |  60 +++++++++++++
 pkg/profiling/task/network/metrics.go  |   8 ++
 pkg/profiling/task/network/runner.go   |   8 +-
 pkg/profiling/task/network/ssl.go      | 155 +++++++++++++++++++++++++++++++++
 pkg/tools/process.go                   |  16 ++++
 pkg/tools/profiling/go_library.go      |   9 +-
 15 files changed, 537 insertions(+), 36 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1cd7a8a..8c74a6b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,7 @@ Release Notes.
 #### Features
 * Support `NETWORK` Profiling.
 * Let the logger as a configurable module.
+* Support analyze the data of OpenSSL library in `NETWORK` Profiling.
 
 #### Bug Fixes
 * Fixed reading process paths incorrect when running as a container.
diff --git a/bpf/profiling/network/args.h b/bpf/profiling/network/args.h
index ce65720..b9bcc59 100644
--- a/bpf/profiling/network/args.h
+++ b/bpf/profiling/network/args.h
@@ -41,7 +41,9 @@
 #define SOCKET_OPTS_TYPE_RECVFROM   14
 #define SOCKET_OPTS_TYPE_RECVMSG    15
 #define SOCKET_OPTS_TYPE_RECVMMSG   16
-#define SOCKET_OPTS_TYPE_RESENT   17
+#define SOCKET_OPTS_TYPE_RESENT     17
+#define SOCKET_OPTS_TYPE_SSL_WRITE  18
+#define SOCKET_OPTS_TYPE_SSL_READ   19
 
 // tracepoint enter
 struct trace_event_raw_sys_enter {
diff --git a/bpf/profiling/network/netmonitor.c 
b/bpf/profiling/network/netmonitor.c
index 7296606..b7262fc 100644
--- a/bpf/profiling/network/netmonitor.c
+++ b/bpf/profiling/network/netmonitor.c
@@ -192,8 +192,9 @@ static __inline void notify_close_connection(struct 
pt_regs* ctx, __u64 conid, s
     close_event.random_id = con->random_id;
     close_event.exe_time = exe_time;
     close_event.pid = con->pid;
-    close_event.sockfd = con->sockfd;
     close_event.role = con->role;
+    close_event.protocol = con->protocol;
+    close_event.ssl = con->ssl;
 
     close_event.socket_family = con->socket_family;
     close_event.local_addr_v4 = con->local_addr_v4;
@@ -236,6 +237,14 @@ static __inline struct active_connection_t* 
get_or_create_active_conn(struct pt_
     return bpf_map_lookup_elem(&active_connection_map, &conid);
 }
 
+static __inline void set_conn_as_ssl(struct pt_regs* ctx, __u32 tgid, __u32 
fd, __u32 func_name) {
+    struct active_connection_t* conn = get_or_create_active_conn(ctx, tgid, 
fd, func_name);
+    if (conn == NULL) {
+        return;
+    }
+    conn->ssl = true;
+};
+
 static __inline void submit_connection_when_not_exists(struct pt_regs *ctx, 
__u64 id, const struct connect_args_t* connect_args, __u32 func_name) {
     __u32 tgid = (__u32)(id >> 32);
     __u32 fd = connect_args->fd;
@@ -274,7 +283,7 @@ static __always_inline void resent_connect_event(struct 
pt_regs *ctx, __u32 tgid
 }
 
 static __always_inline void process_write_data(struct pt_regs *ctx, __u64 id, 
struct sock_data_args_t *args, ssize_t bytes_count,
-                                        __u32 data_direction, const bool vecs, 
__u32 func_name) {
+                                        __u32 data_direction, const bool vecs, 
__u32 func_name, bool ssl) {
     __u64 curr_nacs = bpf_ktime_get_ns();
     __u32 tgid = (__u32)(id >> 32);
 
@@ -309,8 +318,9 @@ static __always_inline void process_write_data(struct 
pt_regs *ctx, __u64 id, st
         resent_connect_event(ctx, tgid, args->fd, conid, conn);
     }
 
-    // unknown connection role, then try to use procotol analyzer to analyze 
request or response
-    if (conn->role == CONNECTION_ROLE_TYPE_UNKNOWN) {
+    // if the protocol or role is unknown in the connection and the current 
data content is plaintext
+    // then try to use protocol analyzer to analyze request or response and 
protocol type
+    if ((conn->role == CONNECTION_ROLE_TYPE_UNKNOWN || conn->protocol == 0) && 
conn->ssl == ssl) {
         struct socket_buffer_reader_t *buf_reader = NULL;
         if (args->buf != NULL) {
             buf_reader = read_socket_data(args->buf, bytes_count);
@@ -506,7 +516,7 @@ int sys_sendto_ret(struct pt_regs *ctx) {
 
     struct sock_data_args_t *data_args = 
bpf_map_lookup_elem(&socket_data_args, &id);
     if (data_args) {
-        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_EGRESS, false, SOCKET_OPTS_TYPE_SENDTO);
+        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_EGRESS, false, SOCKET_OPTS_TYPE_SENDTO, false);
     }
 
     bpf_map_delete_elem(&socket_data_args, &id);
@@ -551,7 +561,7 @@ int sys_write_ret(struct pt_regs *ctx) {
     struct sock_data_args_t *data_args = 
bpf_map_lookup_elem(&socket_data_args, &id);
     if (data_args && data_args->is_sock_event) {
         ssize_t bytes_count = PT_REGS_RC(ctx);
-        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_EGRESS, false, SOCKET_OPTS_TYPE_WRITE);
+        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_EGRESS, false, SOCKET_OPTS_TYPE_WRITE, false);
     }
 
     bpf_map_delete_elem(&socket_data_args, &id);
@@ -576,7 +586,7 @@ int sys_send_ret(struct pt_regs* ctx) {
     struct sock_data_args_t *data_args = 
bpf_map_lookup_elem(&socket_data_args, &id);
     if (data_args) {
         ssize_t bytes_count = PT_REGS_RC(ctx);
-        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_EGRESS, false, SOCKET_OPTS_TYPE_SEND);
+        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_EGRESS, false, SOCKET_OPTS_TYPE_SEND, false);
     }
 
     bpf_map_delete_elem(&socket_data_args, &id);
@@ -605,7 +615,7 @@ int sys_writev_ret(struct pt_regs* ctx) {
     struct sock_data_args_t *data_args = 
bpf_map_lookup_elem(&socket_data_args, &id);
     if (data_args && data_args->is_sock_event) {
         ssize_t bytes_count = PT_REGS_RC(ctx);
-        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_EGRESS, true, SOCKET_OPTS_TYPE_WRITEV);
+        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_EGRESS, true, SOCKET_OPTS_TYPE_WRITEV, false);
     }
 
     bpf_map_delete_elem(&socket_data_args, &id);
@@ -655,7 +665,7 @@ int sys_sendmsg_ret(struct pt_regs* ctx) {
     // socket data
     struct sock_data_args_t *data_args = 
bpf_map_lookup_elem(&socket_data_args, &id);
     if (data_args) {
-        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_EGRESS, true, SOCKET_OPTS_TYPE_SENDMSG);
+        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_EGRESS, true, SOCKET_OPTS_TYPE_SENDMSG, false);
     }
     bpf_map_delete_elem(&socket_data_args, &id);
     return 0;
@@ -711,7 +721,7 @@ int sys_sendmmsg_ret(struct pt_regs* ctx) {
     if (data_args) {
         __u32 bytes_count;
         BPF_PROBE_READ_VAR1(bytes_count, data_args->msg_len);
-        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_EGRESS, true, SOCKET_OPTS_TYPE_SENDMMSG);
+        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_EGRESS, true, SOCKET_OPTS_TYPE_SENDMMSG, false);
     }
     bpf_map_delete_elem(&socket_data_args, &id);
     return 0;
@@ -796,7 +806,7 @@ int sys_read_ret(struct pt_regs* ctx) {
     struct sock_data_args_t *data_args = 
bpf_map_lookup_elem(&socket_data_args, &id);
     if (data_args && data_args->is_sock_event) {
         ssize_t bytes_count = PT_REGS_RC(ctx);
-        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_INGRESS, false, SOCKET_OPTS_TYPE_READ);
+        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_INGRESS, false, SOCKET_OPTS_TYPE_READ, false);
     }
     bpf_map_delete_elem(&socket_data_args, &id);
     return 0;
@@ -824,7 +834,7 @@ int sys_readv_ret(struct pt_regs* ctx) {
     struct sock_data_args_t *data_args = 
bpf_map_lookup_elem(&socket_data_args, &id);
     if (data_args && data_args->is_sock_event) {
         ssize_t bytes_count = PT_REGS_RC(ctx);
-        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_INGRESS, true, SOCKET_OPTS_TYPE_READV);
+        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_INGRESS, true, SOCKET_OPTS_TYPE_READV, false);
     }
 
     bpf_map_delete_elem(&socket_data_args, &id);
@@ -849,7 +859,7 @@ int sys_recv_ret(struct pt_regs* ctx) {
     struct sock_data_args_t *data_args = 
bpf_map_lookup_elem(&socket_data_args, &id);
     if (data_args) {
         ssize_t bytes_count = PT_REGS_RC(ctx);
-        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_INGRESS, false, SOCKET_OPTS_TYPE_RECV);
+        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_INGRESS, false, SOCKET_OPTS_TYPE_RECV, false);
     }
     bpf_map_delete_elem(&socket_data_args, &id);
     return 0;
@@ -895,7 +905,7 @@ int sys_recvfrom_ret(struct pt_regs* ctx) {
     // socket data
     struct sock_data_args_t *data_args = 
bpf_map_lookup_elem(&socket_data_args, &id);
     if (data_args) {
-        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_INGRESS, false, SOCKET_OPTS_TYPE_RECVFROM);
+        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_INGRESS, false, SOCKET_OPTS_TYPE_RECVFROM, false);
     }
     bpf_map_delete_elem(&socket_data_args, &id);
     return 0;
@@ -944,7 +954,7 @@ int sys_recvmsg_ret(struct pt_regs* ctx) {
     // socket data
     struct sock_data_args_t *data_args = 
bpf_map_lookup_elem(&socket_data_args, &id);
     if (data_args) {
-        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_INGRESS, true, SOCKET_OPTS_TYPE_RECVMSG);
+        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_INGRESS, true, SOCKET_OPTS_TYPE_RECVMSG, false);
     }
     bpf_map_delete_elem(&socket_data_args, &id);
     return 0;
@@ -1000,7 +1010,7 @@ int sys_recvmmsg_ret(struct pt_regs* ctx) {
     if (data_args) {
         __u32 bytes_count;
         BPF_PROBE_READ_VAR1(bytes_count, data_args->msg_len);
-        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_INGRESS, true, SOCKET_OPTS_TYPE_RECVMMSG);
+        process_write_data(ctx, id, data_args, bytes_count, 
SOCK_DATA_DIRECTION_INGRESS, true, SOCKET_OPTS_TYPE_RECVMMSG, false);
     }
     bpf_map_delete_elem(&socket_data_args, &id);
     return 0;
@@ -1109,4 +1119,6 @@ int tcp_drop(struct pt_regs *ctx) {
     struct sock *s = (void *)PT_REGS_PARM1(ctx);
     send_socket_exception_operation_event(ctx, 
SOCKET_EXCEPTION_OPERATION_TYPE_DROP, s);
     return 0;
-}
\ No newline at end of file
+}
+
+#include "openssl.c"
\ No newline at end of file
diff --git a/bpf/profiling/network/openssl.c b/bpf/profiling/network/openssl.c
new file mode 100644
index 0000000..3bd81ed
--- /dev/null
+++ b/bpf/profiling/network/openssl.c
@@ -0,0 +1,116 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "openssl.h"
+
+static __inline void process_openssl_data(struct pt_regs* ctx, __u64 id, __u32 
data_direction, struct sock_data_args_t* args, __u32 func_name) {
+    int bytes_count = PT_REGS_RC(ctx);
+    process_write_data(ctx, id, args, bytes_count, data_direction, false, 
func_name, true);
+}
+
+static int get_fd_symaddr(__u32 tgid, bool read, void* ssl) {
+    struct openssl_fd_symaddr* addr = get_openssl_fd_symaddr(tgid);
+    if (addr == NULL) {
+        return -1;
+    }
+    __u32 bio_offset = read ? addr->bio_read_offset : addr->bio_write_offset;
+    __u32 fd_offset = addr->fd_offset;
+
+    void *bio = NULL;
+    bpf_probe_read(&bio, sizeof(bio), ssl + bio_offset);
+    __u32 fd;
+    bpf_probe_read(&fd, sizeof(fd), bio + fd_offset);
+
+    return fd;
+}
+
+static int get_fd(uint32_t tgid, bool read, void* ssl) {
+    int fd = -1;
+
+    fd = get_fd_symaddr(tgid, read, ssl);
+    if (fd > 2) {
+        return fd;
+    }
+
+    return -1;
+}
+
+SEC("uprobe/ssl_write")
+int openssl_write(struct pt_regs* ctx) {
+    __u64 id = bpf_get_current_pid_tgid();
+    __u32 tgid = id >> 32;
+
+    void* ssl = (void*)PT_REGS_PARM1(ctx);
+    __u32 fd = get_fd(tgid, false, ssl);
+    bpf_printk("ssl_write fd: %d\n", fd);
+    if (fd < 0) {
+        return 0;
+    }
+
+    char* buf = (char*)PT_REGS_PARM2(ctx);
+    struct sock_data_args_t data_args = {};
+    data_args.fd = fd;
+    data_args.buf = buf;
+    bpf_map_update_elem(&openssl_sock_data_args, &id, &data_args, 0);
+
+    set_conn_as_ssl(ctx, tgid, fd, SOCKET_OPTS_TYPE_SSL_WRITE);
+    return 0;
+}
+
+SEC("uretprobe/ssl_write")
+int openssl_write_ret(struct pt_regs* ctx) {
+    __u64 id = bpf_get_current_pid_tgid();
+    struct sock_data_args_t *args = 
bpf_map_lookup_elem(&openssl_sock_data_args, &id);
+    if (args) {
+        process_openssl_data(ctx, id, SOCK_DATA_DIRECTION_EGRESS, args, 
SOCKET_OPTS_TYPE_SSL_WRITE);
+    }
+    bpf_map_delete_elem(&openssl_sock_data_args, &id);
+    return 0;
+}
+
+SEC("uprobe/ssl_read")
+int openssl_read(struct pt_regs* ctx) {
+    __u64 id = bpf_get_current_pid_tgid();
+    __u32 tgid = id >> 32;
+
+    void* ssl = (void*)PT_REGS_PARM1(ctx);
+    __u32 fd = get_fd(tgid, true, ssl);
+    bpf_printk("ssl_read fd: %d\n", fd);
+    if (fd < 0) {
+        return 0;
+    }
+
+    char* buf = (char*)PT_REGS_PARM2(ctx);
+    struct sock_data_args_t data_args = {};
+    data_args.fd = fd;
+    data_args.buf = buf;
+    bpf_map_update_elem(&openssl_sock_data_args, &id, &data_args, 0);
+
+    set_conn_as_ssl(ctx, tgid, fd, SOCKET_OPTS_TYPE_SSL_WRITE);
+    return 0;
+}
+
+SEC("uretprobe/ssl_read")
+int openssl_read_ret(struct pt_regs* ctx) {
+    __u64 id = bpf_get_current_pid_tgid();
+    struct sock_data_args_t *args = 
bpf_map_lookup_elem(&openssl_sock_data_args, &id);
+    if (args) {
+        process_openssl_data(ctx, id, SOCK_DATA_DIRECTION_INGRESS, args, 
SOCKET_OPTS_TYPE_SSL_READ);
+    }
+    bpf_map_delete_elem(&openssl_sock_data_args, &id);
+    return 0;
+}
\ No newline at end of file
diff --git a/bpf/profiling/network/openssl.h b/bpf/profiling/network/openssl.h
new file mode 100644
index 0000000..11536be
--- /dev/null
+++ b/bpf/profiling/network/openssl.h
@@ -0,0 +1,43 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+struct openssl_fd_symaddr {
+    // read the BIO offset from ssl
+    __u32 bio_read_offset;
+    __u32 bio_write_offset;
+    // read the fd offset from BIO
+    __u32 fd_offset;
+};
+
+struct {
+       __uint(type, BPF_MAP_TYPE_HASH);
+       __uint(max_entries, 10000);
+       __type(key, __u32);
+       __type(value, struct openssl_fd_symaddr);
+} openssl_fd_symaddr_finder SEC(".maps");
+static __inline struct openssl_fd_symaddr* get_openssl_fd_symaddr(__u32 tgid) {
+    struct openssl_fd_symaddr *addr = 
bpf_map_lookup_elem(&openssl_fd_symaddr_finder, &tgid);
+    return addr;
+}
+
+// openssl read or write
+struct {
+       __uint(type, BPF_MAP_TYPE_HASH);
+       __uint(max_entries, 10000);
+       __type(key, __u64);
+       __type(value, struct sock_data_args_t);
+} openssl_sock_data_args SEC(".maps");
\ No newline at end of file
diff --git a/bpf/profiling/network/sock_stats.h 
b/bpf/profiling/network/sock_stats.h
index a597116..018bc69 100644
--- a/bpf/profiling/network/sock_stats.h
+++ b/bpf/profiling/network/sock_stats.h
@@ -59,6 +59,8 @@ struct active_connection_t {
     __u64 prev_count;
     char prev_buf[4];
     __u32 prepend_length_header;
+    // current connection is ssl
+    __u32 ssl;
 
     // connect event already send
     __u32 connect_event_send;
@@ -136,6 +138,10 @@ struct socket_close_event_t {
     __u32 sockfd;
     // the type of role in current connection
     __u32 role;
+    // the protocol type of the connection
+    __u32 protocol;
+    // the connection is ssl
+    __u32 ssl;
     __u32 fix;
 
     // socket type
diff --git a/pkg/profiling/task/network/analyzer.go 
b/pkg/profiling/task/network/analyzer.go
index e92a574..581da98 100644
--- a/pkg/profiling/task/network/analyzer.go
+++ b/pkg/profiling/task/network/analyzer.go
@@ -192,6 +192,12 @@ func (t *TrafficAnalyzer) generateOrCombineTraffic(traffic 
*ProcessTraffic, con
        if traffic.ConnectionRole == ConnectionRoleUnknown && con.Role != 
ConnectionRoleUnknown {
                traffic.ConnectionRole = con.Role
        }
+       if traffic.Protocol == ConnectionProtocolUnknown && con.Protocol != 
ConnectionProtocolUnknown {
+               traffic.Protocol = con.Protocol
+       }
+       if !traffic.IsSSL && con.IsSSL {
+               traffic.IsSSL = true
+       }
 
        if remotePid != 0 {
                traffic.RemotePid = remotePid
diff --git a/pkg/profiling/task/network/context.go 
b/pkg/profiling/task/network/context.go
index 451496f..cba0ce2 100644
--- a/pkg/profiling/task/network/context.go
+++ b/pkg/profiling/task/network/context.go
@@ -42,7 +42,8 @@ import (
 type Context struct {
        processes map[int32][]api.ProcessInterface
 
-       bpf *bpfObjects // current bpf programs
+       bpf    *bpfObjects // current bpf programs
+       linker *Linker
 
        // standard syscall connections
        activeConnections cmap.ConcurrentMap      // current activeConnections 
connections
@@ -79,6 +80,8 @@ type ConnectionContext struct {
        SocketFD         uint32
        LocalProcesses   []api.ProcessInterface
        ConnectionClosed bool
+       Protocol         ConnectionProtocol
+       IsSSL            bool
 
        // socket metadata
        Role       ConnectionRole
@@ -124,21 +127,22 @@ func NewContext() *Context {
        }
 }
 
-func (c *Context) Init(bpf *bpfObjects) {
+func (c *Context) Init(bpf *bpfObjects, linker *Linker) {
        c.bpf = bpf
+       c.linker = linker
 }
 
-func (c *Context) RegisterAllHandlers(linker *Linker) {
+func (c *Context) RegisterAllHandlers() {
        // socket connect
-       linker.ReadEventAsync(c.bpf.SocketConnectionEventQueue, 
c.handleSocketConnectEvent, func() interface{} {
+       c.linker.ReadEventAsync(c.bpf.SocketConnectionEventQueue, 
c.handleSocketConnectEvent, func() interface{} {
                return &SocketConnectEvent{}
        })
        // socket close
-       linker.ReadEventAsync(c.bpf.SocketCloseEventQueue, 
c.handleSocketCloseEvent, func() interface{} {
+       c.linker.ReadEventAsync(c.bpf.SocketCloseEventQueue, 
c.handleSocketCloseEvent, func() interface{} {
                return &SocketCloseEvent{}
        })
        // socket retransmit
-       linker.ReadEventAsync(c.bpf.SocketExceptionOperationEventQueue, 
c.handleSocketExceptionOperationEvent, func() interface{} {
+       c.linker.ReadEventAsync(c.bpf.SocketExceptionOperationEventQueue, 
c.handleSocketExceptionOperationEvent, func() interface{} {
                return &SocketExceptionOperationEvent{}
        })
 }
@@ -277,10 +281,11 @@ type ActiveConnectionInBPF struct {
        WriteRTTExeTime uint64
 
        // Protocol analyze context
-       Protocol              uint32
+       Protocol              ConnectionProtocol
        ProtocolPrevCount     uint64
        ProtocolPrevBuf       [4]byte
        ProtocolPrependHeader uint32
+       IsSSL                 uint32
 
        // the connect event is already sent
        ConnectEventIsSent uint32
@@ -323,6 +328,12 @@ func (c *Context) fillConnectionMetrics(ccs 
[]*ConnectionContext) {
                        if cc.Role == ConnectionRoleUnknown && 
activeConnection.Role != ConnectionRoleUnknown {
                                cc.Role = activeConnection.Role
                        }
+                       if cc.Protocol == ConnectionProtocolUnknown && 
activeConnection.Protocol != ConnectionProtocolUnknown {
+                               cc.Protocol = activeConnection.Protocol
+                       }
+                       if !cc.IsSSL && activeConnection.IsSSL == 1 {
+                               cc.IsSSL = true
+                       }
 
                        // update the role
                        
cc.WriteCounter.UpdateToCurrent(activeConnection.WriteBytes, 
activeConnection.WriteCount, activeConnection.WriteExeTime)
@@ -453,6 +464,8 @@ type SocketCloseEvent struct {
        Pid      uint32
        SocketFD uint32
        Role     ConnectionRole
+       Protocol ConnectionProtocol
+       IsSSL    uint32
        Fix      uint32
 
        SocketFamily   uint32
@@ -614,6 +627,12 @@ func (c *Context) combineClosedConnection(active 
*ConnectionContext, closed *Soc
        if active.Role == ConnectionRoleUnknown && closed.Role != 
ConnectionRoleUnknown {
                active.Role = closed.Role
        }
+       if active.Protocol == ConnectionProtocolUnknown && closed.Protocol != 
ConnectionProtocolUnknown {
+               active.Protocol = closed.Protocol
+       }
+       if !active.IsSSL && closed.IsSSL == 1 {
+               active.IsSSL = true
+       }
 
        active.WriteCounter.UpdateToCurrent(closed.WriteBytes, 
closed.WriteCount, closed.WriteExeTime)
        active.ReadCounter.UpdateToCurrent(closed.ReadBytes, closed.ReadCount, 
closed.ReadExeTime)
@@ -646,9 +665,16 @@ func (c *Context) AddProcesses(processes 
[]api.ProcessInterface) error {
 
                c.processes[pid] = append(c.processes[pid], p)
 
+               // add to the process let it could be monitored
                if err1 := c.bpf.ProcessMonitorControl.Update(uint32(pid), 
uint32(1), ebpf.UpdateAny); err1 != nil {
                        err = multierror.Append(err, err1)
                }
+
+               // add process ssl config
+               if err1 := addSSLProcess(int(pid), c.bpf, c.linker); err1 != 
nil {
+                       err = multierror.Append(err, err1)
+               }
+
                log.Debugf("add monitor process, pid: %d", pid)
        }
        return err
diff --git a/pkg/profiling/task/network/enums.go 
b/pkg/profiling/task/network/enums.go
index 6f20e0a..2f5981f 100644
--- a/pkg/profiling/task/network/enums.go
+++ b/pkg/profiling/task/network/enums.go
@@ -17,6 +17,11 @@
 
 package network
 
+const (
+       unknown = "unknown"
+       http    = "http"
+)
+
 // ConnectionRole represents the role of the current process is the connection
 // whether it's a server or a client, if it's not trigger the 
connection/accept request, then it's unknown
 type ConnectionRole uint32
@@ -34,7 +39,7 @@ func (r ConnectionRole) String() string {
        case ConnectionRoleServer:
                return "server"
        default:
-               return "unknown"
+               return unknown
        }
 }
 
@@ -71,3 +76,51 @@ const (
        SocketExceptionOperationRetransmit SocketExceptionOperationType = 1
        SocketExceptionOperationDrop       SocketExceptionOperationType = 2
 )
+
+type ConnectionProtocol uint32
+
+const (
+       ConnectionProtocolUnknown ConnectionProtocol = 0
+       ConnectionProtocolHTTP    ConnectionProtocol = 1
+       ConnectionProtocolHTTP2   ConnectionProtocol = 2
+       ConnectionProtocolMySQL   ConnectionProtocol = 3
+       ConnectionProtocolCQL     ConnectionProtocol = 4
+       ConnectionProtocolPGSQL   ConnectionProtocol = 5
+       ConnectionProtocolDNS     ConnectionProtocol = 6
+       ConnectionProtocolRedis   ConnectionProtocol = 7
+       ConnectionProtocolNATS    ConnectionProtocol = 8
+       ConnectionProtocolMongo   ConnectionProtocol = 9
+       ConnectionProtocolKafka   ConnectionProtocol = 10
+       ConnectionProtocolMux     ConnectionProtocol = 11
+)
+
+func (c ConnectionProtocol) String() string {
+       switch c {
+       case ConnectionProtocolUnknown:
+               return unknown
+       case ConnectionProtocolHTTP:
+               return http
+       case ConnectionProtocolHTTP2:
+               return http
+       case ConnectionProtocolMySQL:
+               return "mysql"
+       case ConnectionProtocolCQL:
+               return "cql"
+       case ConnectionProtocolPGSQL:
+               return "pgsql"
+       case ConnectionProtocolDNS:
+               return "dns"
+       case ConnectionProtocolRedis:
+               return "redis"
+       case ConnectionProtocolNATS:
+               return "nats"
+       case ConnectionProtocolMongo:
+               return "mongo"
+       case ConnectionProtocolKafka:
+               return "kafka"
+       case ConnectionProtocolMux:
+               return "mutex"
+       default:
+               return unknown
+       }
+}
diff --git a/pkg/profiling/task/network/linker.go 
b/pkg/profiling/task/network/linker.go
index 859930a..46015bb 100644
--- a/pkg/profiling/task/network/linker.go
+++ b/pkg/profiling/task/network/linker.go
@@ -77,18 +77,28 @@ type Linker struct {
        closeOnce sync.Once
 }
 
+type UProbeExeFile struct {
+       addr     string
+       found    bool
+       liker    *Linker
+       realFile *link.Executable
+}
+
 func (m *Linker) AddLink(linkF LinkFunc, p *ebpf.Program, trySymbolNames 
...string) {
        var lk link.Link
        var err error
+       var realSym string
        for _, n := range trySymbolNames {
                lk, err = linkF(n, p)
                if err == nil {
+                       realSym = n
                        break
                }
        }
        if err != nil {
                m.errors = multierror.Append(m.errors, fmt.Errorf("open %s 
error: %v", trySymbolNames, err))
        } else {
+               log.Debugf("attach to the kprobe: %s", realSym)
                m.closers = append(m.closers, lk)
        }
 }
@@ -153,6 +163,56 @@ func (m *Linker) ReadEventAsync(emap *ebpf.Map, reader 
RingBufferReader, dataSup
        }()
 }
 
+func (m *Linker) OpenUProbeExeFile(path string) *UProbeExeFile {
+       executable, err := link.OpenExecutable(path)
+       if err != nil {
+               m.errors = multierror.Append(m.errors, fmt.Errorf("cannot found 
the execute file: %s, error: %v", path, err))
+               return &UProbeExeFile{
+                       found: false,
+               }
+       }
+
+       return &UProbeExeFile{
+               found:    true,
+               addr:     path,
+               liker:    m,
+               realFile: executable,
+       }
+}
+
+func (m *UProbeExeFile) AddLink(symbol string, enter, exit *ebpf.Program, pid 
int) {
+       m.AddLinkWithType(symbol, true, enter, pid)
+       m.AddLinkWithType(symbol, false, exit, pid)
+}
+
+func (m *UProbeExeFile) AddLinkWithType(symbol string, enter bool, p 
*ebpf.Program, pid int) {
+       if !m.found {
+               return
+       }
+       var fun func(symbol string, prog *ebpf.Program, opts 
*link.UprobeOptions) (link.Link, error)
+       if enter {
+               fun = m.realFile.Uprobe
+       } else {
+               fun = m.realFile.Uretprobe
+       }
+
+       var t string
+       if enter {
+               t = "enter"
+       } else {
+               t = "exit"
+       }
+
+       lk, err := fun(symbol, p, &link.UprobeOptions{PID: pid})
+       if err != nil {
+               m.liker.errors = multierror.Append(m.liker.errors, 
fmt.Errorf("file: %s, symbol: %s, type: %s, pid: %d, error: %v",
+                       m.addr, symbol, t, pid, err))
+       } else {
+               log.Debugf("attach to the uprobe, file: %s, symbol: %s, type: 
%s, pid: %d", m.addr, symbol, t, pid)
+               m.liker.closers = append(m.liker.closers, lk)
+       }
+}
+
 func (m *Linker) HasError() error {
        return m.errors
 }
diff --git a/pkg/profiling/task/network/metrics.go 
b/pkg/profiling/task/network/metrics.go
index 191a0b8..388be01 100644
--- a/pkg/profiling/task/network/metrics.go
+++ b/pkg/profiling/task/network/metrics.go
@@ -191,6 +191,10 @@ type ProcessTraffic struct {
 
        // current connection role of local process
        ConnectionRole ConnectionRole
+       // the protocol of the connection
+       Protocol ConnectionProtocol
+       // current connection is SSL
+       IsSSL bool
 
        // remote process/address information
        RemoteIP        string
@@ -338,6 +342,10 @@ func (r *ProcessTraffic) buildBasicMeterLabels(local 
api.ProcessInterface) (Conn
        labels = r.appendRemoteAddrssInfo(labels, curRole.Revert().String(), 
local)
 
        labels = r.appendMeterValue(labels, "side", curRole.String())
+
+       // protocol and ssl
+       labels = r.appendMeterValue(labels, "protocol", r.Protocol.String())
+       labels = r.appendMeterValue(labels, "is_ssl", fmt.Sprintf("%t", 
r.IsSSL))
        return curRole, labels
 }
 
diff --git a/pkg/profiling/task/network/runner.go 
b/pkg/profiling/task/network/runner.go
index 8821903..7581ade 100644
--- a/pkg/profiling/task/network/runner.go
+++ b/pkg/profiling/task/network/runner.go
@@ -96,14 +96,14 @@ func (r *Runner) Start(ctx context.Context, processes 
[]api.ProcessInterface) er
                return err
        }
        r.bpf = &objs
-       r.bpfContext.Init(&objs)
+       r.bpfContext.Init(&objs, r.linker)
 
        if err := r.bpfContext.AddProcesses(processes); err != nil {
                return err
        }
 
        // register all handlers
-       r.bpfContext.RegisterAllHandlers(r.linker)
+       r.bpfContext.RegisterAllHandlers()
        r.bpfContext.StartSocketAddressParser(r.ctx)
 
        // sock opts
@@ -238,8 +238,8 @@ func (r *Runner) logTheMetricsConnections(traffices 
[]*ProcessTraffic) {
                                traffic.RemoteIP, traffic.RemotePort, 
traffic.RemotePid)
                }
                side := traffic.ConnectionRole.String()
-               log.Debugf("connection analyze result: %s : %s -> %s, read: %d 
bytes/%d, write: %d bytes/%d",
-                       side, localInfo, remoteInfo, 
traffic.WriteCounter.Bytes, traffic.WriteCounter.Count,
+               log.Debugf("connection analyze result: %s : %s -> %s, protocol: 
%s, is SSL: %t, read: %d bytes/%d, write: %d bytes/%d",
+                       side, localInfo, remoteInfo, traffic.Protocol.String(), 
traffic.IsSSL, traffic.WriteCounter.Bytes, traffic.WriteCounter.Count,
                        traffic.ReadCounter.Bytes, traffic.ReadCounter.Count)
        }
 }
diff --git a/pkg/profiling/task/network/ssl.go 
b/pkg/profiling/task/network/ssl.go
new file mode 100644
index 0000000..a69782b
--- /dev/null
+++ b/pkg/profiling/task/network/ssl.go
@@ -0,0 +1,155 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package network
+
+import (
+       "fmt"
+       "os/exec"
+       "regexp"
+       "strconv"
+       "strings"
+
+       "github.com/apache/skywalking-rover/pkg/tools/profiling"
+
+       "github.com/apache/skywalking-rover/pkg/tools"
+
+       "github.com/apache/skywalking-rover/pkg/tools/path"
+)
+
+var openSSLVersionRegex = 
regexp.MustCompile(`^OpenSSL\s+(?P<Major>\d)\.(?P<Minor>\d)\.(?P<Fix>\d+)\w+`)
+
+type OpenSSLFdSymAddrConfigInBPF struct {
+       BIOReadOffset  uint32
+       BIOWriteOffset uint32
+       FDOffset       uint32
+}
+
+func addSSLProcess(pid int, bpf *bpfObjects, linker *Linker) error {
+       modules, err := tools.ProcessModules(int32(pid))
+       if err != nil {
+               return fmt.Errorf("read process modules error: %d, error: %v", 
pid, err)
+       }
+
+       // openssl process
+       if err1 := processOpenSSLProcess(pid, bpf, linker, modules); err1 != 
nil {
+               return err1
+       }
+
+       return nil
+}
+
+func processOpenSSLProcess(pid int, bpf *bpfObjects, linker *Linker, modules 
[]*profiling.Module) error {
+       var libcryptoName, libsslName = "libcrypto.so", "libssl.so"
+       var libcryptoPath, libsslPath string
+       processModules, err := findProcessModules(modules, libcryptoName, 
libsslName)
+       if err != nil {
+               return err
+       }
+       // the openssl not exists, so ignore
+       if len(processModules) == 0 {
+               return nil
+       }
+       if libcrypto := processModules[libcryptoName]; libcrypto != nil {
+               libcryptoPath = libcrypto.Path
+       }
+       if libssl := processModules[libsslName]; libssl != nil {
+               libsslPath = libssl.Path
+       }
+       if libcryptoPath == "" || libsslPath == "" {
+               return fmt.Errorf("the OpenSSL library not complete, libcrypto: 
%s, libssl: %s", libcryptoPath, libsslPath)
+       }
+
+       // build the symbol address config and write to the bpf
+       conf, err := buildSSLSymAddrConfig(libcryptoPath)
+       if err != nil {
+               return err
+       }
+       if err := bpf.OpensslFdSymaddrFinder.Put(uint32(pid), conf); err != nil 
{
+               return err
+       }
+
+       // attach the linker
+       libSSLLinker := linker.OpenUProbeExeFile(libsslPath)
+       libSSLLinker.AddLink("SSL_write", bpf.OpensslWrite, 
bpf.OpensslWriteRet, pid)
+       libSSLLinker.AddLink("SSL_read", bpf.OpensslRead, bpf.OpensslReadRet, 
pid)
+       return linker.HasError()
+}
+
+func findProcessModules(modules []*profiling.Module, moduleNames ...string) 
(map[string]*profiling.Module, error) {
+       result := make(map[string]*profiling.Module)
+       for _, mod := range modules {
+               for _, modName := range moduleNames {
+                       if strings.Contains(mod.Name, modName) {
+                               if !path.Exists(mod.Path) {
+                                       return nil, fmt.Errorf("the module path 
not exists, path: %s", mod.Path)
+                               }
+                               result[modName] = mod
+                       }
+               }
+       }
+       return result, nil
+}
+
+func buildSSLSymAddrConfig(libcryptoPath string) 
(*OpenSSLFdSymAddrConfigInBPF, error) {
+       // using "strings" command to query the symbol in the libcrypto library
+       result, err := exec.Command("strings", libcryptoPath).Output()
+       if err != nil {
+               return nil, err
+       }
+       for _, p := range strings.Split(string(result), "\n") {
+               submatch := openSSLVersionRegex.FindStringSubmatch(p)
+               if len(submatch) != 4 {
+                       continue
+               }
+               major := submatch[1]
+               mijor := submatch[2]
+               fix := submatch[3]
+
+               log.Debugf("found the libcrypto.so version: %s.%s.%s", major, 
mijor, fix)
+               conf := &OpenSSLFdSymAddrConfigInBPF{}
+
+               // must be number, already validate in the regex
+               mijorVal, _ := strconv.Atoi(mijor)
+               fixVal, _ := strconv.Atoi(fix)
+
+               // max support version is 1.1.1
+               if mijorVal > 1 || fixVal > 1 {
+                       return nil, fmt.Errorf("the fix version of the 
libcrypto is not support: %s.%s.%s", major, mijor, fix)
+               }
+
+               // bio offset
+               // 
https://github.com/openssl/openssl/blob/OpenSSL_1_0_0-stable/ssl/ssl.h#L1093-L1111
+               // 
https://github.com/openssl/openssl/blob/OpenSSL_1_1_1-stable/ssl/ssl_local.h#L1068-L1083
+               conf.BIOReadOffset = 16
+               conf.BIOWriteOffset = 24
+               // fd offset
+               if (mijorVal == 0) || (mijorVal == 1 && fixVal == 0) {
+                       // 1.0.x || 1.1.0
+                       // 
https://github.com/openssl/openssl/blob/OpenSSL_1_0_0-stable/crypto/bio/bio.h#L297-L306
+                       conf.FDOffset = 40
+               } else {
+                       // 1.1.1
+                       // 
https://github.com/openssl/openssl/blob/OpenSSL_1_1_1-stable/crypto/bio/bio_local.h#L115-L125
+                       conf.FDOffset = 48
+               }
+               log.Debugf("the lobcrypto.so library symbol verson config, 
version: %s.%s.%s, bio offset: %d",
+                       major, mijor, fix, conf.FDOffset)
+               return conf, nil
+       }
+       return nil, fmt.Errorf("could not fount the version of the 
libcrypto.so")
+}
diff --git a/pkg/tools/process.go b/pkg/tools/process.go
index b814a39..7d55855 100644
--- a/pkg/tools/process.go
+++ b/pkg/tools/process.go
@@ -27,6 +27,7 @@ import (
 
        "github.com/apache/skywalking-rover/pkg/logger"
        host2 "github.com/apache/skywalking-rover/pkg/tools/host"
+       "github.com/apache/skywalking-rover/pkg/tools/path"
        "github.com/apache/skywalking-rover/pkg/tools/profiling"
 )
 
@@ -82,6 +83,16 @@ func ProcessProfilingStat(pid int32, exePath string) 
(*profiling.Info, error) {
        return analyzeProfilingInfo(context, pid)
 }
 
+// ProcessModules Read the profiling info of the process, without the symbol 
check
+func ProcessModules(pid int32) ([]*profiling.Module, error) {
+       context := newAnalyzeContext()
+       info, err := analyzeProfilingInfo(context, pid)
+       if err != nil {
+               return nil, err
+       }
+       return info.Modules, nil
+}
+
 func analyzeProfilingInfo(context *analyzeContext, pid int32) 
(*profiling.Info, error) {
        // analyze process mapping
        mapFile, _ := os.Open(host2.GetFileInHost(fmt.Sprintf("/proc/%d/maps", 
pid)))
@@ -116,6 +127,10 @@ func analyzeProfilingInfo(context *analyzeContext, pid 
int32) (*profiling.Info,
                        continue
                }
                modulePath := 
host2.GetFileInHost(fmt.Sprintf("/proc/%d/root%s", pid, moduleName))
+               if !path.Exists(modulePath) {
+                       log.Debugf("could not found the module, ignore. name: 
%s, path: %s", moduleName, modulePath)
+                       continue
+               }
 
                module, err = context.GetFinder(modulePath).ToModule(pid, 
moduleName, modulePath, []*profiling.ModuleRange{moduleRange})
                if err != nil {
@@ -149,6 +164,7 @@ func isIgnoreModuleName(name string) bool {
                        strings.HasPrefix(name, "/memfd:") ||
                        strings.HasPrefix(name, "[vdso]") ||
                        strings.HasPrefix(name, "[vsyscall]") ||
+                       strings.HasPrefix(name, "[uprobes]") ||
                        strings.HasSuffix(name, ".map"))
 }
 
diff --git a/pkg/tools/profiling/go_library.go 
b/pkg/tools/profiling/go_library.go
index 9883672..2257b2a 100644
--- a/pkg/tools/profiling/go_library.go
+++ b/pkg/tools/profiling/go_library.go
@@ -23,8 +23,6 @@ import (
        "sort"
        "strings"
 
-       "github.com/hashicorp/go-multierror"
-
        "github.com/apache/skywalking-rover/pkg/tools/path"
 )
 
@@ -54,11 +52,10 @@ func (l *GoLibrary) AnalyzeSymbols(filePath string) 
([]*Symbol, error) {
        defer file.Close()
 
        // exist symbol data
-       symbols, symError := file.Symbols()
-       dySyms, dyError := file.DynamicSymbols()
+       symbols, _ := file.Symbols()
+       dySyms, _ := file.DynamicSymbols()
        if len(symbols) == 0 && len(dySyms) == 0 {
-               symError = multierror.Append(symError, dyError)
-               return nil, symError
+               return nil, nil
        }
        symbols = append(symbols, dySyms...)
 

Reply via email to