This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/reduce-handle-connect-time by this push:
     new 45bb01a  disable conntrack monitoring
45bb01a is described below

commit 45bb01a13edb46bbedd0601541e53b8514afc6d7
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 28 23:14:12 2024 +0800

    disable conntrack monitoring
---
 pkg/accesslog/collector/connection.go | 136 ++++++++--------------------------
 pkg/accesslog/common/connection.go    |   7 --
 pkg/tools/ip/conntrack.go             | 114 +---------------------------
 3 files changed, 36 insertions(+), 221 deletions(-)

diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go
index 7954e2d..d23c111 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -18,13 +18,11 @@
 package collector
 
 import (
-       "container/list"
        "context"
        "encoding/binary"
        "fmt"
        "net"
        "os"
-       "sync"
        "time"
 
        "github.com/docker/go-units"
@@ -132,67 +130,19 @@ func (c *ConnectCollector) Stop() {
 }
 
 type ConnectionPartitionContext struct {
-       context         *common.AccessLogContext
-       k8sOperator     process.K8sOperator
-       retryableEvents *list.List
-       retryableMutex  sync.Mutex
+       context     *common.AccessLogContext
+       k8sOperator process.K8sOperator
 }
 
 func NewConnectionPartitionContext(ctx *common.AccessLogContext,
        k8sOperator process.K8sOperator) *ConnectionPartitionContext {
        return &ConnectionPartitionContext{
-               context:         ctx,
-               k8sOperator:     k8sOperator,
-               retryableEvents: list.New(),
+               context:     ctx,
+               k8sOperator: k8sOperator,
        }
 }
 
 func (c *ConnectionPartitionContext) Start(ctx context.Context) {
-       go func() {
-               ticker := time.NewTicker(connectionAnalyzeRetryTime)
-               for {
-                       select {
-                       case <-ctx.Done():
-                               return
-                       case <-ticker.C:
-                               c.analyzeFailureEvent()
-                       }
-               }
-       }()
-}
-
-func (c *ConnectionPartitionContext) analyzeFailureEvent() {
-       for {
-               event := c.PopEventToRetry()
-               if event == nil {
-                       break
-               }
-               socketPair, err := c.BuildSocketFromConnectEvent(event)
-               if err != nil {
-                       connectionLogger.Debugf("retry to analyze the connect 
event failure, connection ID: %d, randomID: %d, error: %v",
-                               event.ConID, event.RandomID, err)
-                       continue
-               }
-               c.OnConnectionSocketFinished(event, socketPair)
-       }
-}
-
-func (c *ConnectionPartitionContext) AddEventToRetry(e 
*events.SocketConnectEvent) {
-       c.retryableMutex.Lock()
-       defer c.retryableMutex.Unlock()
-       c.retryableEvents.PushBack(e)
-}
-
-func (c *ConnectionPartitionContext) PopEventToRetry() 
*events.SocketConnectEvent {
-       c.retryableMutex.Lock()
-       defer c.retryableMutex.Unlock()
-       if c.retryableEvents.Len() == 0 {
-               return nil
-       }
-       element := c.retryableEvents.Front()
-       event := element.Value.(*events.SocketConnectEvent)
-       c.retryableEvents.Remove(element)
-       return event
 }
 
 func (c *ConnectionPartitionContext) Consume(data interface{}) {
@@ -202,11 +152,7 @@ func (c *ConnectionPartitionContext) Consume(data interface{}) {
                        "pid: %d, fd: %d, role: %s: func: %s, family: %d, 
success: %d, conntrack exist: %t",
                        event.ConID, event.RandomID, event.PID, event.SocketFD, 
enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
                        event.SocketFamily, event.ConnectSuccess, 
event.ConnTrackUpstreamPort != 0)
-               socketPair, err := c.BuildSocketFromConnectEvent(event)
-               if err != nil {
-                       c.AddEventToRetry(event)
-                       return
-               }
+               socketPair := c.BuildSocketFromConnectEvent(event)
                c.OnConnectionSocketFinished(event, socketPair)
        case *events.SocketCloseEvent:
                connectionLogger.Debugf("receive close event, connection ID: 
%d, randomID: %d, pid: %d, fd: %d",
@@ -248,40 +194,36 @@ func (c *ConnectionPartitionContext) OnConnectionSocketFinished(event *events.So
        forwarder.SendConnectEvent(c.context, event, socketPair)
 }
 
-func (c *ConnectionPartitionContext) BuildSocketFromConnectEvent(event 
*events.SocketConnectEvent) (*ip.SocketPair, error) {
+func (c *ConnectionPartitionContext) BuildSocketFromConnectEvent(event 
*events.SocketConnectEvent) *ip.SocketPair {
        if event.SocketFamily != unix.AF_INET && event.SocketFamily != 
unix.AF_INET6 && event.SocketFamily != enums.SocketFamilyUnknown {
                // if not ipv4, ipv6 or unknown, ignore
-               return nil, nil
-       }
-       pair, err := c.BuildSocketPair(event)
-       if err != nil {
-               return nil, err
+               return nil
        }
+       pair := c.BuildSocketPair(event)
        if pair != nil && pair.IsValid() {
                connectionLogger.Debugf("found the connection from the connect 
event is valid, connection ID: %d, randomID: %d",
                        event.ConID, event.RandomID)
-               return pair, nil
+               return pair
        }
        // if only the local port not success, maybe the upstream port is not 
open, so it could be continued
        if c.IsOnlyLocalPortEmpty(pair) {
                event.ConnectSuccess = 0
                connectionLogger.Debugf("the connection from the connect event 
is only the local port is empty, connection ID: %d, randomID: %d",
                        event.ConID, event.RandomID)
-               return pair, nil
+               return pair
        }
 
-       pair, err = ip.ParseSocket(event.PID, event.SocketFD)
+       pair, err := ip.ParseSocket(event.PID, event.SocketFD)
        if err != nil {
                log.Debugf("cannot found the socket, pid: %d, socket FD: %d, 
error: %v", event.PID, event.SocketFD, err)
-               // because if the socket is not found, the connection is not 
valid, should not return the error for retry
-               return nil, nil
+               return nil
        }
        connectionLogger.Debugf("found the connection from the socket, 
connection ID: %d, randomID: %d",
                event.ConID, event.RandomID)
        pair.Role = enums.ConnectionRole(event.Role)
        c.FixSocketFamilyIfNeed(event, pair)
-       c.TryToUpdateSocketFromConntrack(event, pair)
-       return pair, nil
+       c.CheckNeedConntrack(event, pair)
+       return pair
 }
 
 func (c *ConnectionPartitionContext) IsOnlyLocalPortEmpty(socketPair 
*ip.SocketPair) bool {
@@ -296,7 +238,7 @@ func (c *ConnectionPartitionContext) IsOnlyLocalPortEmpty(socketPair *ip.SocketP
        return socketPair.IsValid()
 }
 
-func (c *ConnectionPartitionContext) BuildSocketPair(event 
*events.SocketConnectEvent) (*ip.SocketPair, error) {
+func (c *ConnectionPartitionContext) BuildSocketPair(event 
*events.SocketConnectEvent) *ip.SocketPair {
        var result *ip.SocketPair
        haveConnTrack := false
        if event.SocketFamily == unix.AF_INET {
@@ -348,46 +290,32 @@ func (c *ConnectionPartitionContext) BuildSocketPair(event *events.SocketConnect
        }
 
        if haveConnTrack {
-               return result, nil
+               return result
        }
 
        c.FixSocketFamilyIfNeed(event, result)
-       // support retry to update the socket from conntrack
-       err := c.TryToUpdateSocketFromConntrack(event, result)
-       return result, err
+       c.CheckNeedConntrack(event, result)
+       return result
 }
 
-func (c *ConnectionPartitionContext) TryToUpdateSocketFromConntrack(event 
*events.SocketConnectEvent, socket *ip.SocketPair) error {
+func (c *ConnectionPartitionContext) CheckNeedConntrack(event 
*events.SocketConnectEvent, socket *ip.SocketPair) error {
        if socket == nil || !socket.IsValid() || 
tools.IsLocalHostAddress(socket.DestIP) ||
-               event.FuncName == enums.SocketFunctionNameAccept { // accept 
event don't need to update the remote address
+               event.FuncName == enums.SocketFunctionNameAccept || // accept 
event don't need to update the remote address
+               !c.context.ConnectionMgr.ProcessIsDetectBy(event.PID, 
api.Kubernetes) { // only the k8s process need to update the remote address 
from conntrack
                return nil
        }
-       if c.context.ConnectionMgr.ProcessIsDetectBy(event.PID, api.Kubernetes) 
{
-               isPodIP, err := c.k8sOperator.IsPodIP(socket.DestIP)
-               if err != nil {
-                       connectionLogger.Warnf("cannot found the pod IP, 
connection ID: %d, randomID: %d, error: %v",
-                               event.ConID, event.RandomID, err)
-               }
-               if isPodIP {
-                       connectionLogger.Debugf("detect the remote IP is pod 
IP, connection ID: %d, randomID: %d, remote: %s",
-                               event.ConID, event.RandomID, socket.DestIP)
-                       return nil
-               }
+
+       isPodIP, err := c.k8sOperator.IsPodIP(socket.DestIP)
+       if err != nil {
+               connectionLogger.Warnf("cannot found the pod IP, connection ID: 
%d, randomID: %d, error: %v",
+                       event.ConID, event.RandomID, err)
        }
-       connectionLogger.Debugf("try to update the remote address from 
conntrack, connection ID: %d, randomID: %d, func: %s, local: %s:%d, remote: 
%s:%d",
-               event.ConID, event.RandomID, 
enums.SocketFunctionName(event.FuncName), socket.SrcIP, socket.SrcPort, 
socket.DestIP, socket.DestPort)
-       if c.context.ConnectionMgr.ConnectTracker != nil {
-               // if no contract and socket data is valid, then trying to get 
the remote address from the socket
-               // to encase the remote address is not the real remote address
-               originalIP := socket.DestIP
-               originalPort := socket.DestPort
-               err := 
c.context.ConnectionMgr.ConnectTracker.UpdateRealPeerAddress(socket, true)
-               if err != nil {
-                       return fmt.Errorf("update the socket address from 
conntrack failure, error: %v", err)
-               }
-               connectionLogger.Debugf("update the socket address from 
conntrack success, "+
-                       "connection ID: %d, randomID: %d, original remote: 
%s:%d, new remote: %s:%d",
-                       event.ConID, event.RandomID, originalIP, originalPort, 
socket.DestIP, socket.DestPort)
+       if isPodIP {
+               connectionLogger.Debugf("detect the remote IP is pod IP, 
connection ID: %d, randomID: %d, remote: %s",
+                       event.ConID, event.RandomID, socket.DestIP)
+               return nil
        }
+       // update to the socket need to update the remote address from conntrack
+       socket.NeedConnTrack = true
        return nil
 }
diff --git a/pkg/accesslog/common/connection.go b/pkg/accesslog/common/connection.go
index ca185ff..63ad32a 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -182,13 +182,6 @@ func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *
 func (c *ConnectionManager) Start(ctx context.Context, accessLogContext 
*AccessLogContext) {
        c.processOP.AddListener(c)
 
-       if c.ConnectTracker != nil {
-               err := c.ConnectTracker.StartMonitoring(ctx)
-               if err != nil {
-                       log.Warnf("cannot start the connection tracker 
monitoring, %v", err)
-               }
-       }
-
        // starting to clean up the un-active connection in BPF
        go func() {
                ticker := time.NewTicker(cleanActiveConnectionInterval)
diff --git a/pkg/tools/ip/conntrack.go b/pkg/tools/ip/conntrack.go
index fe18d79..16f4cdc 100644
--- a/pkg/tools/ip/conntrack.go
+++ b/pkg/tools/ip/conntrack.go
@@ -18,17 +18,13 @@
 package ip
 
 import (
-       "context"
        "fmt"
        "github.com/florianl/go-conntrack"
-       "github.com/mdlayher/netlink"
        "golang.org/x/sys/unix"
        "net"
        "syscall"
        "time"
 
-       "k8s.io/apimachinery/pkg/util/cache"
-
        "github.com/apache/skywalking-rover/pkg/logger"
 )
 
@@ -48,12 +44,7 @@ var numberStrategies = []struct {
 }}
 
 type ConnTrack struct {
-       queryClient   *conntrack.Nfct
-       monitorClient *conntrack.Nfct
-       monitorExpire *cache.Expiring
-
-       ctx    context.Context
-       cancel context.CancelFunc
+       nfct *conntrack.Nfct
 }
 
 func NewConnTrack() (*ConnTrack, error) {
@@ -63,108 +54,11 @@ func NewConnTrack() (*ConnTrack, error) {
        }
 
        return &ConnTrack{
-               queryClient:   nfct,
-               monitorExpire: cache.NewExpiring(),
+               nfct: nfct,
        }, nil
 }
 
-func (c *ConnTrack) StartMonitoring(ctx context.Context) error {
-       c.ctx, c.cancel = context.WithCancel(ctx)
-       errors := make(chan error, 2)
-       errChain, err := c.monitor0(c.ctx)
-       if err != nil {
-               return err
-       }
-
-       go func() {
-               e := <-errChain
-               errors <- e
-
-               for {
-                       select {
-                       case <-ctx.Done():
-                               return
-                       case monitoringError := <-errors:
-                               log.Warnf("monitoring conntrack failure, will 
re-try monitoring after 5 second. error: %v", monitoringError)
-                               time.Sleep(time.Second * 5)
-
-                               errChain, monitoringError = c.monitor0(ctx)
-                               if monitoringError != nil {
-                                       errors <- monitoringError
-                                       continue
-                               }
-                               mError := <-errChain
-                               errors <- mError
-                       }
-               }
-       }()
-       return nil
-}
-
-func (c *ConnTrack) monitor0(ctx context.Context) (<-chan error, error) {
-       if c.monitorClient != nil {
-               if e := c.monitorClient.Close(); e != nil {
-                       log.Warnf("close the conntack monitor client error: 
%v", e)
-               }
-       }
-
-       nfct, err := conntrack.Open(&conntrack.Config{})
-       if err != nil {
-               return nil, err
-       }
-       c.monitorClient = nfct
-       c.monitorClient.Con.SetReadBuffer(26214400)
-       c.monitorClient.Con.SetOption(netlink.ListenAllNSID, true)
-       errChan := c.monitorClient.AttachErrChan()
-       err = c.monitorClient.RegisterFiltered(ctx, conntrack.Conntrack, 
conntrack.NetlinkCtNew,
-               []conntrack.ConnAttr{
-                       {Type: conntrack.AttrOrigPortDst, Data: []byte{0x0, 
0x35}, Mask: []byte{0xff, 0xff}, Negate: true}, // DstPort != 53
-               }, func(con conntrack.Con) int {
-                       if con.Origin == nil || con.Reply == nil || 
con.Origin.Proto == nil || con.Reply.Proto == nil ||
-                               con.Origin.Src == nil || con.Origin.Dst == nil 
|| con.Origin.Proto.SrcPort == nil || con.Origin.Proto.DstPort == nil ||
-                               con.Reply.Src == nil || con.Reply.Dst == nil || 
con.Reply.Proto.SrcPort == nil || con.Reply.Proto.DstPort == nil {
-                               return 0
-                       }
-
-                       c.monitorExpire.Set(conntrackExpireKey{
-                               sourceIP:   con.Origin.Src.String(),
-                               destIP:     con.Origin.Dst.String(),
-                               sourcePort: *con.Origin.Proto.SrcPort,
-                               destPort:   *con.Origin.Proto.DstPort,
-                       }, conntrackExpireValue{
-                               realIP:   con.Reply.Src.String(),
-                               realPort: *con.Reply.Proto.SrcPort,
-                       }, monitorExpireTime)
-                       return 0
-               })
-       if err != nil {
-               return nil, err
-       }
-       return errChan, nil
-}
-
-func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair, fromCacheOnly 
bool) error {
-       key := conntrackExpireKey{
-               sourceIP:   addr.SrcIP,
-               destIP:     addr.DestIP,
-               sourcePort: addr.SrcPort,
-               destPort:   addr.DestPort,
-       }
-       val, exist := c.monitorExpire.Get(key)
-       if exist {
-               v := val.(conntrackExpireValue)
-               addr.DestIP = v.realIP
-               addr.DestPort = v.realPort
-               addr.NeedConnTrack = false
-               log.Debugf("update real peer address from cache: %s:%d", 
addr.DestIP, addr.DestPort)
-               c.monitorExpire.Delete(key)
-               return nil
-       }
-       addr.NeedConnTrack = true
-       if fromCacheOnly {
-               return nil
-       }
-
+func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) error {
        family := conntrack.IPv4
        if addr.Family == unix.AF_INET6 {
                family = conntrack.IPv6
@@ -175,7 +69,7 @@ func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair, fromCacheOnly bool)
                tuple.Proto.Number = &(info.proto)
 
                // using get to query protocol
-               session, e := c.queryClient.Get(conntrack.Conntrack, family, 
conntrack.Con{Origin: tuple})
+               session, e := c.nfct.Get(conntrack.Conntrack, family, 
conntrack.Con{Origin: tuple})
                if e != nil {
                        // try to get the reply session, if the info not exists 
or from accept events, have error is normal
                        return fmt.Errorf("cannot get the conntrack session, 
type: %s, family: %d, origin src: %s:%d, origin dest: %s:%d, error: %v", 
info.name,

Reply via email to