This is an automated email from the ASF dual-hosted git repository. liuhan pushed a commit to branch reduce-handle-connect-time in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
commit 7d21644a1cf1b3665129bf6a48d869552e81b6cf Author: mrproliu <[email protected]> AuthorDate: Fri Dec 27 13:07:08 2024 +0800 Reduce handle connect event time in the access log module --- CHANGES.md | 1 + bpf/accesslog/common/connection.h | 3 +- bpf/accesslog/common/data_args.h | 6 +- bpf/accesslog/syscalls/connect_conntrack.c | 5 +- bpf/accesslog/syscalls/connect_conntrack.h | 2 +- go.mod | 4 +- go.sum | 11 +- pkg/accesslog/collector/connection.go | 79 ++++---- pkg/accesslog/collector/protocols/connection.go | 1 - pkg/accesslog/collector/protocols/queue.go | 33 +--- pkg/accesslog/common/connection.go | 214 +++++---------------- pkg/accesslog/common/filter.go | 5 + pkg/accesslog/events/close.go | 4 +- pkg/accesslog/events/connect.go | 4 +- pkg/accesslog/events/data.go | 8 +- pkg/accesslog/events/detail.go | 4 +- pkg/accesslog/events/events_test.go | 16 +- pkg/accesslog/events/ztunnel.go | 6 +- pkg/process/api.go | 2 + pkg/process/finders/kubernetes/finder.go | 58 +++++- pkg/process/finders/manager.go | 9 + pkg/process/module.go | 9 + pkg/profiling/task/network/analyze/events/data.go | 4 + .../task/network/analyze/layer7/events.go | 4 +- pkg/tools/btf/linker.go | 63 ++++-- pkg/tools/btf/queue.go | 74 ------- pkg/tools/btf/{reader => }/reader.go | 75 ++++++-- pkg/tools/buffer/buffer.go | 85 ++++---- pkg/tools/ip.go | 24 ++- pkg/tools/ip/conntrack.go | 17 +- pkg/tools/ip/tcpresolver.go | 2 + 31 files changed, 399 insertions(+), 433 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 3d54791..16fc6fd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -21,6 +21,7 @@ Release Notes. * Add warning log when the event queue almost full in the access log module. * Reduce unessential `conntrack` query when detect new connection. * Reduce CPU and memory usage in the access log module. +* Reduce handle connection event time in the access log module. #### Bug Fixes * Fix the base image cannot run in the arm64. diff --git a/bpf/accesslog/common/connection.h b/bpf/accesslog/common/connection.h index 880826d..834737a 100644 --- a/bpf/accesslog/common/connection.h +++ b/bpf/accesslog/common/connection.h @@ -27,7 +27,7 @@ // syscall:connect struct connect_args_t { __u32 fd; - __u32 fix; + __u32 has_remote; struct sockaddr* addr; struct sock *sock; __u64 start_nacs; @@ -305,6 +305,7 @@ static __inline void submit_connection_when_not_exists(void *ctx, __u64 id, stru static __inline void notify_close_connection(void* ctx, __u64 conid, struct active_connection_t* con, __u64 start_time, __u64 end_time, int ret) { bpf_map_delete_elem(&socket_data_last_id_map, &conid); + bpf_map_delete_elem(&socket_data_id_generate_map, &conid); struct socket_close_event_t *close_event; close_event = rover_reserve_buf(&socket_close_event_queue, sizeof(*close_event)); if (close_event == NULL) { diff --git a/bpf/accesslog/common/data_args.h b/bpf/accesslog/common/data_args.h index 2d76062..33eeea6 100644 --- a/bpf/accesslog/common/data_args.h +++ b/bpf/accesslog/common/data_args.h @@ -66,14 +66,14 @@ struct sock_data_args_t { }; struct { __uint(type, BPF_MAP_TYPE_HASH); - __uint(max_entries, 10000); + __uint(max_entries, 65535); __type(key, __u64); __type(value, struct sock_data_args_t); } socket_data_args SEC(".maps"); struct { - __uint(type, BPF_MAP_TYPE_HASH); - __uint(max_entries, 10000); + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __uint(max_entries, 65535); __type(key, __u64); __type(value, __u64); } socket_data_id_generate_map SEC(".maps"); diff --git a/bpf/accesslog/syscalls/connect_conntrack.c b/bpf/accesslog/syscalls/connect_conntrack.c index 4efac48..8f1f5cb 100644 --- a/bpf/accesslog/syscalls/connect_conntrack.c +++ b/bpf/accesslog/syscalls/connect_conntrack.c @@ -93,7 +93,7 @@ static __always_inline int nf_conn_aware(struct pt_regs* ctx, struct nf_conn *ct } // already contains the remote address - if (&(connect_args->remote) != NULL) { + if (connect_args->has_remote && &(connect_args->remote) != NULL) { return 0; } @@ -126,6 +126,7 @@ static __always_inline int nf_conn_aware(struct pt_regs* ctx, struct nf_conn *ct remote.ipl = reply_conn.saddr_l; remote.port = reply_conn.sport; connect_args->remote = remote; + connect_args->has_remote = 1; bpf_map_update_elem(&conecting_args, &id, connect_args, 0); return 0; @@ -144,4 +145,4 @@ int nf_confirm(struct pt_regs* ctx) { SEC("kprobe/ctnetlink_fill_info") int nf_ctnetlink_fill_info(struct pt_regs* ctx) { return nf_conn_aware(ctx, (struct nf_conn*)PT_REGS_PARM5(ctx)); -} +} \ No newline at end of file diff --git a/bpf/accesslog/syscalls/connect_conntrack.h b/bpf/accesslog/syscalls/connect_conntrack.h index 0e67b9c..06e5072 100644 --- a/bpf/accesslog/syscalls/connect_conntrack.h +++ b/bpf/accesslog/syscalls/connect_conntrack.h @@ -96,4 +96,4 @@ struct nf_conn { struct nf_conntrack_tuple_hash tuplehash[IP_CT_DIR_MAX]; long unsigned int status; __u32 mark; -} __attribute__((preserve_access_index)); +} __attribute__((preserve_access_index)); \ No newline at end of file diff --git a/go.mod b/go.mod index 7139cf3..bb05af7 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.3.0 github.com/spf13/viper v1.10.1 - github.com/stretchr/testify v1.8.1 + github.com/stretchr/testify v1.8.4 github.com/zekroTJA/timedmap v1.4.0 golang.org/x/arch v0.0.0-20220722155209-00200b7164a7 golang.org/x/net v0.32.0 @@ -46,7 +46,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/magiconair/properties v1.8.5 // indirect github.com/mdlayher/netlink v1.7.2 // indirect - github.com/mdlayher/socket v0.4.1 // indirect + github.com/mdlayher/socket v0.5.1 // indirect github.com/mitchellh/mapstructure v1.4.3 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect diff --git a/go.sum b/go.sum index 1cc3adf..e15c3cc 100644 --- a/go.sum +++ b/go.sum @@ -369,8 +369,8 @@ github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU github.com/mdlayher/socket v0.0.0-20210307095302-262dc9984e00/go.mod h1:GAFlyu4/XV68LkQKYzKhIo/WW7j3Zi0YRAz/BOoanUc= github.com/mdlayher/socket v0.0.0-20211007213009-516dcbdf0267/go.mod h1:nFZ1EtZYK8Gi/k6QNu7z7CgO20i/4ExeQswwWuPmG/g= github.com/mdlayher/socket v0.1.0/go.mod h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5Awbj+qDs= -github.com/mdlayher/socket v0.4.1 h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U= -github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA= +github.com/mdlayher/socket v0.5.1 h1:VZaqt6RkGkt2OE9l3GcC6nZkqD3xKeQLyfleW/uBcos= +github.com/mdlayher/socket v0.5.1/go.mod h1:TjPLHI1UgwEv5J1B5q0zTZq12A/6H7nKmtTanQE37IQ= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= @@ -467,7 +467,6 @@ github.com/spf13/viper v1.10.1/go.mod h1:IGlFPqhNAPKRxohIzWpI5QEy4kuI7tcl5WvR+8q github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -476,10 +475,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo= diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go index 4fc6823..b7930e2 100644 --- a/pkg/accesslog/collector/connection.go +++ b/pkg/accesslog/collector/connection.go @@ -33,6 +33,8 @@ import ( "github.com/apache/skywalking-rover/pkg/accesslog/forwarder" "github.com/apache/skywalking-rover/pkg/logger" "github.com/apache/skywalking-rover/pkg/module" + "github.com/apache/skywalking-rover/pkg/process" + "github.com/apache/skywalking-rover/pkg/process/api" "github.com/apache/skywalking-rover/pkg/tools" "github.com/apache/skywalking-rover/pkg/tools/btf" "github.com/apache/skywalking-rover/pkg/tools/enums" @@ -56,7 +58,7 @@ func NewConnectionCollector() *ConnectCollector { return &ConnectCollector{} } -func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext) error { +func (c *ConnectCollector) Start(m *module.Manager, ctx *common.AccessLogContext) error { perCPUBufferSize, err := units.RAMInBytes(ctx.Config.ConnectionAnalyze.PerCPUBufferSize) if err != nil { return err @@ -73,13 +75,9 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext if ctx.Config.ConnectionAnalyze.QueueSize < 1 { return fmt.Errorf("the queue size be small than 1") } - track, err := ip.NewConnTrack() - if err != nil { - connectionLogger.Warnf("cannot create the connection tracker, %v", err) - } c.eventQueue = btf.NewEventQueue("connection resolver", ctx.Config.ConnectionAnalyze.AnalyzeParallels, ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext { - return newConnectionPartitionContext(ctx, track) + return NewConnectionPartitionContext(ctx, m.FindModule(process.ModuleName).(process.K8sOperator)) }) c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} { @@ -130,18 +128,18 @@ func (c *ConnectCollector) Stop() { type ConnectionPartitionContext struct { context *common.AccessLogContext - connTracker *ip.ConnTrack + k8sOperator process.K8sOperator } -func newConnectionPartitionContext(ctx *common.AccessLogContext, connTracker *ip.ConnTrack) *ConnectionPartitionContext { +func NewConnectionPartitionContext(ctx *common.AccessLogContext, + k8sOperator process.K8sOperator) *ConnectionPartitionContext { return &ConnectionPartitionContext{ context: ctx, - connTracker: connTracker, + k8sOperator: k8sOperator, } } func (c *ConnectionPartitionContext) Start(ctx context.Context) { - } func (c *ConnectionPartitionContext) Consume(data interface{}) { @@ -151,7 +149,7 @@ func (c *ConnectionPartitionContext) Consume(data interface{}) { "pid: %d, fd: %d, role: %s: func: %s, family: %d, success: %d, conntrack exist: %t", event.ConID, event.RandomID, event.PID, event.SocketFD, enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName), event.SocketFamily, event.ConnectSuccess, event.ConnTrackUpstreamPort != 0) - socketPair := c.buildSocketFromConnectEvent(event) + socketPair := c.BuildSocketFromConnectEvent(event) if socketPair == nil { connectionLogger.Debugf("cannot found the socket paire from connect event, connection ID: %d, randomID: %d", event.ConID, event.RandomID) @@ -159,7 +157,6 @@ func (c *ConnectionPartitionContext) Consume(data interface{}) { } connectionLogger.Debugf("build socket pair success, connection ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d", event.ConID, event.RandomID, socketPair.Role, socketPair.SrcIP, socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort) - c.context.ConnectionMgr.OnConnectEvent(event, socketPair) forwarder.SendConnectEvent(c.context, event, socketPair) case *events.SocketCloseEvent: connectionLogger.Debugf("receive close event, connection ID: %d, randomID: %d, pid: %d, fd: %d", @@ -169,7 +166,7 @@ func (c *ConnectionPartitionContext) Consume(data interface{}) { } } -func (c *ConnectionPartitionContext) fixSocketFamilyIfNeed(event *events.SocketConnectEvent, result *ip.SocketPair) { +func (c *ConnectionPartitionContext) FixSocketFamilyIfNeed(event *events.SocketConnectEvent, result *ip.SocketPair) { if result == nil { return } @@ -189,39 +186,39 @@ func (c *ConnectionPartitionContext) fixSocketFamilyIfNeed(event *events.SocketC } } -func (c *ConnectionPartitionContext) buildSocketFromConnectEvent(event *events.SocketConnectEvent) *ip.SocketPair { +func (c *ConnectionPartitionContext) BuildSocketFromConnectEvent(event *events.SocketConnectEvent) *ip.SocketPair { if event.SocketFamily != unix.AF_INET && event.SocketFamily != unix.AF_INET6 && event.SocketFamily != enums.SocketFamilyUnknown { // if not ipv4, ipv6 or unknown, ignore return nil } - socketPair := c.buildSocketPair(event) - if socketPair != nil && socketPair.IsValid() { + pair := c.BuildSocketPair(event) + if pair != nil && pair.IsValid() { connectionLogger.Debugf("found the connection from the connect event is valid, connection ID: %d, randomID: %d", event.ConID, event.RandomID) - return socketPair + return pair } // if only the local port not success, maybe the upstream port is not open, so it could be continued - if c.isOnlyLocalPortEmpty(socketPair) { + if c.IsOnlyLocalPortEmpty(pair) { event.ConnectSuccess = 0 connectionLogger.Debugf("the connection from the connect event is only the local port is empty, connection ID: %d, randomID: %d", event.ConID, event.RandomID) - return socketPair + return pair } pair, err := ip.ParseSocket(event.PID, event.SocketFD) if err != nil { - connectionLogger.Debugf("cannot found the socket, pid: %d, socket FD: %d", event.PID, event.SocketFD) + connectionLogger.Debugf("cannot found the socket, pid: %d, socket FD: %d, error: %v", event.PID, event.SocketFD, err) return nil } connectionLogger.Debugf("found the connection from the socket, connection ID: %d, randomID: %d", event.ConID, event.RandomID) pair.Role = enums.ConnectionRole(event.Role) - c.fixSocketFamilyIfNeed(event, pair) - c.tryToUpdateSocketFromConntrack(event, pair) + c.FixSocketFamilyIfNeed(event, pair) + c.CheckNeedConntrack(event, pair) return pair } -func (c *ConnectionPartitionContext) isOnlyLocalPortEmpty(socketPair *ip.SocketPair) bool { +func (c *ConnectionPartitionContext) IsOnlyLocalPortEmpty(socketPair *ip.SocketPair) bool { if socketPair == nil { return false } @@ -233,7 +230,7 @@ func (c *ConnectionPartitionContext) isOnlyLocalPortEmpty(socketPair *ip.SocketP return socketPair.IsValid() } -func (c *ConnectionPartitionContext) buildSocketPair(event *events.SocketConnectEvent) *ip.SocketPair { +func (c *ConnectionPartitionContext) BuildSocketPair(event *events.SocketConnectEvent) *ip.SocketPair { var result *ip.SocketPair haveConnTrack := false if event.SocketFamily == unix.AF_INET { @@ -288,22 +285,28 @@ func (c *ConnectionPartitionContext) buildSocketPair(event *events.SocketConnect return result } - c.fixSocketFamilyIfNeed(event, result) - c.tryToUpdateSocketFromConntrack(event, result) + c.FixSocketFamilyIfNeed(event, result) + c.CheckNeedConntrack(event, result) return result } -func (c *ConnectionPartitionContext) tryToUpdateSocketFromConntrack(event *events.SocketConnectEvent, socket *ip.SocketPair) { - if socket != nil && socket.IsValid() && c.connTracker != nil && !tools.IsLocalHostAddress(socket.DestIP) && - event.FuncName != enums.SocketFunctionNameAccept { // accept event don't need to update the remote address - // if no contract and socket data is valid, then trying to get the remote address from the socket - // to encase the remote address is not the real remote address - originalIP := socket.DestIP - originalPort := socket.DestPort - if c.connTracker.UpdateRealPeerAddress(socket) { - connectionLogger.Debugf("update the socket address from conntrack success, "+ - "connection ID: %d, randomID: %d, original remote: %s:%d, new remote: %s:%d", - event.ConID, event.RandomID, originalIP, originalPort, socket.DestIP, socket.DestPort) - } +func (c *ConnectionPartitionContext) CheckNeedConntrack(event *events.SocketConnectEvent, socket *ip.SocketPair) { + if socket == nil || !socket.IsValid() || tools.IsLocalHostAddress(socket.DestIP) || + event.FuncName == enums.SocketFunctionNameAccept || // accept event don't need to update the remote address + !c.context.ConnectionMgr.ProcessIsDetectBy(event.PID, api.Kubernetes) { // only the k8s process need to update the remote address from conntrack + return + } + + isPodIP, err := c.k8sOperator.IsPodIP(socket.DestIP) + if err != nil { + connectionLogger.Warnf("cannot found the pod IP, connection ID: %d, randomID: %d, error: %v", + event.ConID, event.RandomID, err) + } + if isPodIP { + connectionLogger.Debugf("detect the remote IP is pod IP, connection ID: %d, randomID: %d, remote: %s", + event.ConID, event.RandomID, socket.DestIP) + return } + // update to the socket need to update the remote address from conntrack + socket.NeedConnTrack = true } diff --git a/pkg/accesslog/collector/protocols/connection.go b/pkg/accesslog/collector/protocols/connection.go index b6a9e24..66d0ce9 100644 --- a/pkg/accesslog/collector/protocols/connection.go +++ b/pkg/accesslog/collector/protocols/connection.go @@ -34,7 +34,6 @@ type PartitionConnection struct { protocolAnalyzer map[enums.ConnectionProtocol]Protocol protocolMetrics map[enums.ConnectionProtocol]ProtocolMetrics closed bool - closeCallback common.ConnectionProcessFinishCallback skipAllDataAnalyze bool lastCheckCloseTime time.Time } diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go index a4577f7..b336f9d 100644 --- a/pkg/accesslog/collector/protocols/queue.go +++ b/pkg/accesslog/collector/protocols/queue.go @@ -102,7 +102,7 @@ func (q *AnalyzeQueue) Start(ctx context.Context) { }) q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue, int(q.perCPUBuffer), q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} { - return &events.SocketDataUploadEvent{} + return &events.SocketDataUploadEvent{Buffer: *buffer.BorrowNewBuffer()} }, func(data interface{}) int { return int(data.(*events.SocketDataUploadEvent).ConnectionID) }) @@ -166,20 +166,6 @@ func NewPartitionContext(ctx *common.AccessLogContext, num int, protocols []Prot return pc } -func (p *PartitionContext) OnConnectionClose(event *events.SocketCloseEvent, closeCallback common.ConnectionProcessFinishCallback) { - conn, exist := p.connections.Get(p.buildConnectionKey(event.GetConnectionID(), event.GetRandomID())) - if !exist { - log.Debugf("connection is not exist in the partion context, connection ID: %d, random ID: %d, partition number: %d", - event.GetConnectionID(), event.GetRandomID(), p.partitionNum) - closeCallback() - return - } - connection := conn.(*PartitionConnection) - connection.closeCallback = closeCallback - log.Debugf("receive the connection close event and mark is closable, connection ID: %d, random ID: %d, partition number: %d", - event.GetConnectionID(), event.GetRandomID(), p.partitionNum) -} - func (p *PartitionContext) Start(ctx context.Context) { // process events with interval flushDuration, _ := time.ParseDuration(p.context.Config.Flush.Period) @@ -189,7 +175,7 @@ func (p *PartitionContext) Start(ctx context.Context) { select { case <-timeTicker.C: // process event with interval - p.processEvents() + p.ProcessEvents() case <-ctx.Done(): timeTicker.Stop() return @@ -203,7 +189,7 @@ func (p *PartitionContext) Start(ctx context.Context) { for { select { case <-expireTicker.C: - p.processExpireEvents() + p.ProcessExpireEvents() case <-ctx.Done(): expireTicker.Stop() return @@ -225,19 +211,19 @@ func (p *PartitionContext) Consume(data interface{}) { forwarder.SendTransferNoProtocolEvent(p.context, event) return } - connection := p.getConnectionContext(event.GetConnectionID(), event.GetRandomID(), event.GetProtocol(), event.DataID()) + connection := p.GetConnectionContext(event.GetConnectionID(), event.GetRandomID(), event.GetProtocol(), event.DataID()) connection.AppendDetail(p.context, event) case *events.SocketDataUploadEvent: pid, _ := events.ParseConnectionID(event.ConnectionID) log.Debugf("receive the socket data event, connection ID: %d, random ID: %d, pid: %d, prev data id: %d, "+ "data id: %d, sequence: %d, protocol: %d", event.ConnectionID, event.RandomID, pid, event.PrevDataID0, event.DataID0, event.Sequence0, event.Protocol0) - connection := p.getConnectionContext(event.ConnectionID, event.RandomID, event.Protocol0, event.DataID0) + connection := p.GetConnectionContext(event.ConnectionID, event.RandomID, event.Protocol0, event.DataID0) connection.AppendData(event) } } -func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64, +func (p *PartitionContext) GetConnectionContext(connectionID, randomID uint64, protocol enums.ConnectionProtocol, currentDataID uint64) *PartitionConnection { conKey := p.buildConnectionKey(connectionID, randomID) conn, exist := p.connections.Get(conKey) @@ -259,7 +245,7 @@ func (p *PartitionContext) buildConnectionKey(conID, ranID uint64) string { return string(buf) } -func (p *PartitionContext) processEvents() { +func (p *PartitionContext) ProcessEvents() { // it could be triggered by interval or reach counter // if any trigger bean locked, the other one just ignore process if !p.analyzeLocker.TryLock() { @@ -284,9 +270,6 @@ func (p *PartitionContext) processEvents() { p.checkTheConnectionIsAlreadyClose(info) } if info.closed { - if info.closeCallback != nil { - info.closeCallback() - } closedConnections = append(closedConnections, conKey) log.Debugf("detect the connection is already closed, then notify to the callback, connection ID: %d, random ID: %d, partition number: %d", info.connectionID, info.randomID, p.partitionNum) @@ -320,7 +303,7 @@ func (p *PartitionContext) checkTheConnectionIsAlreadyClose(con *PartitionConnec } } -func (p *PartitionContext) processExpireEvents() { +func (p *PartitionContext) ProcessExpireEvents() { // the expiry must be mutual exclusion with events processor p.analyzeLocker.Lock() defer p.analyzeLocker.Unlock() diff --git a/pkg/accesslog/common/connection.go b/pkg/accesslog/common/connection.go index afac079..1c0a86e 100644 --- a/pkg/accesslog/common/connection.go +++ b/pkg/accesslog/common/connection.go @@ -42,16 +42,11 @@ import ( cmap "github.com/orcaman/concurrent-map" - "k8s.io/apimachinery/pkg/util/cache" - v32 "skywalking.apache.org/repo/goapi/collect/common/v3" v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3" ) const ( - // only using to match the remote IP address - localAddressPairCacheTime = time.Second * 15 - // clean the active connection in BPF interval cleanActiveConnectionInterval = time.Second * 20 @@ -62,19 +57,6 @@ const ( connectionCheckExistTime = time.Second * 30 ) -type addressProcessType int - -const ( - addressProcessTypeUnknown addressProcessType = iota - addressProcessTypeLocal - addressProcessTypeKubernetes -) - -const ( - strLocal = "local" - strRemote = "remote" -) - type ConnectEventWithSocket struct { *events.SocketConnectEvent SocketPair *ip.SocketPair @@ -82,13 +64,11 @@ type ConnectEventWithSocket struct { type CloseEventWithNotify struct { *events.SocketCloseEvent - allProcessorFinished bool } type ConnectionProcessFinishCallback func() type ConnectionProcessor interface { - OnConnectionClose(event *events.SocketCloseEvent, callback ConnectionProcessFinishCallback) } type FlusherListener interface { @@ -105,10 +85,6 @@ type ConnectionManager struct { moduleMgr *module.Manager processOP process.Operator connections cmap.ConcurrentMap - // addressWithPid cache all local ip+port and pid mapping for match the process on the same host - // such as service mesh(process with envoy) - addressWithPid *cache.Expiring - localPortWithPid *cache.Expiring // in some case, we can only get the 127.0.0.1 from server side, so we only cache the port for this // localIPWithPid cache all local monitoring process bind IP address // for checking the remote address is local or not localIPWithPid map[string]int32 @@ -124,10 +100,9 @@ type ConnectionManager struct { processors []ConnectionProcessor processListeners []ProcessListener - // connection already close but the connection (protocols)log not build finished - allUnfinishedConnections map[string]*bool - flushListeners []FlusherListener + + connectTracker *ip.ConnTrack } func (c *ConnectionManager) RegisterProcessor(processor ConnectionProcessor) { @@ -142,10 +117,6 @@ func (c *ConnectionManager) RegisterNewFlushListener(listener FlusherListener) { c.flushListeners = append(c.flushListeners, listener) } -type addressInfo struct { - pid uint32 -} - type ConnectionInfo struct { ConnectionID uint64 RandomID uint64 @@ -158,19 +129,21 @@ type ConnectionInfo struct { } func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *bpf.Loader, filter MonitorFilter) *ConnectionManager { + track, err := ip.NewConnTrack() + if err != nil { + log.Warnf("cannot create the connection tracker, %v", err) + } mgr := &ConnectionManager{ - moduleMgr: moduleMgr, - processOP: moduleMgr.FindModule(process.ModuleName).(process.Operator), - connections: cmap.New(), - addressWithPid: cache.NewExpiring(), - localPortWithPid: cache.NewExpiring(), - localIPWithPid: make(map[string]int32), - monitoringProcesses: make(map[int32][]api.ProcessInterface), - processMonitorMap: bpfLoader.ProcessMonitorControl, - activeConnectionMap: bpfLoader.ActiveConnectionMap, - allUnfinishedConnections: make(map[string]*bool), - monitorFilter: filter, - flushListeners: make([]FlusherListener, 0), + moduleMgr: moduleMgr, + processOP: moduleMgr.FindModule(process.ModuleName).(process.Operator), + connections: cmap.New(), + localIPWithPid: make(map[string]int32), + monitoringProcesses: make(map[int32][]api.ProcessInterface), + processMonitorMap: bpfLoader.ProcessMonitorControl, + activeConnectionMap: bpfLoader.ActiveConnectionMap, + monitorFilter: filter, + flushListeners: make([]FlusherListener, 0), + connectTracker: track, } return mgr } @@ -276,39 +249,26 @@ func (c *ConnectionManager) Find(event events.Event) *ConnectionInfo { } func (c *ConnectionManager) buildRemoteAddress(e *events.SocketConnectEvent, socket *ip.SocketPair) *v3.ConnectionAddress { - tp := c.isLocalTarget(socket) - if tp == addressProcessTypeUnknown { - log.Debugf("building the remote address to unknown, connection: %d-%d, role: %s, local: %s:%d, remote: %s:%d", - e.GetConnectionID(), e.GetRandomID(), socket.Role, socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort) - return c.buildAddressFromRemote(socket.DestIP, socket.DestPort) + // if the remote address is local, then no needs to build the address(access log no need to send by communicate with self) + if tools.IsLocalHostAddress(socket.DestIP) { + return nil } - var addrInfo *addressInfo - var fromType string - switch socket.Role { - case enums.ConnectionRoleClient: - addrInfo = c.getAddressPid(socket.SrcIP, socket.SrcPort, false) - fromType = strLocal - case enums.ConnectionRoleServer: - addrInfo = c.getAddressPid(socket.DestIP, socket.DestPort, true) - fromType = strRemote - } - - if addrInfo != nil { - log.Debugf("building the remote address from %s process, pid: %d, connection: %d-%d, role: %s, local: %s:%d, remote: %s:%d", - fromType, addrInfo.pid, e.GetConnectionID(), e.GetRandomID(), socket.Role, socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort) - return c.buildLocalAddress(addrInfo.pid, socket.DestPort, socket) - } else if tp == addressProcessTypeKubernetes { - if p := c.localIPWithPid[socket.DestIP]; p != 0 { - log.Debugf("building the remote address from kubernetes process, connection: %d-%d, role: %s, pid: %d, local: %s:%d, remote: %s:%d", - e.GetConnectionID(), e.GetRandomID(), socket.Role, p, socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort) - return c.buildLocalAddress(uint32(p), socket.DestPort, socket) + // if the remote connection is need to use conntrack, then update the real peer address + if socket.NeedConnTrack { + if err := c.connectTracker.UpdateRealPeerAddress(socket); err != nil { + log.Debugf("cannot update the real peer address, %v", err) } } - log.Debugf("cannot found the peer pid for the connection: %d-%d, remote type: %v, role: %s, local: %s:%d, remote: %s:%d", - e.GetConnectionID(), e.GetRandomID(), tp, socket.Role, socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort) - return nil + // found local address with pid + if pid, exist := c.localIPWithPid[socket.DestIP]; exist && pid != 0 { + return c.buildLocalAddress(uint32(pid), socket.DestPort, socket) + } + + log.Debugf("building the remote address to unknown, connection: %d-%d, role: %s, local: %s:%d, remote: %s:%d", + e.GetConnectionID(), e.GetRandomID(), socket.Role, socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort) + return c.buildAddressFromRemote(socket.DestIP, socket.DestPort) } func (c *ConnectionManager) connectionPostHandle(connection *ConnectionInfo, event events.Event) { @@ -317,12 +277,7 @@ func (c *ConnectionManager) connectionPostHandle(connection *ConnectionInfo, eve } switch e := event.(type) { case *CloseEventWithNotify: - if e.allProcessorFinished { - connection.MarkDeletable = true - } else { - // if not all processor finished, then add into the map - c.allUnfinishedConnections[fmt.Sprintf("%d_%d", event.GetConnectionID(), event.GetRandomID())] = &e.allProcessorFinished - } + connection.MarkDeletable = true case events.SocketDetail: tlsMode := connection.RPCConnection.TlsMode protocol := connection.RPCConnection.Protocol @@ -368,6 +323,17 @@ func (c *ConnectionManager) ProcessIsMonitor(pid uint32) bool { return len(c.monitoringProcesses[int32(pid)]) > 0 } +func (c *ConnectionManager) ProcessIsDetectBy(pid uint32, detectType api.ProcessDetectType) bool { + c.monitoringProcessLock.RLock() + defer c.monitoringProcessLock.RUnlock() + for _, p := range c.monitoringProcesses[int32(pid)] { + if p.DetectType() == detectType { + return true + } + } + return false +} + func (c *ConnectionManager) buildConnection(event *events.SocketConnectEvent, socket *ip.SocketPair, local, remote *v3.ConnectionAddress) *ConnectionInfo { var role v32.DetectPoint @@ -436,85 +402,9 @@ func (c *ConnectionManager) buildAddressFromRemote(ipHost string, port uint16) * } func (c *ConnectionManager) OnConnectionClose(event *events.SocketCloseEvent) *CloseEventWithNotify { - result := &CloseEventWithNotify{ - SocketCloseEvent: event, - allProcessorFinished: false, - } - processCount := len(c.processors) - for _, l := range c.processors { - l.OnConnectionClose(event, func() { - processCount-- - if processCount > 0 { - return - } - result.allProcessorFinished = true - }) + return &CloseEventWithNotify{ + SocketCloseEvent: event, } - return result -} - -func (c *ConnectionManager) savingTheAddress(hostAddress string, port uint16, localPid bool, pid uint32) { - localAddrInfo := &addressInfo{ - pid: pid, - } - c.addressWithPid.Set(fmt.Sprintf("%s_%d_%t", hostAddress, port, localPid), localAddrInfo, localAddressPairCacheTime) - localStr := strRemote - if localPid { - localStr = strLocal - } - log.Debugf("saving the %s address with pid cache, address: %s:%d, pid: %d", localStr, hostAddress, port, pid) -} - -func (c *ConnectionManager) getAddressPid(hostAddress string, port uint16, localPid bool) *addressInfo { - addrInfo, ok := c.addressWithPid.Get(fmt.Sprintf("%s_%d_%t", hostAddress, port, localPid)) - if ok && addrInfo != nil { - return addrInfo.(*addressInfo) - } - return nil -} - -func (c *ConnectionManager) OnConnectEvent(event *events.SocketConnectEvent, pair *ip.SocketPair) { - // only adding the local ip port when remote is local address - switch c.isLocalTarget(pair) { - case addressProcessTypeUnknown: - log.Debugf("the target address is not local, so no needs to save the cache. "+ - "address: %s:%d, pid: %d", pair.DestIP, pair.DestPort, event.PID) - case addressProcessTypeLocal: - switch pair.Role { - case enums.ConnectionRoleClient: - // if current is client, so the local port should be unique - c.savingTheAddress(pair.SrcIP, pair.SrcPort, true, event.PID) - case enums.ConnectionRoleServer: - // if current is server, so the remote port should be unique - c.savingTheAddress(pair.DestIP, pair.DestPort, false, event.PID) - case enums.ConnectionRoleUnknown: - log.Debugf("the target address local but unknown role, so no needs to save the cache. socket: [%s], pid: %d", - pair, event.PID) - } - case addressProcessTypeKubernetes: - switch pair.Role { - case enums.ConnectionRoleClient: - // if current is client, so the local port should be unique - c.savingTheAddress(pair.SrcIP, pair.SrcPort, true, event.PID) - case enums.ConnectionRoleServer: - // if current is server, so the remote port should be unique - c.savingTheAddress(pair.DestIP, pair.DestPort, false, event.PID) - case enums.ConnectionRoleUnknown: - log.Debugf("the target address kubernetes but unknown role, so no needs to save the cache. socket: [%s], pid: %d", - pair, event.PID) - } - } -} - -func (c *ConnectionManager) isLocalTarget(pair *ip.SocketPair) addressProcessType { - destIP := pair.DestIP - if tools.IsLocalHostAddress(destIP) { - return addressProcessTypeLocal - } - if _, exist := c.localIPWithPid[destIP]; exist { - return addressProcessTypeKubernetes - } - return addressProcessTypeUnknown } func (c *ConnectionManager) AddNewProcess(pid int32, entities []api.ProcessInterface) { @@ -706,20 +596,6 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished() { } }) - deleteFromUnfinished := make([]string, 0) - for conKey, processorFinished := range c.allUnfinishedConnections { - if *processorFinished { - deletableConnections[conKey] = true - deleteFromUnfinished = append(deleteFromUnfinished, conKey) - } else { - // if the processor not finished, then ignore it from deletable connections - delete(deletableConnections, conKey) - } - } - for _, key := range deleteFromUnfinished { - delete(c.allUnfinishedConnections, key) - } - for key := range deletableConnections { log.Debugf("deleting the connection in manager: %s", key) c.connections.Remove(key) diff --git a/pkg/accesslog/common/filter.go b/pkg/accesslog/common/filter.go index 0d60012..34baee3 100644 --- a/pkg/accesslog/common/filter.go +++ b/pkg/accesslog/common/filter.go @@ -18,6 +18,7 @@ package common import ( + "os" "strings" "github.com/apache/skywalking-rover/pkg/process/api" @@ -46,7 +47,11 @@ func NewStaticMonitorFilter(namespaces, clusters []string) *StaticMonitorFilter } func (s *StaticMonitorFilter) ShouldIncludeProcesses(processes []api.ProcessInterface) (res []api.ProcessInterface) { + var selfPid = os.Getpid() for _, entity := range processes { + if int(entity.Pid()) == selfPid { + continue + } if entity.DetectType() != api.Kubernetes { // for now, we only have the kubernetes detected processes continue } diff --git a/pkg/accesslog/events/close.go b/pkg/accesslog/events/close.go index acd0d61..d4aaa1b 100644 --- a/pkg/accesslog/events/close.go +++ b/pkg/accesslog/events/close.go @@ -20,7 +20,7 @@ package events import ( "time" - "github.com/apache/skywalking-rover/pkg/tools/btf/reader" + "github.com/apache/skywalking-rover/pkg/tools/btf" "github.com/apache/skywalking-rover/pkg/tools/host" ) @@ -35,7 +35,7 @@ type SocketCloseEvent struct { Success uint32 } -func (c *SocketCloseEvent) ReadFrom(r *reader.Reader) { +func (c *SocketCloseEvent) ReadFrom(r btf.Reader) { c.ConnectionID = r.ReadUint64() c.RandomID = r.ReadUint64() c.StartTime = r.ReadUint64() diff --git a/pkg/accesslog/events/connect.go b/pkg/accesslog/events/connect.go index 9433168..daf6458 100644 --- a/pkg/accesslog/events/connect.go +++ b/pkg/accesslog/events/connect.go @@ -20,7 +20,7 @@ package events import ( "time" - "github.com/apache/skywalking-rover/pkg/tools/btf/reader" + "github.com/apache/skywalking-rover/pkg/tools/btf" "github.com/apache/skywalking-rover/pkg/tools/host" ) @@ -47,7 +47,7 @@ type SocketConnectEvent struct { ConnTrackUpstreamPort uint32 } -func (c *SocketConnectEvent) ReadFrom(r *reader.Reader) { +func (c *SocketConnectEvent) ReadFrom(r btf.Reader) { c.ConID = r.ReadUint64() c.RandomID = r.ReadUint64() c.StartTime = r.ReadUint64() diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go index 6ecfd2f..85bbd76 100644 --- a/pkg/accesslog/events/data.go +++ b/pkg/accesslog/events/data.go @@ -20,7 +20,7 @@ package events import ( "fmt" - "github.com/apache/skywalking-rover/pkg/tools/btf/reader" + "github.com/apache/skywalking-rover/pkg/tools/btf" "github.com/apache/skywalking-rover/pkg/tools/enums" ) @@ -41,7 +41,11 @@ type SocketDataUploadEvent struct { Buffer [2048]byte } -func (s *SocketDataUploadEvent) ReadFrom(r *reader.Reader) { +func (s *SocketDataUploadEvent) ReleaseBuffer() *[2048]byte { + return &s.Buffer +} + +func (s *SocketDataUploadEvent) ReadFrom(r btf.Reader) { s.Protocol0 = enums.ConnectionProtocol(r.ReadUint8()) s.HaveReduce = r.ReadUint8() s.Direction0 = enums.SocketDataDirection(r.ReadUint8()) diff --git a/pkg/accesslog/events/detail.go b/pkg/accesslog/events/detail.go index a5ac46c..5beb5df 100644 --- a/pkg/accesslog/events/detail.go +++ b/pkg/accesslog/events/detail.go @@ -20,7 +20,7 @@ package events import ( "time" - "github.com/apache/skywalking-rover/pkg/tools/btf/reader" + "github.com/apache/skywalking-rover/pkg/tools/btf" "github.com/apache/skywalking-rover/pkg/tools/buffer" "github.com/apache/skywalking-rover/pkg/tools/enums" "github.com/apache/skywalking-rover/pkg/tools/host" @@ -73,7 +73,7 @@ type SocketDetailEvent struct { SSL uint8 } -func (d *SocketDetailEvent) ReadFrom(r *reader.Reader) { +func (d *SocketDetailEvent) ReadFrom(r btf.Reader) { d.ConnectionID = r.ReadUint64() d.RandomID = r.ReadUint64() d.DataID0 = r.ReadUint64() diff --git a/pkg/accesslog/events/events_test.go b/pkg/accesslog/events/events_test.go index 1da9f3e..f2c11bd 100644 --- a/pkg/accesslog/events/events_test.go +++ b/pkg/accesslog/events/events_test.go @@ -25,16 +25,16 @@ import ( "strings" "testing" - "github.com/stretchr/testify/assert" + "github.com/apache/skywalking-rover/pkg/tools/btf" - "github.com/apache/skywalking-rover/pkg/tools/btf/reader" + "github.com/stretchr/testify/assert" ) // nolint func TestBufferRead(t *testing.T) { tests := []struct { hex string - create func() reader.EventReader + create func() btf.EventReader }{ { hex: ` @@ -46,7 +46,7 @@ func TestBufferRead(t *testing.T) { 00 00 00 00 00 00 00 00 00 00 ff ff 7f 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00`, - create: func() reader.EventReader { + create: func() btf.EventReader { return &SocketConnectEvent{} }, }, @@ -59,7 +59,7 @@ func TestBufferRead(t *testing.T) { 23 4a 01 00 bb 49 01 00 00 00 00 00 e4 01 00 00 24 21 00 00 01 00 00 00 39 d6 00 00 00 00 00 00 03 00 00 00 00 00 00 00 02 02 00 02 02 09 01 00`, - create: func() reader.EventReader { + create: func() btf.EventReader { return &SocketDetailEvent{} }, }, @@ -69,7 +69,7 @@ func TestBufferRead(t *testing.T) { b2 2d 26 7c 5a 30 02 00 5e 34 26 7c 5a 30 02 00 7a a1 00 00 04 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00`, - create: func() reader.EventReader { + create: func() btf.EventReader { return &SocketCloseEvent{} }, }, @@ -209,7 +209,7 @@ a0 ea de 8e 56 e5 16 e5 d7 f0 e3 f9 09 35 c2 be 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 `, - create: func() reader.EventReader { + create: func() btf.EventReader { return &SocketDataUploadEvent{} }, }, @@ -226,7 +226,7 @@ a0 ea de 8e 56 e5 16 e5 d7 f0 e3 f9 09 35 c2 be } binaryRead := test.create() selfRead := test.create() - bufReader := reader.NewReader(rawData) + bufReader := btf.NewReader(rawData) selfRead.ReadFrom(bufReader) if err := bufReader.HasError(); err != nil { t.Fatalf("reading by self parsing error: %v", err) diff --git a/pkg/accesslog/events/ztunnel.go b/pkg/accesslog/events/ztunnel.go index abd2f1c..81572c3 100644 --- a/pkg/accesslog/events/ztunnel.go +++ b/pkg/accesslog/events/ztunnel.go @@ -17,7 +17,9 @@ package events -import "github.com/apache/skywalking-rover/pkg/tools/btf/reader" +import ( + "github.com/apache/skywalking-rover/pkg/tools/btf" +) type ZTunnelSocketMappingEvent struct { OriginalSrcIP uint32 @@ -30,7 +32,7 @@ type ZTunnelSocketMappingEvent struct { Pad1 uint32 } -func (z *ZTunnelSocketMappingEvent) ReadFrom(r *reader.Reader) { +func (z *ZTunnelSocketMappingEvent) ReadFrom(r btf.Reader) { z.OriginalSrcIP = r.ReadUint32() z.OriginalDestIP = r.ReadUint32() z.OriginalSrcPort = r.ReadUint16() diff --git a/pkg/process/api.go b/pkg/process/api.go index b84453b..ca11907 100644 --- a/pkg/process/api.go +++ b/pkg/process/api.go @@ -37,4 +37,6 @@ type Operator interface { type K8sOperator interface { // NodeName get the node name NodeName() string + // IsPodIP check the ip is pod ip + IsPodIP(ip string) (bool, error) } diff --git a/pkg/process/finders/kubernetes/finder.go b/pkg/process/finders/kubernetes/finder.go index 03a0ba7..58282ea 100644 --- a/pkg/process/finders/kubernetes/finder.go +++ b/pkg/process/finders/kubernetes/finder.go @@ -21,10 +21,12 @@ import ( "bufio" "context" "fmt" + "hash/fnv" "os" "regexp" "strconv" "strings" + "sync" "time" lru "github.com/hashicorp/golang-lru" @@ -36,6 +38,10 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/cache" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -53,6 +59,8 @@ var log = logger.GetLogger("process", "finder", "kubernetes") var ( kubepodsRegex = regexp.MustCompile(`cri-containerd-(?P<Group>\w+)\.scope`) openShiftPodsRegex = regexp.MustCompile(`crio-(?P<Group>\w+)\.scope`) + ipExistTimeout = time.Minute * 10 + ipSearchParallel = 10 ) type ProcessFinder struct { @@ -73,6 +81,10 @@ type ProcessFinder struct { // runtime config namespaces []string + + // for IsPodIP check + podIPChecker *cache.Expiring + podIPMutexes map[int]*sync.Mutex } func (f *ProcessFinder) Init(ctx context.Context, conf base.FinderBaseConfig, manager base.ProcessManager) error { @@ -89,11 +101,16 @@ func (f *ProcessFinder) Init(ctx context.Context, conf base.FinderBaseConfig, ma f.stopChan = make(chan struct{}, 1) f.registry = NewRegistry(f.CLI, f.namespaces, f.conf.NodeName) f.manager = manager - cache, err := lru.New(5000) + f.podIPChecker = cache.NewExpiring() + f.podIPMutexes = make(map[int]*sync.Mutex) + for i := 0; i < ipSearchParallel; i++ { + f.podIPMutexes[i] = &sync.Mutex{} + } + processCache, err := lru.New(5000) if err != nil { return err } - f.processCache = cache + f.processCache = processCache return nil } @@ -277,7 +294,7 @@ func (f *ProcessFinder) getProcessCGroup(pid int32) ([]string, error) { } defer cgroupFile.Close() - cache := make(map[string]bool) + cgroups := make(map[string]bool) scanner := bufio.NewScanner(cgroupFile) for scanner.Scan() { infos := strings.Split(scanner.Text(), ":") @@ -295,14 +312,14 @@ func (f *ProcessFinder) getProcessCGroup(pid int32) ([]string, error) { if openShiftPod := openShiftPodsRegex.FindStringSubmatch(path); len(openShiftPod) >= 1 { path = openShiftPod[1] } - cache[path] = true + cgroups[path] = true } } - if len(cache) == 0 { + if len(cgroups) == 0 { return nil, fmt.Errorf("no cgroups") } result := make([]string, 0) - for k := range cache { + for k := range cgroups { result = append(result, k) } return result, nil @@ -400,3 +417,32 @@ func (f *ProcessFinder) ShouldMonitor(pid int32) bool { f.manager.AddDetectedProcess(processes) return true } + +func (f *ProcessFinder) IsPodIP(ip string) (bool, error) { + val, exist := f.podIPChecker.Get(ip) + if exist { + return val.(bool), nil + } + + // parallels the search + h := fnv.New32a() + h.Write([]byte(ip)) + sum32 := int(h.Sum32()) + mutex := f.podIPMutexes[sum32%ipSearchParallel] + mutex.Lock() + defer mutex.Unlock() + + pods, err := f.CLI.CoreV1().Pods(v1.NamespaceAll).List(f.ctx, metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("status.podIP", ip).String(), + Limit: 1, + }) + if err != nil { + return false, err + } + found := len(pods.Items) > 0 + + // the timeout added a random value to avoid the cache avalanche + addedTime := time.Second * time.Duration(rand.IntnRange(10, 60)) + f.podIPChecker.Set(ip, found, ipExistTimeout+addedTime) + return found, nil +} diff --git a/pkg/process/finders/manager.go b/pkg/process/finders/manager.go index dbe99c0..a252416 100644 --- a/pkg/process/finders/manager.go +++ b/pkg/process/finders/manager.go @@ -113,6 +113,15 @@ func (m *ProcessManager) Shutdown() error { return result } +func (m *ProcessManager) Finder(finderType api.ProcessDetectType) (base.ProcessFinder, bool) { + for _, finder := range m.finders { + if finder.DetectType() == finderType { + return finder, true + } + } + return nil, false +} + func (p *ProcessManagerWithFinder) GetModuleManager() *module.Manager { return p.moduleManager } diff --git a/pkg/process/module.go b/pkg/process/module.go index 648b8f6..95063b5 100644 --- a/pkg/process/module.go +++ b/pkg/process/module.go @@ -25,6 +25,7 @@ import ( "github.com/apache/skywalking-rover/pkg/module" "github.com/apache/skywalking-rover/pkg/process/api" "github.com/apache/skywalking-rover/pkg/process/finders" + "github.com/apache/skywalking-rover/pkg/process/finders/kubernetes" ) const ModuleName = "process_discovery" @@ -105,3 +106,11 @@ func (m *Module) ShouldMonitor(pid int32) bool { func (m *Module) NodeName() string { return m.config.Kubernetes.NodeName } + +func (m *Module) IsPodIP(ip string) (bool, error) { + k8sFinder, exist := m.manager.Finder(api.Kubernetes) + if !exist { + return false, nil + } + return k8sFinder.(*kubernetes.ProcessFinder).IsPodIP(ip) +} diff --git a/pkg/profiling/task/network/analyze/events/data.go b/pkg/profiling/task/network/analyze/events/data.go index bc451fc..3c59378 100644 --- a/pkg/profiling/task/network/analyze/events/data.go +++ b/pkg/profiling/task/network/analyze/events/data.go @@ -40,6 +40,10 @@ type SocketDataUploadEvent struct { Buffer [2048]byte } +func (s *SocketDataUploadEvent) ReleaseBuffer() *[2048]byte { + return &s.Buffer +} + func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol { return s.Protocol0 } diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go index 1c01084..359e466 100644 --- a/pkg/profiling/task/network/analyze/layer7/events.go +++ b/pkg/profiling/task/network/analyze/layer7/events.go @@ -20,6 +20,8 @@ package layer7 import ( "context" + "github.com/apache/skywalking-rover/pkg/tools/buffer" + profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base" analyzeBase "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events" "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols" @@ -37,7 +39,7 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize int, config *profili func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) { // socket buffer data l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue, l.protocolPerCPUBuffer, 1, func() interface{} { - return &analyzeBase.SocketDataUploadEvent{} + return &analyzeBase.SocketDataUploadEvent{Buffer: *buffer.BorrowNewBuffer()} }, func(data interface{}) int { return int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID) }) diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go index bdbd7a6..3abb43b 100644 --- a/pkg/tools/btf/linker.go +++ b/pkg/tools/btf/linker.go @@ -30,7 +30,6 @@ import ( "golang.org/x/arch/arm64/arm64asm" "golang.org/x/arch/x86/x86asm" - "github.com/apache/skywalking-rover/pkg/tools/btf/reader" "github.com/apache/skywalking-rover/pkg/tools/elf" "github.com/apache/skywalking-rover/pkg/tools/process" @@ -167,7 +166,7 @@ func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader, data func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBufferReader, perCPUBuffer, parallels int, dataSupplier func() interface{}) { - rd, err := newQueueReader(emap, perCPUBuffer) + rd, err := perf.NewReader(emap, perCPUBuffer) if err != nil { m.errors = multierror.Append(m.errors, fmt.Errorf("open ring buffer error: %v", err)) return @@ -178,42 +177,53 @@ func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBuff } m.closers = append(m.closers, rd) + recordBuilder := newPerfRecordBuilder(dataSupplier()) for i := 0; i < parallels; i++ { - m.asyncReadEvent(rd, emap, dataSupplier, bufReader) + m.asyncReadEvent(rd, emap, recordBuilder, dataSupplier, bufReader) } } -func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, dataSupplier func() interface{}, bufReader RingBufferReader) { +func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, recordPool *perfRecordBuilder, + dataSupplier func() interface{}, bufReader RingBufferReader) { go func() { for { - sample, err := rd.Read() + record := recordPool.GetRecord() + err := rd.ReadInto(record) if err != nil { + recordPool.PutRecord(record) if errors.Is(err, perf.ErrClosed) { return } log.Warnf("read from %s ringbuffer error: %v", emap.String(), err) continue } - if len(sample) == 0 { + + if record.LostSamples != 0 { + log.Warnf("perf event queue(%s) full, dropped %d samples", emap.String(), record.LostSamples) + recordPool.PutRecord(record) continue } data := dataSupplier() - if r, ok := data.(reader.EventReader); ok { - sampleReader := reader.NewReader(sample) + if r, ok := data.(EventReader); ok { + sampleReader := NewReader(record.RawSample) r.ReadFrom(sampleReader) if readErr := sampleReader.HasError(); readErr != nil { - log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(sample), err) + log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err) + recordPool.PutRecord(record) continue } } else { - if err := binary.Read(bytes.NewBuffer(sample), binary.LittleEndian, data); err != nil { - log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(sample), err) + if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, data); err != nil { + log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err) + recordPool.PutRecord(record) continue } } bufReader(data) + + recordPool.PutRecord(record) } }() } @@ -402,3 +412,34 @@ func (m *Linker) Close() error { }) return err } + +type perfRecordBuilder struct { + dataSize int + pool sync.Pool +} + +func newPerfRecordBuilder(data interface{}) *perfRecordBuilder { + // added 8 bytes means fix some event not aligned + var size = binary.Size(data) + 8 + if r, ok := data.(EventReader); ok { + reader := newSizeCalcReader() + r.ReadFrom(reader) + size = reader.Size() + 8 + } + + builder := &perfRecordBuilder{ + dataSize: size, + } + builder.pool.New = func() any { + return &perf.Record{RawSample: make([]byte, 0, builder.dataSize)} + } + return builder +} + +func (p *perfRecordBuilder) GetRecord() *perf.Record { + return p.pool.Get().(*perf.Record) +} + +func (p *perfRecordBuilder) PutRecord(r *perf.Record) { + p.pool.Put(r) +} diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go index fdd8754..eff29c8 100644 --- a/pkg/tools/btf/queue.go +++ b/pkg/tools/btf/queue.go @@ -19,90 +19,16 @@ package btf import ( "context" - "fmt" "sync" "time" "github.com/cilium/ebpf" - "github.com/cilium/ebpf/perf" - "github.com/cilium/ebpf/ringbuf" ) // queueChannelReducingCountCheckInterval is the interval to check the queue channel reducing count // if the reducing count is almost full, then added a warning log const queueChannelReducingCountCheckInterval = time.Second * 5 -type queueReader interface { - Read() ([]byte, error) - Close() error -} - -func newQueueReader(emap *ebpf.Map, perCPUBuffer int) (queueReader, error) { - switch emap.Type() { - case ebpf.RingBuf: - return newRingBufReader(emap) - case ebpf.PerfEventArray: - return newPerfQueueReader(emap, perCPUBuffer) - } - return nil, fmt.Errorf("unsupported map type: %s", emap.Type().String()) -} - -type perfQueueReader struct { - name string - reader *perf.Reader -} - -func newPerfQueueReader(emap *ebpf.Map, perCPUBuffer int) (*perfQueueReader, error) { - reader, err := perf.NewReader(emap, perCPUBuffer) - if err != nil { - return nil, err - } - return &perfQueueReader{reader: reader, name: emap.String()}, nil -} - -func (p *perfQueueReader) Read() ([]byte, error) { - read, err := p.reader.Read() - if err != nil { - return nil, err - } - - if read.LostSamples != 0 { - log.Warnf("perf event queue(%s) full, dropped %d samples", p.name, read.LostSamples) - return nil, nil - } - - return read.RawSample, nil -} - -func (p *perfQueueReader) Close() error { - return p.reader.Close() -} - -type ringBufReader struct { - reader *ringbuf.Reader - name string -} - -func newRingBufReader(emap *ebpf.Map) (*ringBufReader, error) { - reader, err := ringbuf.NewReader(emap) - if err != nil { - return nil, err - } - return &ringBufReader{reader: reader, name: emap.String()}, nil -} - -func (r *ringBufReader) Read() ([]byte, error) { - read, err := r.reader.Read() - if err != nil { - return nil, err - } - return read.RawSample, nil -} - -func (r *ringBufReader) Close() error { - return r.reader.Close() -} - type PartitionContext interface { Start(ctx context.Context) Consume(data interface{}) diff --git a/pkg/tools/btf/reader/reader.go b/pkg/tools/btf/reader.go similarity index 62% rename from pkg/tools/btf/reader/reader.go rename to pkg/tools/btf/reader.go index d2e980e..5cae4d5 100644 --- a/pkg/tools/btf/reader/reader.go +++ b/pkg/tools/btf/reader.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package reader +package btf import ( "encoding/binary" @@ -25,11 +25,20 @@ import ( // EventReader read the sample data by self, instant of binary.Read type EventReader interface { // ReadFrom read buffer data - ReadFrom(reader *Reader) + ReadFrom(reader Reader) } -// Reader buffer sample reader -type Reader struct { +type Reader interface { + HasError() error + ReadUint64() uint64 + ReadUint32() uint32 + ReadUint16() uint16 + ReadUint8() uint8 + ReadUint8Array(a []uint8, size int) +} + +// BytesReader buffer sample reader +type BytesReader struct { Sample []byte CurrentOffset int sampleLen int @@ -37,8 +46,8 @@ type Reader struct { } // NewReader create a reader from BPF buffer -func NewReader(sample []byte) *Reader { - return &Reader{ +func NewReader(sample []byte) Reader { + return &BytesReader{ Sample: sample, CurrentOffset: 0, sampleLen: len(sample), @@ -46,11 +55,11 @@ func NewReader(sample []byte) *Reader { } // HasError is there have error when reading buffer -func (r *Reader) HasError() error { +func (r *BytesReader) HasError() error { return r.err } -func (r *Reader) ReadUint64() uint64 { +func (r *BytesReader) ReadUint64() uint64 { bytes, err := r.read(8) if err != nil { return 0 @@ -58,7 +67,7 @@ func (r *Reader) ReadUint64() uint64 { return binary.LittleEndian.Uint64(bytes) } -func (r *Reader) ReadUint32() uint32 { +func (r *BytesReader) ReadUint32() uint32 { bytes, err := r.read(4) if err != nil { return 0 @@ -66,7 +75,7 @@ func (r *Reader) ReadUint32() uint32 { return binary.LittleEndian.Uint32(bytes) } -func (r *Reader) ReadUint16() uint16 { +func (r *BytesReader) ReadUint16() uint16 { bytes, err := r.read(2) if err != nil { return 0 @@ -74,7 +83,7 @@ func (r *Reader) ReadUint16() uint16 { return binary.LittleEndian.Uint16(bytes) } -func (r *Reader) ReadUint8() uint8 { +func (r *BytesReader) ReadUint8() uint8 { bytes, err := r.read(1) if err != nil { return 0 @@ -82,7 +91,7 @@ func (r *Reader) ReadUint8() uint8 { return bytes[0] } -func (r *Reader) ReadUint8Array(a []uint8, size int) { +func (r *BytesReader) ReadUint8Array(a []uint8, size int) { read, err := r.read(size) if err != nil { return @@ -90,7 +99,7 @@ func (r *Reader) ReadUint8Array(a []uint8, size int) { copy(a, read) } -func (r *Reader) read(size int) ([]byte, error) { +func (r *BytesReader) read(size int) ([]byte, error) { if r.err != nil { return nil, r.err } @@ -103,3 +112,43 @@ func (r *Reader) read(size int) ([]byte, error) { r.CurrentOffset += size return bytes, nil } + +type sizeCalcReader struct { + size int +} + +func newSizeCalcReader() *sizeCalcReader { + return &sizeCalcReader{} +} + +func (r *sizeCalcReader) HasError() error { + return nil +} + +func (r *sizeCalcReader) ReadUint64() uint64 { + r.size += 8 + return 0 +} + +func (r *sizeCalcReader) ReadUint32() uint32 { + r.size += 4 + return 0 +} + +func (r *sizeCalcReader) ReadUint16() uint16 { + r.size += 2 + return 0 +} + +func (r *sizeCalcReader) ReadUint8() uint8 { + r.size++ + return 0 +} + +func (r *sizeCalcReader) ReadUint8Array(a []uint8, size int) { + r.size += size +} + +func (r *sizeCalcReader) Size() int { + return r.size +} diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go index 368e1f6..33f6d1c 100644 --- a/pkg/tools/buffer/buffer.go +++ b/pkg/tools/buffer/buffer.go @@ -37,8 +37,18 @@ var ( emptyList = list.New() log = logger.GetLogger("tools", "buffer") + + PooledBuffer = sync.Pool{ + New: func() any { + return &[2048]byte{} + }, + } ) +func BorrowNewBuffer() *[2048]byte { + return PooledBuffer.Get().(*[2048]byte) +} + type SocketDataBuffer interface { // Protocol of the buffer Protocol() enums.ConnectionProtocol @@ -71,6 +81,8 @@ type SocketDataBuffer interface { StartTime() uint64 // EndTime the data end timestamp EndTime() uint64 + + ReleaseBuffer() *[2048]byte } type SocketDataDetail interface { @@ -259,60 +271,19 @@ func (r *Buffer) Clean() { // nolint func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer { dataEvents := list.New() - detailEvents := list.New() - var firstDetailElement *list.Element - var lastBufferDataID = start.DataID() for nextElement := start.element; nextElement != end.element; nextElement = nextElement.Next() { if nextElement == nil || nextElement.Value == nil { break } currentBuffer := nextElement.Value.(SocketDataBuffer) - // found first matches detail event - if detailEvents.Len() == 0 || firstDetailElement == nil { - for e := r.detailEvents.Front(); e != nil; e = e.Next() { - if e.Value == nil { - continue - } - if e.Value.(SocketDataDetail).DataID() >= currentBuffer.DataID() { - detailEvents.PushBack(e.Value) - firstDetailElement = e - break - } - } - } dataEvents.PushBack(currentBuffer) - lastBufferDataID = currentBuffer.DataID() } lastBuffer := end.element.Value.(SocketDataBuffer) dataEvents.PushBack(&SocketDataEventLimited{SocketDataBuffer: lastBuffer, Size: end.bufIndex}) - // if the first detail element been found, append the details until the last buffer data id - var lastBufferID = lastBufferDataID - if lastBuffer != nil { - lastBufferID = lastBuffer.DataID() - } - if firstDetailElement == nil && r.detailEvents != nil { - for e := r.detailEvents.Front(); e != nil; e = e.Next() { - if e.Value != nil && e.Value.(SocketDataDetail).DataID() == lastBufferID { - detailEvents.PushBack(e.Value) - break - } - } - } else if firstDetailElement != nil && firstDetailElement.Value.(SocketDataDetail).DataID() != lastBufferID { - for tmp := firstDetailElement.Next(); tmp != nil; tmp = tmp.Next() { - if tmp.Value == nil { - continue - } - if tmp.Value.(SocketDataDetail).DataID() > lastBufferID { - break - } - detailEvents.PushBack(tmp.Value) - } - } - return &Buffer{ dataEvents: dataEvents, - detailEvents: detailEvents, + detailEvents: emptyList, validated: validated, head: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex}, current: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex}, @@ -347,6 +318,9 @@ func (r *Buffer) BuildDetails() *list.List { } for e := r.originalBuffer.detailEvents.Front(); e != nil; e = e.Next() { + if e.Value == nil { + continue + } if e.Value.(SocketDataDetail).DataID() >= fromDataID && e.Value.(SocketDataDetail).DataID() <= endDataID { events.PushBack(e.Value) } @@ -358,7 +332,7 @@ func (r *Buffer) BuildDetails() *list.List { dataIDList = append(dataIDList, e.Value.(SocketDataDetail).DataID()) } } - log.Debugf("cannot found details from original buffer, from data id: %d, end data id: %d, "+ + log.Infof("cannot found details from original buffer, from data id: %d, end data id: %d, "+ "ref: %p, existing details data id list: %v", fromDataID, endDataID, r.originalBuffer, dataIDList) } @@ -409,6 +383,20 @@ func (r *Buffer) LastSocketBuffer() SocketDataBuffer { return r.dataEvents.Back().Value.(SocketDataBuffer) } +func (r *Buffer) TotalBuffer() []SocketDataBuffer { + if r == nil || r.dataEvents == nil || r.dataEvents.Len() == 0 { + return nil + } + result := make([]SocketDataBuffer, 0, r.dataEvents.Len()) + for e := r.dataEvents.Front(); e != nil; e = e.Next() { + if e.Value == nil { + continue + } + result = append(result, e.Value.(SocketDataBuffer)) + } + return result +} + // DetectNotSendingLastPosition detect the buffer contains not sending data: the BPF limited socket data count func (r *Buffer) DetectNotSendingLastPosition() *Position { if r == nil || r.dataEvents.Len() == 0 { @@ -642,7 +630,7 @@ func (r *Buffer) PrepareForReading() bool { if r.shouldResetPosition { r.ResetForLoopReading() r.shouldResetPosition = false - return false + return r.PrepareForReading() } if r.head == nil || r.head.element == nil { // read in the first element @@ -732,6 +720,11 @@ func (r *Buffer) removeElement0(element *list.Element) *list.Element { } result := element.Next() r.dataEvents.Remove(element) + if element.Value != nil { + if b, ok := element.Value.(SocketDataBuffer); ok && b != nil { + PooledBuffer.Put(b.ReleaseBuffer()) + } + } return result } @@ -819,10 +812,14 @@ func (r *Buffer) DeleteExpireEvents(expireDuration time.Duration) int { expireTime := time.Now().Add(-expireDuration) // data event queue count := r.deleteEventsWithJudgement(r.dataEvents, func(element *list.Element) bool { + if element.Value == nil { + return true + } buffer := element.Value.(SocketDataBuffer) startTime := host.Time(buffer.StartTime()) if expireTime.After(startTime) { r.latestExpiredDataID = buffer.DataID() + PooledBuffer.Put(buffer.ReleaseBuffer()) return true } return false diff --git a/pkg/tools/ip.go b/pkg/tools/ip.go index 24305b9..b3a34fa 100644 --- a/pkg/tools/ip.go +++ b/pkg/tools/ip.go @@ -38,7 +38,7 @@ func DefaultHostIPAddress() string { // HostIPAddressV4 found the IPV4 address from appoint net interface name func HostIPAddressV4(name string) string { - address := host.ipAddresses[name] + address := host.ipAddressesByName[name] if address == nil { return "" } @@ -47,7 +47,7 @@ func HostIPAddressV4(name string) string { // HostIPAddressV6 found the IPV6 address from appoint net interface name func HostIPAddressV6(name string) string { - address := host.ipAddresses[name] + address := host.ipAddressesByName[name] if address == nil { return "" } @@ -56,10 +56,8 @@ func HostIPAddressV6(name string) string { // IsLocalHostAddress is the address from local func IsLocalHostAddress(address string) bool { - for _, h := range host.ipAddresses { - if h.ipV4 == address || h.ipV6 == address { - return true - } + if host.ipAddressesByIP[address] { + return true } return address == "0.0.0.0" } @@ -73,8 +71,9 @@ type hostInfo struct { // hostname name string // ip address - ipAddresses map[string]*hostIPAddress - defaultIPAddr string + ipAddressesByName map[string]*hostIPAddress + ipAddressesByIP map[string]bool + defaultIPAddr string } type hostIPAddress struct { @@ -83,7 +82,7 @@ type hostIPAddress struct { } func queryHostInfo() *hostInfo { - addresses, def, err := localIPAddress0() + addressesByName, def, err := localIPAddress0() if err != nil { panic(err) } @@ -91,7 +90,12 @@ func queryHostInfo() *hostInfo { if err != nil { panic(err) } - return &hostInfo{name: name, ipAddresses: addresses, defaultIPAddr: def} + addressesByIP := make(map[string]bool) + for _, addr := range addressesByName { + addressesByIP[addr.ipV4] = true + addressesByIP[addr.ipV6] = true + } + return &hostInfo{name: name, ipAddressesByName: addressesByName, ipAddressesByIP: addressesByIP, defaultIPAddr: def} } func hostname0() (string, error) { diff --git a/pkg/tools/ip/conntrack.go b/pkg/tools/ip/conntrack.go index 72dc1b9..3e3f934 100644 --- a/pkg/tools/ip/conntrack.go +++ b/pkg/tools/ip/conntrack.go @@ -18,14 +18,15 @@ package ip import ( + "fmt" "net" "syscall" "github.com/florianl/go-conntrack" - "github.com/apache/skywalking-rover/pkg/logger" - "golang.org/x/sys/unix" + + "github.com/apache/skywalking-rover/pkg/logger" ) var log = logger.GetLogger("tools", "ip") @@ -47,10 +48,11 @@ func NewConnTrack() (*ConnTrack, error) { if err != nil { return nil, err } + return &ConnTrack{tracker: nfct}, nil } -func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) bool { +func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) error { family := conntrack.IPv4 if addr.Family == unix.AF_INET6 { family = conntrack.IPv6 @@ -64,17 +66,18 @@ func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) bool { session, e := c.tracker.Get(conntrack.Conntrack, family, conntrack.Con{Origin: tuple}) if e != nil { // try to get the reply session, if the info not exists or from accept events, have error is normal - log.Debugf("cannot get the conntrack session, type: %s, family: %d, origin src: %s:%d, origin dest: %s:%d, error: %v", info.name, + return fmt.Errorf("cannot get the conntrack session, type: %s, family: %d, origin src: %s:%d, origin dest: %s:%d, error: %v", info.name, family, tuple.Src, *tuple.Proto.SrcPort, tuple.Dst, *tuple.Proto.DstPort, e) - continue } if res := c.filterValidateReply(session, tuple); res != nil { addr.DestIP = res.Src.String() - return true + addr.NeedConnTrack = false + log.Debugf("update real peer address from conntrack: %s:%d", addr.DestIP, addr.DestPort) + return nil } } - return false + return nil } func (c *ConnTrack) parseSocketToTuple(addr *SocketPair) *conntrack.IPTuple { diff --git a/pkg/tools/ip/tcpresolver.go b/pkg/tools/ip/tcpresolver.go index a07c216..9f09309 100644 --- a/pkg/tools/ip/tcpresolver.go +++ b/pkg/tools/ip/tcpresolver.go @@ -34,6 +34,8 @@ type SocketPair struct { SrcPort uint16 DestIP string DestPort uint16 + + NeedConnTrack bool } func (s *SocketPair) IsValid() bool {
