This is an automated email from the ASF dual-hosted git repository. liuhan pushed a commit to branch reduce-handle-connect-time in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
commit f971cdc0f6d74b4a3ca6600ec5f7bba2b5bceac8 Author: mrproliu <[email protected]> AuthorDate: Fri Dec 27 13:07:08 2024 +0800 Reduce handle connect event time --- pkg/accesslog/collector/connection.go | 31 +++++++++++++++++---- pkg/accesslog/common/connection.go | 11 ++++++++ pkg/process/api.go | 2 ++ pkg/process/finders/kubernetes/finder.go | 34 +++++++++++++++++++++-- pkg/process/finders/manager.go | 9 +++++++ pkg/process/module.go | 9 +++++++ pkg/tools/buffer/buffer.go | 46 +++----------------------------- 7 files changed, 93 insertions(+), 49 deletions(-) diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go index 4fc6823..9abadc3 100644 --- a/pkg/accesslog/collector/connection.go +++ b/pkg/accesslog/collector/connection.go @@ -33,6 +33,8 @@ import ( "github.com/apache/skywalking-rover/pkg/accesslog/forwarder" "github.com/apache/skywalking-rover/pkg/logger" "github.com/apache/skywalking-rover/pkg/module" + "github.com/apache/skywalking-rover/pkg/process" + "github.com/apache/skywalking-rover/pkg/process/api" "github.com/apache/skywalking-rover/pkg/tools" "github.com/apache/skywalking-rover/pkg/tools/btf" "github.com/apache/skywalking-rover/pkg/tools/enums" @@ -56,7 +58,7 @@ func NewConnectionCollector() *ConnectCollector { return &ConnectCollector{} } -func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext) error { +func (c *ConnectCollector) Start(m *module.Manager, ctx *common.AccessLogContext) error { perCPUBufferSize, err := units.RAMInBytes(ctx.Config.ConnectionAnalyze.PerCPUBufferSize) if err != nil { return err @@ -79,7 +81,7 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext } c.eventQueue = btf.NewEventQueue("connection resolver", ctx.Config.ConnectionAnalyze.AnalyzeParallels, ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext { - return newConnectionPartitionContext(ctx, track) + return newConnectionPartitionContext(ctx, track, m.FindModule(process.ModuleName).(process.K8sOperator)) }) c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} { @@ -131,12 +133,15 @@ func (c *ConnectCollector) Stop() { type ConnectionPartitionContext struct { context *common.AccessLogContext connTracker *ip.ConnTrack + k8sOperator process.K8sOperator } -func newConnectionPartitionContext(ctx *common.AccessLogContext, connTracker *ip.ConnTrack) *ConnectionPartitionContext { +func newConnectionPartitionContext(ctx *common.AccessLogContext, connTracker *ip.ConnTrack, + k8sOperator process.K8sOperator) *ConnectionPartitionContext { return &ConnectionPartitionContext{ context: ctx, connTracker: connTracker, + k8sOperator: k8sOperator, } } @@ -294,8 +299,24 @@ func (c *ConnectionPartitionContext) buildSocketPair(event *events.SocketConnect } func (c *ConnectionPartitionContext) tryToUpdateSocketFromConntrack(event *events.SocketConnectEvent, socket *ip.SocketPair) { - if socket != nil && socket.IsValid() && c.connTracker != nil && !tools.IsLocalHostAddress(socket.DestIP) && - event.FuncName != enums.SocketFunctionNameAccept { // accept event don't need to update the remote address + if socket == nil || !socket.IsValid() || tools.IsLocalHostAddress(socket.DestIP) && + event.FuncName == enums.SocketFunctionNameAccept { // accept event don't need to update the remote address + return + } + if c.context.ConnectionMgr.ProcessIsDetectBy(event.PID, api.Kubernetes) { + isPodIP, err := c.k8sOperator.IsPodIP(socket.DestIP) + log.Infof("ip: %s, isPodIP: %v, err: %v", socket.DestIP, isPodIP, err) + if err != nil { + connectionLogger.Warnf("cannot found the pod IP, connection ID: %d, randomID: %d, error: %v", + event.ConID, event.RandomID, err) + } + if isPodIP { + connectionLogger.Debugf("detect the remote IP is pod IP, connection ID: %d, randomID: %d, remote: %s", + event.ConID, event.RandomID, socket.DestIP) + return + } + } + if c.connTracker != nil { // if no contract and socket data is valid, then trying to get the remote address from the socket // to encase the remote address is not the real remote address originalIP := socket.DestIP diff --git a/pkg/accesslog/common/connection.go b/pkg/accesslog/common/connection.go index afac079..c6630cc 100644 --- a/pkg/accesslog/common/connection.go +++ b/pkg/accesslog/common/connection.go @@ -368,6 +368,17 @@ func (c *ConnectionManager) ProcessIsMonitor(pid uint32) bool { return len(c.monitoringProcesses[int32(pid)]) > 0 } +func (c *ConnectionManager) ProcessIsDetectBy(pid uint32, detectType api.ProcessDetectType) bool { + c.monitoringProcessLock.RLock() + defer c.monitoringProcessLock.RUnlock() + for _, p := range c.monitoringProcesses[int32(pid)] { + if p.DetectType() == detectType { + return true + } + } + return false +} + func (c *ConnectionManager) buildConnection(event *events.SocketConnectEvent, socket *ip.SocketPair, local, remote *v3.ConnectionAddress) *ConnectionInfo { var role v32.DetectPoint diff --git a/pkg/process/api.go b/pkg/process/api.go index b84453b..ca11907 100644 --- a/pkg/process/api.go +++ b/pkg/process/api.go @@ -37,4 +37,6 @@ type Operator interface { type K8sOperator interface { // NodeName get the node name NodeName() string + // IsPodIP check the ip is pod ip + IsPodIP(ip string) (bool, error) } diff --git a/pkg/process/finders/kubernetes/finder.go b/pkg/process/finders/kubernetes/finder.go index 03a0ba7..03f8481 100644 --- a/pkg/process/finders/kubernetes/finder.go +++ b/pkg/process/finders/kubernetes/finder.go @@ -36,6 +36,9 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/cache" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -53,6 +56,7 @@ var log = logger.GetLogger("process", "finder", "kubernetes") var ( kubepodsRegex = regexp.MustCompile(`cri-containerd-(?P<Group>\w+)\.scope`) openShiftPodsRegex = regexp.MustCompile(`crio-(?P<Group>\w+)\.scope`) + ipExistTimeout = time.Minute ) type ProcessFinder struct { @@ -73,6 +77,9 @@ type ProcessFinder struct { // runtime config namespaces []string + + // for IsPodIP check + podIPChecker *cache.Expiring } func (f *ProcessFinder) Init(ctx context.Context, conf base.FinderBaseConfig, manager base.ProcessManager) error { @@ -89,11 +96,12 @@ func (f *ProcessFinder) Init(ctx context.Context, conf base.FinderBaseConfig, ma f.stopChan = make(chan struct{}, 1) f.registry = NewRegistry(f.CLI, f.namespaces, f.conf.NodeName) f.manager = manager - cache, err := lru.New(5000) + f.podIPChecker = cache.NewExpiring() + processCache, err := lru.New(5000) if err != nil { return err } - f.processCache = cache + f.processCache = processCache return nil } @@ -400,3 +408,25 @@ func (f *ProcessFinder) ShouldMonitor(pid int32) bool { f.manager.AddDetectedProcess(processes) return true } + +func (f *ProcessFinder) IsPodIP(ip string) (bool, error) { + val, exist := f.podIPChecker.Get(ip) + if exist { + return val.(bool), nil + } + pods, err := f.CLI.CoreV1().Pods(v1.NamespaceAll).List(f.ctx, metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("status.podIP", ip).String(), + }) + if err != nil { + return false, err + } + found := false + for _, pod := range pods.Items { + if pod.Status.PodIP == ip { + found = true + } + } + + f.podIPChecker.Set(ip, found, ipExistTimeout) + return found, nil +} diff --git a/pkg/process/finders/manager.go b/pkg/process/finders/manager.go index dbe99c0..a252416 100644 --- a/pkg/process/finders/manager.go +++ b/pkg/process/finders/manager.go @@ -113,6 +113,15 @@ func (m *ProcessManager) Shutdown() error { return result } +func (m *ProcessManager) Finder(finderType api.ProcessDetectType) (base.ProcessFinder, bool) { + for _, finder := range m.finders { + if finder.DetectType() == finderType { + return finder, true + } + } + return nil, false +} + func (p *ProcessManagerWithFinder) GetModuleManager() *module.Manager { return p.moduleManager } diff --git a/pkg/process/module.go b/pkg/process/module.go index 648b8f6..95063b5 100644 --- a/pkg/process/module.go +++ b/pkg/process/module.go @@ -25,6 +25,7 @@ import ( "github.com/apache/skywalking-rover/pkg/module" "github.com/apache/skywalking-rover/pkg/process/api" "github.com/apache/skywalking-rover/pkg/process/finders" + "github.com/apache/skywalking-rover/pkg/process/finders/kubernetes" ) const ModuleName = "process_discovery" @@ -105,3 +106,11 @@ func (m *Module) ShouldMonitor(pid int32) bool { func (m *Module) NodeName() string { return m.config.Kubernetes.NodeName } + +func (m *Module) IsPodIP(ip string) (bool, error) { + k8sFinder, exist := m.manager.Finder(api.Kubernetes) + if !exist { + return false, nil + } + return k8sFinder.(*kubernetes.ProcessFinder).IsPodIP(ip) +} diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go index 368e1f6..9efa237 100644 --- a/pkg/tools/buffer/buffer.go +++ b/pkg/tools/buffer/buffer.go @@ -259,60 +259,19 @@ func (r *Buffer) Clean() { // nolint func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer { dataEvents := list.New() - detailEvents := list.New() - var firstDetailElement *list.Element - var lastBufferDataID = start.DataID() for nextElement := start.element; nextElement != end.element; nextElement = nextElement.Next() { if nextElement == nil || nextElement.Value == nil { break } currentBuffer := nextElement.Value.(SocketDataBuffer) - // found first matches detail event - if detailEvents.Len() == 0 || firstDetailElement == nil { - for e := r.detailEvents.Front(); e != nil; e = e.Next() { - if e.Value == nil { - continue - } - if e.Value.(SocketDataDetail).DataID() >= currentBuffer.DataID() { - detailEvents.PushBack(e.Value) - firstDetailElement = e - break - } - } - } dataEvents.PushBack(currentBuffer) - lastBufferDataID = currentBuffer.DataID() } lastBuffer := end.element.Value.(SocketDataBuffer) dataEvents.PushBack(&SocketDataEventLimited{SocketDataBuffer: lastBuffer, Size: end.bufIndex}) - // if the first detail element been found, append the details until the last buffer data id - var lastBufferID = lastBufferDataID - if lastBuffer != nil { - lastBufferID = lastBuffer.DataID() - } - if firstDetailElement == nil && r.detailEvents != nil { - for e := r.detailEvents.Front(); e != nil; e = e.Next() { - if e.Value != nil && e.Value.(SocketDataDetail).DataID() == lastBufferID { - detailEvents.PushBack(e.Value) - break - } - } - } else if firstDetailElement != nil && firstDetailElement.Value.(SocketDataDetail).DataID() != lastBufferID { - for tmp := firstDetailElement.Next(); tmp != nil; tmp = tmp.Next() { - if tmp.Value == nil { - continue - } - if tmp.Value.(SocketDataDetail).DataID() > lastBufferID { - break - } - detailEvents.PushBack(tmp.Value) - } - } - return &Buffer{ dataEvents: dataEvents, - detailEvents: detailEvents, + detailEvents: emptyList, validated: validated, head: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex}, current: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex}, @@ -347,6 +306,9 @@ func (r *Buffer) BuildDetails() *list.List { } for e := r.originalBuffer.detailEvents.Front(); e != nil; e = e.Next() { + if e.Value == nil { + continue + } if e.Value.(SocketDataDetail).DataID() >= fromDataID && e.Value.(SocketDataDetail).DataID() <= endDataID { events.PushBack(e.Value) }
