This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git

commit f971cdc0f6d74b4a3ca6600ec5f7bba2b5bceac8
Author: mrproliu <[email protected]>
AuthorDate: Fri Dec 27 13:07:08 2024 +0800

    Reduce handle connect event time
---
 pkg/accesslog/collector/connection.go    | 31 +++++++++++++++++----
 pkg/accesslog/common/connection.go       | 11 ++++++++
 pkg/process/api.go                       |  2 ++
 pkg/process/finders/kubernetes/finder.go | 34 +++++++++++++++++++++--
 pkg/process/finders/manager.go           |  9 +++++++
 pkg/process/module.go                    |  9 +++++++
 pkg/tools/buffer/buffer.go               | 46 +++-----------------------------
 7 files changed, 93 insertions(+), 49 deletions(-)

diff --git a/pkg/accesslog/collector/connection.go 
b/pkg/accesslog/collector/connection.go
index 4fc6823..9abadc3 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -33,6 +33,8 @@ import (
        "github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
        "github.com/apache/skywalking-rover/pkg/logger"
        "github.com/apache/skywalking-rover/pkg/module"
+       "github.com/apache/skywalking-rover/pkg/process"
+       "github.com/apache/skywalking-rover/pkg/process/api"
        "github.com/apache/skywalking-rover/pkg/tools"
        "github.com/apache/skywalking-rover/pkg/tools/btf"
        "github.com/apache/skywalking-rover/pkg/tools/enums"
@@ -56,7 +58,7 @@ func NewConnectionCollector() *ConnectCollector {
        return &ConnectCollector{}
 }
 
-func (c *ConnectCollector) Start(_ *module.Manager, ctx 
*common.AccessLogContext) error {
+func (c *ConnectCollector) Start(m *module.Manager, ctx 
*common.AccessLogContext) error {
        perCPUBufferSize, err := 
units.RAMInBytes(ctx.Config.ConnectionAnalyze.PerCPUBufferSize)
        if err != nil {
                return err
@@ -79,7 +81,7 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx 
*common.AccessLogContext
        }
        c.eventQueue = btf.NewEventQueue("connection resolver", 
ctx.Config.ConnectionAnalyze.AnalyzeParallels,
                ctx.Config.ConnectionAnalyze.QueueSize, func(num int) 
btf.PartitionContext {
-                       return newConnectionPartitionContext(ctx, track)
+                       return newConnectionPartitionContext(ctx, track, 
m.FindModule(process.ModuleName).(process.K8sOperator))
                })
        c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, 
int(perCPUBufferSize),
                ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} 
{
@@ -131,12 +133,15 @@ func (c *ConnectCollector) Stop() {
 type ConnectionPartitionContext struct {
        context     *common.AccessLogContext
        connTracker *ip.ConnTrack
+       k8sOperator process.K8sOperator
 }
 
-func newConnectionPartitionContext(ctx *common.AccessLogContext, connTracker 
*ip.ConnTrack) *ConnectionPartitionContext {
+func newConnectionPartitionContext(ctx *common.AccessLogContext, connTracker 
*ip.ConnTrack,
+       k8sOperator process.K8sOperator) *ConnectionPartitionContext {
        return &ConnectionPartitionContext{
                context:     ctx,
                connTracker: connTracker,
+               k8sOperator: k8sOperator,
        }
 }
 
@@ -294,8 +299,24 @@ func (c *ConnectionPartitionContext) buildSocketPair(event 
*events.SocketConnect
 }
 
 func (c *ConnectionPartitionContext) tryToUpdateSocketFromConntrack(event 
*events.SocketConnectEvent, socket *ip.SocketPair) {
-       if socket != nil && socket.IsValid() && c.connTracker != nil && 
!tools.IsLocalHostAddress(socket.DestIP) &&
-               event.FuncName != enums.SocketFunctionNameAccept { // accept 
event don't need to update the remote address
+       if socket == nil || !socket.IsValid() || 
tools.IsLocalHostAddress(socket.DestIP) &&
+               event.FuncName == enums.SocketFunctionNameAccept { // accept 
event don't need to update the remote address
+               return
+       }
+       if c.context.ConnectionMgr.ProcessIsDetectBy(event.PID, api.Kubernetes) 
{
+               isPodIP, err := c.k8sOperator.IsPodIP(socket.DestIP)
+               log.Infof("ip: %s, isPodIP: %v, err: %v", socket.DestIP, 
isPodIP, err)
+               if err != nil {
+                       connectionLogger.Warnf("cannot found the pod IP, 
connection ID: %d, randomID: %d, error: %v",
+                               event.ConID, event.RandomID, err)
+               }
+               if isPodIP {
+                       connectionLogger.Debugf("detect the remote IP is pod 
IP, connection ID: %d, randomID: %d, remote: %s",
+                               event.ConID, event.RandomID, socket.DestIP)
+                       return
+               }
+       }
+       if c.connTracker != nil {
                // if no contract and socket data is valid, then trying to get 
the remote address from the socket
                // to encase the remote address is not the real remote address
                originalIP := socket.DestIP
diff --git a/pkg/accesslog/common/connection.go 
b/pkg/accesslog/common/connection.go
index afac079..c6630cc 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -368,6 +368,17 @@ func (c *ConnectionManager) ProcessIsMonitor(pid uint32) 
bool {
        return len(c.monitoringProcesses[int32(pid)]) > 0
 }
 
+func (c *ConnectionManager) ProcessIsDetectBy(pid uint32, detectType 
api.ProcessDetectType) bool {
+       c.monitoringProcessLock.RLock()
+       defer c.monitoringProcessLock.RUnlock()
+       for _, p := range c.monitoringProcesses[int32(pid)] {
+               if p.DetectType() == detectType {
+                       return true
+               }
+       }
+       return false
+}
+
 func (c *ConnectionManager) buildConnection(event *events.SocketConnectEvent, 
socket *ip.SocketPair,
        local, remote *v3.ConnectionAddress) *ConnectionInfo {
        var role v32.DetectPoint
diff --git a/pkg/process/api.go b/pkg/process/api.go
index b84453b..ca11907 100644
--- a/pkg/process/api.go
+++ b/pkg/process/api.go
@@ -37,4 +37,6 @@ type Operator interface {
 type K8sOperator interface {
        // NodeName get the node name
        NodeName() string
+       // IsPodIP check the ip is pod ip
+       IsPodIP(ip string) (bool, error)
 }
diff --git a/pkg/process/finders/kubernetes/finder.go 
b/pkg/process/finders/kubernetes/finder.go
index 03a0ba7..03f8481 100644
--- a/pkg/process/finders/kubernetes/finder.go
+++ b/pkg/process/finders/kubernetes/finder.go
@@ -36,6 +36,9 @@ import (
        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
+       "k8s.io/apimachinery/pkg/fields"
+       "k8s.io/apimachinery/pkg/util/cache"
+
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/rest"
 
@@ -53,6 +56,7 @@ var log = logger.GetLogger("process", "finder", "kubernetes")
 var (
        kubepodsRegex      = 
regexp.MustCompile(`cri-containerd-(?P<Group>\w+)\.scope`)
        openShiftPodsRegex = regexp.MustCompile(`crio-(?P<Group>\w+)\.scope`)
+       ipExistTimeout     = time.Minute
 )
 
 type ProcessFinder struct {
@@ -73,6 +77,9 @@ type ProcessFinder struct {
 
        // runtime config
        namespaces []string
+
+       // for IsPodIP check
+       podIPChecker *cache.Expiring
 }
 
 func (f *ProcessFinder) Init(ctx context.Context, conf base.FinderBaseConfig, 
manager base.ProcessManager) error {
@@ -89,11 +96,12 @@ func (f *ProcessFinder) Init(ctx context.Context, conf 
base.FinderBaseConfig, ma
        f.stopChan = make(chan struct{}, 1)
        f.registry = NewRegistry(f.CLI, f.namespaces, f.conf.NodeName)
        f.manager = manager
-       cache, err := lru.New(5000)
+       f.podIPChecker = cache.NewExpiring()
+       processCache, err := lru.New(5000)
        if err != nil {
                return err
        }
-       f.processCache = cache
+       f.processCache = processCache
 
        return nil
 }
@@ -400,3 +408,25 @@ func (f *ProcessFinder) ShouldMonitor(pid int32) bool {
        f.manager.AddDetectedProcess(processes)
        return true
 }
+
+func (f *ProcessFinder) IsPodIP(ip string) (bool, error) {
+       val, exist := f.podIPChecker.Get(ip)
+       if exist {
+               return val.(bool), nil
+       }
+       pods, err := f.CLI.CoreV1().Pods(v1.NamespaceAll).List(f.ctx, 
metav1.ListOptions{
+               FieldSelector: fields.OneTermEqualSelector("status.podIP", 
ip).String(),
+       })
+       if err != nil {
+               return false, err
+       }
+       found := false
+       for _, pod := range pods.Items {
+               if pod.Status.PodIP == ip {
+                       found = true
+               }
+       }
+
+       f.podIPChecker.Set(ip, found, ipExistTimeout)
+       return found, nil
+}
diff --git a/pkg/process/finders/manager.go b/pkg/process/finders/manager.go
index dbe99c0..a252416 100644
--- a/pkg/process/finders/manager.go
+++ b/pkg/process/finders/manager.go
@@ -113,6 +113,15 @@ func (m *ProcessManager) Shutdown() error {
        return result
 }
 
+func (m *ProcessManager) Finder(finderType api.ProcessDetectType) 
(base.ProcessFinder, bool) {
+       for _, finder := range m.finders {
+               if finder.DetectType() == finderType {
+                       return finder, true
+               }
+       }
+       return nil, false
+}
+
 func (p *ProcessManagerWithFinder) GetModuleManager() *module.Manager {
        return p.moduleManager
 }
diff --git a/pkg/process/module.go b/pkg/process/module.go
index 648b8f6..95063b5 100644
--- a/pkg/process/module.go
+++ b/pkg/process/module.go
@@ -25,6 +25,7 @@ import (
        "github.com/apache/skywalking-rover/pkg/module"
        "github.com/apache/skywalking-rover/pkg/process/api"
        "github.com/apache/skywalking-rover/pkg/process/finders"
+       "github.com/apache/skywalking-rover/pkg/process/finders/kubernetes"
 )
 
 const ModuleName = "process_discovery"
@@ -105,3 +106,11 @@ func (m *Module) ShouldMonitor(pid int32) bool {
 func (m *Module) NodeName() string {
        return m.config.Kubernetes.NodeName
 }
+
+func (m *Module) IsPodIP(ip string) (bool, error) {
+       k8sFinder, exist := m.manager.Finder(api.Kubernetes)
+       if !exist {
+               return false, nil
+       }
+       return k8sFinder.(*kubernetes.ProcessFinder).IsPodIP(ip)
+}
diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go
index 368e1f6..9efa237 100644
--- a/pkg/tools/buffer/buffer.go
+++ b/pkg/tools/buffer/buffer.go
@@ -259,60 +259,19 @@ func (r *Buffer) Clean() {
 // nolint
 func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer {
        dataEvents := list.New()
-       detailEvents := list.New()
-       var firstDetailElement *list.Element
-       var lastBufferDataID = start.DataID()
        for nextElement := start.element; nextElement != end.element; 
nextElement = nextElement.Next() {
                if nextElement == nil || nextElement.Value == nil {
                        break
                }
                currentBuffer := nextElement.Value.(SocketDataBuffer)
-               // found first matches detail event
-               if detailEvents.Len() == 0 || firstDetailElement == nil {
-                       for e := r.detailEvents.Front(); e != nil; e = e.Next() 
{
-                               if e.Value == nil {
-                                       continue
-                               }
-                               if e.Value.(SocketDataDetail).DataID() >= 
currentBuffer.DataID() {
-                                       detailEvents.PushBack(e.Value)
-                                       firstDetailElement = e
-                                       break
-                               }
-                       }
-               }
                dataEvents.PushBack(currentBuffer)
-               lastBufferDataID = currentBuffer.DataID()
        }
        lastBuffer := end.element.Value.(SocketDataBuffer)
        dataEvents.PushBack(&SocketDataEventLimited{SocketDataBuffer: 
lastBuffer, Size: end.bufIndex})
 
-       // if the first detail element been found, append the details until the 
last buffer data id
-       var lastBufferID = lastBufferDataID
-       if lastBuffer != nil {
-               lastBufferID = lastBuffer.DataID()
-       }
-       if firstDetailElement == nil && r.detailEvents != nil {
-               for e := r.detailEvents.Front(); e != nil; e = e.Next() {
-                       if e.Value != nil && 
e.Value.(SocketDataDetail).DataID() == lastBufferID {
-                               detailEvents.PushBack(e.Value)
-                               break
-                       }
-               }
-       } else if firstDetailElement != nil && 
firstDetailElement.Value.(SocketDataDetail).DataID() != lastBufferID {
-               for tmp := firstDetailElement.Next(); tmp != nil; tmp = 
tmp.Next() {
-                       if tmp.Value == nil {
-                               continue
-                       }
-                       if tmp.Value.(SocketDataDetail).DataID() > lastBufferID 
{
-                               break
-                       }
-                       detailEvents.PushBack(tmp.Value)
-               }
-       }
-
        return &Buffer{
                dataEvents:     dataEvents,
-               detailEvents:   detailEvents,
+               detailEvents:   emptyList,
                validated:      validated,
                head:           &Position{element: dataEvents.Front(), 
bufIndex: start.bufIndex},
                current:        &Position{element: dataEvents.Front(), 
bufIndex: start.bufIndex},
@@ -347,6 +306,9 @@ func (r *Buffer) BuildDetails() *list.List {
                }
 
                for e := r.originalBuffer.detailEvents.Front(); e != nil; e = 
e.Next() {
+                       if e.Value == nil {
+                               continue
+                       }
                        if e.Value.(SocketDataDetail).DataID() >= fromDataID && 
e.Value.(SocketDataDetail).DataID() <= endDataID {
                                events.PushBack(e.Value)
                        }

Reply via email to