This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch ztunnel-target
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git

commit 82ece5e3130617479b51cc001ea9e52b072d4a26
Author: mrproliu <[email protected]>
AuthorDate: Mon May 12 15:41:51 2025 +0800

    Support detecting the target address in the access log module
---
 bpf/accesslog/ambient/ztunnel.c    | 28 +++++++++++++++++++++----
 go.mod                             |  2 +-
 go.sum                             |  4 ++--
 pkg/accesslog/collector/ztunnel.go | 42 ++++++++++++++++++++++++++++++--------
 4 files changed, 61 insertions(+), 15 deletions(-)

diff --git a/bpf/accesslog/ambient/ztunnel.c b/bpf/accesslog/ambient/ztunnel.c
index e1fc44a..0361b4b 100644
--- a/bpf/accesslog/ambient/ztunnel.c
+++ b/bpf/accesslog/ambient/ztunnel.c
@@ -17,7 +17,7 @@
 
 #include "ztunnel.h"
 
-static __inline bool get_socket_addr_ip_in_ztunnel(bool success, void * arg, 
__u32 *ip, __u16 *port) {
+static __inline bool get_socket_addr_ip_in_ztunnel(bool success, void * arg, 
__u32 *ip, __u16 *port, int must_exist) {
     if (!success) {
         return false;
     }
@@ -25,6 +25,9 @@ static __inline bool get_socket_addr_ip_in_ztunnel(bool 
success, void * arg, __u
     if (bpf_probe_read(&sockaddr, sizeof(sockaddr), (void *)arg) != 0) {
        return false;
     }
+    if (must_exist == 1 && sockaddr[0] != 0) {
+        return false;
+    }
     // ip is stored in sockaddr[2], sockaddr[3], sockaddr[4], sockaddr[5]
     *ip = ((__u32)sockaddr[2] << 24) | ((__u32)sockaddr[3] << 16) | 
((__u32)sockaddr[4] << 8) | (__u32)sockaddr[5];
     if (port != NULL) {
@@ -41,9 +44,26 @@ int connection_manager_track_outbound(struct pt_regs* ctx) {
         return 0;
     }
     bool success = true;
-    success = get_socket_addr_ip_in_ztunnel(success, (void 
*)PT_REGS_PARM3(ctx), &event->orginal_src_ip, &event->src_port);
-    success = get_socket_addr_ip_in_ztunnel(success, (void 
*)PT_REGS_PARM4(ctx), &event->original_dst_ip, &event->dst_port);
-    success = get_socket_addr_ip_in_ztunnel(success, (void 
*)PT_REGS_PARM5(ctx), &event->lb_dst_ip, &event->lb_dst_port);
+    success = get_socket_addr_ip_in_ztunnel(success, (void 
*)PT_REGS_PARM3(ctx), &event->orginal_src_ip, &event->src_port, 0);
+    success = get_socket_addr_ip_in_ztunnel(success, (void 
*)PT_REGS_PARM4(ctx), &event->original_dst_ip, &event->dst_port, 0);
+    success = get_socket_addr_ip_in_ztunnel(success, (void 
*)PT_REGS_PARM5(ctx), &event->lb_dst_ip, &event->lb_dst_port, 0);
+    if (!success) {
+        return 0;
+    }
+    bpf_perf_event_output(ctx, &ztunnel_lb_socket_mapping_event_queue, 
BPF_F_CURRENT_CPU, event, sizeof(*event));
+    return 0;
+}
+
+SEC("uprobe/connection_result_new")
+int connection_result_new(struct pt_regs* ctx) {
+    struct ztunnel_socket_mapping_t *event = 
create_ztunnel_socket_mapping_event();
+    if (event == NULL) {
+        return 0;
+    }
+    bool success = true;
+    success = get_socket_addr_ip_in_ztunnel(success, (void 
*)PT_REGS_PARM2(ctx), &event->orginal_src_ip, &event->src_port, 0);
+    success = get_socket_addr_ip_in_ztunnel(success, (void 
*)PT_REGS_PARM3(ctx), &event->original_dst_ip, &event->dst_port, 0);
+    success = get_socket_addr_ip_in_ztunnel(success, (void 
*)PT_REGS_PARM4(ctx), &event->lb_dst_ip, &event->lb_dst_port, 1);
     if (!success) {
         return 0;
     }
diff --git a/go.mod b/go.mod
index 6eb8649..70d5b57 100644
--- a/go.mod
+++ b/go.mod
@@ -26,7 +26,7 @@ require (
        k8s.io/apimachinery v0.23.5
        k8s.io/client-go v0.23.5
        k8s.io/utils v0.0.0-20211116205334-6203023598ed
-       skywalking.apache.org/repo/goapi v0.0.0-20250225130248-3916480eb467
+       skywalking.apache.org/repo/goapi v0.0.0-20250512041444-47b8e4229d27
 )
 
 require (
diff --git a/go.sum b/go.sum
index b1dd69b..281dae0 100644
--- a/go.sum
+++ b/go.sum
@@ -1059,5 +1059,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.1 
h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLz
 sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod 
h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
 sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
 sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20250225130248-3916480eb467 
h1:pXT6UxmC3qAD8faGFYuWmz6ekQPRfW1PcU8lAu6WSB0=
-skywalking.apache.org/repo/goapi v0.0.0-20250225130248-3916480eb467/go.mod 
h1:+n8BMuS8eRdzdnGh15ElRGBXPi0eYZSs2TKySBDmRTE=
+skywalking.apache.org/repo/goapi v0.0.0-20250512041444-47b8e4229d27 
h1:qQqddsnVGf6byf4uPSVNx9wRzye+gKrEdUVZvdJtBgM=
+skywalking.apache.org/repo/goapi v0.0.0-20250512041444-47b8e4229d27/go.mod 
h1:+n8BMuS8eRdzdnGh15ElRGBXPi0eYZSs2TKySBDmRTE=
diff --git a/pkg/accesslog/collector/ztunnel.go 
b/pkg/accesslog/collector/ztunnel.go
index f6a02ae..26a570b 100644
--- a/pkg/accesslog/collector/ztunnel.go
+++ b/pkg/accesslog/collector/ztunnel.go
@@ -42,6 +42,8 @@ var (
        // ZTunnelTrackBoundSymbolPrefix is the prefix of the symbol name to 
track outbound connections in ztunnel process
        // ztunnel::proxy::connection_manager::ConnectionManager::track_outbound
        ZTunnelTrackBoundSymbolPrefix = 
"_ZN7ztunnel5proxy18connection_manager17ConnectionManager14track_outbound"
+       // ztunnel::proxy::metrics::ConnectionResult::new
+       ZTunnelTrackConnectionResultSymbolPrefix = 
"_ZN7ztunnel5proxy7metrics16ConnectionResult3new"
 )
 
 var zTunnelCollectInstance = NewZTunnelCollector(time.Minute)
@@ -81,7 +83,8 @@ func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx 
*common.AccessLogConte
                remoteIP := z.convertBPFIPToString(event.OriginalDestIP)
                remotePort := event.OriginalDestPort
                lbIP := z.convertBPFIPToString(event.LoadBalancedDestIP)
-               log.Debugf("received ztunnel lb socket mapping event: %s:%d -> 
%s:%d, lb: %s", localIP, localPort, remoteIP, remotePort, lbIP)
+               log.Debugf("received ztunnel lb socket mapping event: %s:%d -> 
%s:%d, lb: %s:%d", localIP, localPort, remoteIP, remotePort, lbIP,
+                       event.LoadBalancedDestPort)
 
                key := z.buildIPMappingCacheKey(localIP, int(localPort), 
remoteIP, int(remotePort))
                z.ipMappingCache.Set(key, &ZTunnelLoadBalanceAddress{
@@ -122,20 +125,36 @@ func (z *ZTunnelCollector) 
ReadyToFlushConnection(connection *common.ConnectionI
                        connection.ConnectionID, connection.RandomID)
                return
        }
-       address := lbIPObj.(*ZTunnelLoadBalanceAddress)
-       log.Debugf("found the ztunnel load balanced IP for the connection: %s, 
connectionID: %d, randomID: %d",
-               address.String(), connection.ConnectionID, connection.RandomID)
+       lbAddress := lbIPObj.(*ZTunnelLoadBalanceAddress)
+       // look up the real target address in the mapping cache, if one exists
+       key = z.buildIPMappingCacheKey(connection.Socket.SrcIP, 
int(connection.Socket.SrcPort),
+               lbAddress.IP, int(lbAddress.Port))
+       realIPObj, found := z.ipMappingCache.Get(key)
+       var realAddress *ZTunnelLoadBalanceAddress = nil
+       if !found {
+               log.Debugf("there no real ztunnel mapped IP address found for 
connection ID: %d, random ID: %d, lbIP: %s:%d",
+                       connection.ConnectionID, connection.RandomID, 
lbAddress.IP, lbAddress.Port)
+       } else {
+               realAddress = realIPObj.(*ZTunnelLoadBalanceAddress)
+       }
+       log.Debugf("found the ztunnel load balanced IP for the connection: %s, 
real IP: %s, connectionID: %d, randomID: %d",
+               lbAddress.String(), realAddress, connection.ConnectionID, 
connection.RandomID)
        securityPolicy := v3.ZTunnelAttachmentSecurityPolicy_NONE
        // if the target port is 15008, this mean ztunnel have use mTLS
-       if address.Port == 15008 {
+       if lbAddress.Port == 15008 {
                securityPolicy = v3.ZTunnelAttachmentSecurityPolicy_MTLS
        }
+       var targetIP = ""
+       if realAddress != nil {
+               targetIP = realAddress.IP
+       }
        connection.RPCConnection.Attachment = &v3.ConnectionAttachment{
                Environment: &v3.ConnectionAttachment_ZTunnel{
                        ZTunnel: &v3.ZTunnelAttachmentEnvironment{
-                               RealDestinationIp: address.IP,
-                               By:                
v3.ZTunnelAttachmentEnvironmentDetectBy_ZTUNNEL_OUTBOUND_FUNC,
-                               SecurityPolicy:    securityPolicy,
+                               RealDestinationIp:   lbAddress.IP,
+                               By:                  
v3.ZTunnelAttachmentEnvironmentDetectBy_ZTUNNEL_OUTBOUND_FUNC,
+                               SecurityPolicy:      securityPolicy,
+                               TargetDestinationIp: targetIP,
                        },
                },
        }
@@ -203,9 +222,16 @@ func (z *ZTunnelCollector) collectZTunnelProcess(p 
*process.Process) error {
        if len(trackBoundSymbol) == 0 {
                return fmt.Errorf("failed to find track outbound symbol in 
ztunnel process")
        }
+       trackConnectionResultSymbol := elfFile.FilterSymbol(func(name string) 
bool {
+               return strings.HasPrefix(name, 
ZTunnelTrackConnectionResultSymbolPrefix)
+       }, true)
+       if len(trackConnectionResultSymbol) == 0 {
+               return fmt.Errorf("failed to find track connection result 
symbol in ztunnel process")
+       }
 
        uprobeFile := z.alc.BPF.OpenUProbeExeFile(pidExeFile)
        uprobeFile.AddLink(trackBoundSymbol[0].Name, 
z.alc.BPF.ConnectionManagerTrackOutbound, nil)
+       uprobeFile.AddLink(trackConnectionResultSymbol[0].Name, 
z.alc.BPF.ConnectionResultNew, nil)
        return nil
 }
 

Reply via email to