This is an automated email from the ASF dual-hosted git repository. liuhan pushed a commit to branch ztunnel-target in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
commit 82ece5e3130617479b51cc001ea9e52b072d4a26 Author: mrproliu <[email protected]> AuthorDate: Mon May 12 15:41:51 2025 +0800 Support detect the targeting address in the access log module --- bpf/accesslog/ambient/ztunnel.c | 28 +++++++++++++++++++++---- go.mod | 2 +- go.sum | 4 ++-- pkg/accesslog/collector/ztunnel.go | 42 ++++++++++++++++++++++++++++++-------- 4 files changed, 61 insertions(+), 15 deletions(-) diff --git a/bpf/accesslog/ambient/ztunnel.c b/bpf/accesslog/ambient/ztunnel.c index e1fc44a..0361b4b 100644 --- a/bpf/accesslog/ambient/ztunnel.c +++ b/bpf/accesslog/ambient/ztunnel.c @@ -17,7 +17,7 @@ #include "ztunnel.h" -static __inline bool get_socket_addr_ip_in_ztunnel(bool success, void * arg, __u32 *ip, __u16 *port) { +static __inline bool get_socket_addr_ip_in_ztunnel(bool success, void * arg, __u32 *ip, __u16 *port, int must_exist) { if (!success) { return false; } @@ -25,6 +25,9 @@ static __inline bool get_socket_addr_ip_in_ztunnel(bool success, void * arg, __u if (bpf_probe_read(&sockaddr, sizeof(sockaddr), (void *)arg) != 0) { return false; } + if (must_exist == 1 && sockaddr[0] != 0) { + return false; + } // ip is stored in sockaddr[2], sockaddr[3], sockaddr[4], sockaddr[5] *ip = ((__u32)sockaddr[2] << 24) | ((__u32)sockaddr[3] << 16) | ((__u32)sockaddr[4] << 8) | (__u32)sockaddr[5]; if (port != NULL) { @@ -41,9 +44,26 @@ int connection_manager_track_outbound(struct pt_regs* ctx) { return 0; } bool success = true; - success = get_socket_addr_ip_in_ztunnel(success, (void *)PT_REGS_PARM3(ctx), &event->orginal_src_ip, &event->src_port); - success = get_socket_addr_ip_in_ztunnel(success, (void *)PT_REGS_PARM4(ctx), &event->original_dst_ip, &event->dst_port); - success = get_socket_addr_ip_in_ztunnel(success, (void *)PT_REGS_PARM5(ctx), &event->lb_dst_ip, &event->lb_dst_port); + success = get_socket_addr_ip_in_ztunnel(success, (void *)PT_REGS_PARM3(ctx), &event->orginal_src_ip, &event->src_port, 0); + success = get_socket_addr_ip_in_ztunnel(success, (void *)PT_REGS_PARM4(ctx), &event->original_dst_ip, &event->dst_port, 0); + success = get_socket_addr_ip_in_ztunnel(success, (void *)PT_REGS_PARM5(ctx), &event->lb_dst_ip, &event->lb_dst_port, 0); + if (!success) { + return 0; + } + bpf_perf_event_output(ctx, &ztunnel_lb_socket_mapping_event_queue, BPF_F_CURRENT_CPU, event, sizeof(*event)); + return 0; +} + +SEC("uprobe/connection_result_new") +int connection_result_new(struct pt_regs* ctx) { + struct ztunnel_socket_mapping_t *event = create_ztunnel_socket_mapping_event(); + if (event == NULL) { + return 0; + } + bool success = true; + success = get_socket_addr_ip_in_ztunnel(success, (void *)PT_REGS_PARM2(ctx), &event->orginal_src_ip, &event->src_port, 0); + success = get_socket_addr_ip_in_ztunnel(success, (void *)PT_REGS_PARM3(ctx), &event->original_dst_ip, &event->dst_port, 0); + success = get_socket_addr_ip_in_ztunnel(success, (void *)PT_REGS_PARM4(ctx), &event->lb_dst_ip, &event->lb_dst_port, 1); if (!success) { return 0; } diff --git a/go.mod b/go.mod index 6eb8649..70d5b57 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( k8s.io/apimachinery v0.23.5 k8s.io/client-go v0.23.5 k8s.io/utils v0.0.0-20211116205334-6203023598ed - skywalking.apache.org/repo/goapi v0.0.0-20250225130248-3916480eb467 + skywalking.apache.org/repo/goapi v0.0.0-20250512041444-47b8e4229d27 ) require ( diff --git a/go.sum b/go.sum index b1dd69b..281dae0 100644 --- a/go.sum +++ b/go.sum @@ -1059,5 +1059,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLz sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4= sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= -skywalking.apache.org/repo/goapi v0.0.0-20250225130248-3916480eb467 h1:pXT6UxmC3qAD8faGFYuWmz6ekQPRfW1PcU8lAu6WSB0= -skywalking.apache.org/repo/goapi v0.0.0-20250225130248-3916480eb467/go.mod h1:+n8BMuS8eRdzdnGh15ElRGBXPi0eYZSs2TKySBDmRTE= +skywalking.apache.org/repo/goapi v0.0.0-20250512041444-47b8e4229d27 h1:qQqddsnVGf6byf4uPSVNx9wRzye+gKrEdUVZvdJtBgM= +skywalking.apache.org/repo/goapi v0.0.0-20250512041444-47b8e4229d27/go.mod h1:+n8BMuS8eRdzdnGh15ElRGBXPi0eYZSs2TKySBDmRTE= diff --git a/pkg/accesslog/collector/ztunnel.go b/pkg/accesslog/collector/ztunnel.go index f6a02ae..26a570b 100644 --- a/pkg/accesslog/collector/ztunnel.go +++ b/pkg/accesslog/collector/ztunnel.go @@ -42,6 +42,8 @@ var ( // ZTunnelTrackBoundSymbolPrefix is the prefix of the symbol name to track outbound connections in ztunnel process // ztunnel::proxy::connection_manager::ConnectionManager::track_outbound ZTunnelTrackBoundSymbolPrefix = "_ZN7ztunnel5proxy18connection_manager17ConnectionManager14track_outbound" + // ztunnel::proxy::metrics::ConnectionResult::new + ZTunnelTrackConnectionResultSymbolPrefix = "_ZN7ztunnel5proxy7metrics16ConnectionResult3new" ) var zTunnelCollectInstance = NewZTunnelCollector(time.Minute) @@ -81,7 +83,8 @@ func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx *common.AccessLogConte remoteIP := z.convertBPFIPToString(event.OriginalDestIP) remotePort := event.OriginalDestPort lbIP := z.convertBPFIPToString(event.LoadBalancedDestIP) - log.Debugf("received ztunnel lb socket mapping event: %s:%d -> %s:%d, lb: %s", localIP, localPort, remoteIP, remotePort, lbIP) + log.Debugf("received ztunnel lb socket mapping event: %s:%d -> %s:%d, lb: %s:%d", localIP, localPort, remoteIP, remotePort, lbIP, + event.LoadBalancedDestPort) key := z.buildIPMappingCacheKey(localIP, int(localPort), remoteIP, int(remotePort)) z.ipMappingCache.Set(key, &ZTunnelLoadBalanceAddress{ @@ -122,20 +125,36 @@ func (z *ZTunnelCollector) ReadyToFlushConnection(connection *common.ConnectionI connection.ConnectionID, connection.RandomID) return } - address := lbIPObj.(*ZTunnelLoadBalanceAddress) - log.Debugf("found the ztunnel load balanced IP for the connection: %s, connectionID: %d, randomID: %d", - address.String(), connection.ConnectionID, connection.RandomID) + lbAddress := lbIPObj.(*ZTunnelLoadBalanceAddress) + // found the real target if exist + key = z.buildIPMappingCacheKey(connection.Socket.SrcIP, int(connection.Socket.SrcPort), + lbAddress.IP, int(lbAddress.Port)) + realIPObj, found := z.ipMappingCache.Get(key) + var realAddress *ZTunnelLoadBalanceAddress = nil + if !found { + log.Debugf("there no real ztunnel mapped IP address found for connection ID: %d, random ID: %d, lbIP: %s:%d", + connection.ConnectionID, connection.RandomID, lbAddress.IP, lbAddress.Port) + } else { + realAddress = realIPObj.(*ZTunnelLoadBalanceAddress) + } + log.Debugf("found the ztunnel load balanced IP for the connection: %s, real IP: %s, connectionID: %d, randomID: %d", + lbAddress.String(), realAddress, connection.ConnectionID, connection.RandomID) securityPolicy := v3.ZTunnelAttachmentSecurityPolicy_NONE // if the target port is 15008, this mean ztunnel have use mTLS - if address.Port == 15008 { + if lbAddress.Port == 15008 { securityPolicy = v3.ZTunnelAttachmentSecurityPolicy_MTLS } + var targetIP = "" + if realAddress != nil { + targetIP = realAddress.IP + } connection.RPCConnection.Attachment = &v3.ConnectionAttachment{ Environment: &v3.ConnectionAttachment_ZTunnel{ ZTunnel: &v3.ZTunnelAttachmentEnvironment{ - RealDestinationIp: address.IP, - By: v3.ZTunnelAttachmentEnvironmentDetectBy_ZTUNNEL_OUTBOUND_FUNC, - SecurityPolicy: securityPolicy, + RealDestinationIp: lbAddress.IP, + By: v3.ZTunnelAttachmentEnvironmentDetectBy_ZTUNNEL_OUTBOUND_FUNC, + SecurityPolicy: securityPolicy, + TargetDestinationIp: targetIP, }, }, } @@ -203,9 +222,16 @@ func (z *ZTunnelCollector) collectZTunnelProcess(p *process.Process) error { if len(trackBoundSymbol) == 0 { return fmt.Errorf("failed to find track outbound symbol in ztunnel process") } + trackConnectionResultSymbol := elfFile.FilterSymbol(func(name string) bool { + return strings.HasPrefix(name, ZTunnelTrackConnectionResultSymbolPrefix) + }, true) + if len(trackConnectionResultSymbol) == 0 { + return fmt.Errorf("failed to find track connection result symbol in ztunnel process") + } uprobeFile := z.alc.BPF.OpenUProbeExeFile(pidExeFile) uprobeFile.AddLink(trackBoundSymbol[0].Name, z.alc.BPF.ConnectionManagerTrackOutbound, nil) + uprobeFile.AddLink(trackConnectionResultSymbol[0].Name, z.alc.BPF.ConnectionResultNew, nil) return nil }
