This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/reduce-handle-connect-time by 
this push:
     new 2e9d6f9  disable conntrack monitoring
2e9d6f9 is described below

commit 2e9d6f9ae1a7d2481789f86281f32cbf2fcd0c8e
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 28 23:27:47 2024 +0800

    disable conntrack monitoring
---
 pkg/accesslog/collector/connection.go |   1 -
 pkg/accesslog/common/connection.go    | 120 +++++-----------------------------
 pkg/tools/ip/conntrack.go             |  12 +---
 3 files changed, 16 insertions(+), 117 deletions(-)

diff --git a/pkg/accesslog/collector/connection.go 
b/pkg/accesslog/collector/connection.go
index d23c111..5cc2176 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -190,7 +190,6 @@ func (c *ConnectionPartitionContext) 
OnConnectionSocketFinished(event *events.So
        }
        connectionLogger.Debugf("build socket pair success, connection ID: %d, 
randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
                event.ConID, event.RandomID, socketPair.Role, socketPair.SrcIP, 
socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
-       c.context.ConnectionMgr.OnConnectEvent(event, socketPair)
        forwarder.SendConnectEvent(c.context, event, socketPair)
 }
 
diff --git a/pkg/accesslog/common/connection.go 
b/pkg/accesslog/common/connection.go
index 63ad32a..7fc2550 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -42,8 +42,6 @@ import (
 
        cmap "github.com/orcaman/concurrent-map"
 
-       "k8s.io/apimachinery/pkg/util/cache"
-
        v32 "skywalking.apache.org/repo/goapi/collect/common/v3"
        v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
 )
@@ -103,10 +101,6 @@ type ConnectionManager struct {
        moduleMgr   *module.Manager
        processOP   process.Operator
        connections cmap.ConcurrentMap
-       // addressWithPid cache all local ip+port and pid mapping for match the 
process on the same host
-       // such as service mesh(process with envoy)
-       addressWithPid   *cache.Expiring
-       localPortWithPid *cache.Expiring // in some case, we can only get the 
127.0.0.1 from server side, so we only cache the port for this
        // localIPWithPid cache all local monitoring process bind IP address
        // for checking the remote address is local or not
        localIPWithPid map[string]int32
@@ -166,8 +160,6 @@ func NewConnectionManager(config *Config, moduleMgr 
*module.Manager, bpfLoader *
                moduleMgr:           moduleMgr,
                processOP:           
moduleMgr.FindModule(process.ModuleName).(process.Operator),
                connections:         cmap.New(),
-               addressWithPid:      cache.NewExpiring(),
-               localPortWithPid:    cache.NewExpiring(),
                localIPWithPid:      make(map[string]int32),
                monitoringProcesses: make(map[int32][]api.ProcessInterface),
                processMonitorMap:   bpfLoader.ProcessMonitorControl,
@@ -280,39 +272,26 @@ func (c *ConnectionManager) Find(event events.Event) 
*ConnectionInfo {
 }
 
 func (c *ConnectionManager) buildRemoteAddress(e *events.SocketConnectEvent, 
socket *ip.SocketPair) *v3.ConnectionAddress {
-       tp := c.isLocalTarget(socket)
-       if tp == addressProcessTypeUnknown {
-               log.Debugf("building the remote address to unknown, connection: 
%d-%d, role: %s, local: %s:%d, remote: %s:%d",
-                       e.GetConnectionID(), e.GetRandomID(), socket.Role, 
socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
-               return c.buildAddressFromRemote(socket.DestIP, socket.DestPort)
+       // if the remote address is local, there is no need to build the 
address (no access log needs to be sent when communicating with itself)
+       if tools.IsLocalHostAddress(socket.DestIP) {
+               return nil
        }
 
-       var addrInfo *addressInfo
-       var fromType string
-       switch socket.Role {
-       case enums.ConnectionRoleClient:
-               addrInfo = c.getAddressPid(socket.SrcIP, socket.SrcPort, false)
-               fromType = strLocal
-       case enums.ConnectionRoleServer:
-               addrInfo = c.getAddressPid(socket.DestIP, socket.DestPort, true)
-               fromType = strRemote
-       }
-
-       if addrInfo != nil {
-               log.Debugf("building the remote address from %s process, pid: 
%d, connection: %d-%d, role: %s, local: %s:%d, remote: %s:%d",
-                       fromType, addrInfo.pid, e.GetConnectionID(), 
e.GetRandomID(), socket.Role, socket.SrcIP, socket.SrcPort, socket.DestIP, 
socket.DestPort)
-               return c.buildLocalAddress(addrInfo.pid, socket.DestPort, 
socket)
-       } else if tp == addressProcessTypeKubernetes {
-               if p := c.localIPWithPid[socket.DestIP]; p != 0 {
-                       log.Debugf("building the remote address from kubernetes 
process, connection: %d-%d, role: %s, pid: %d, local: %s:%d, remote: %s:%d",
-                               e.GetConnectionID(), e.GetRandomID(), 
socket.Role, p, socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
-                       return c.buildLocalAddress(uint32(p), socket.DestPort, 
socket)
+       // if the remote connection needs conntrack, then update the 
real peer address
+       if socket.NeedConnTrack {
+               if err := c.ConnectTracker.UpdateRealPeerAddress(socket); err 
!= nil {
+                       log.Debugf("cannot update the real peer address, %v", 
err)
                }
        }
 
-       log.Debugf("cannot found the peer pid for the connection: %d-%d, remote 
type: %v, role: %s, local: %s:%d, remote: %s:%d",
-               e.GetConnectionID(), e.GetRandomID(), tp, socket.Role, 
socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
-       return nil
+       // found local address with pid
+       if pid, exist := c.localIPWithPid[socket.DestIP]; exist && pid != 0 {
+               return c.buildLocalAddress(uint32(pid), socket.DestPort, socket)
+       }
+
+       log.Debugf("building the remote address to unknown, connection: %d-%d, 
role: %s, local: %s:%d, remote: %s:%d",
+               e.GetConnectionID(), e.GetRandomID(), socket.Role, 
socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
+       return c.buildAddressFromRemote(socket.DestIP, socket.DestPort)
 }
 
 func (c *ConnectionManager) connectionPostHandle(connection *ConnectionInfo, 
event events.Event) {
@@ -451,75 +430,6 @@ func (c *ConnectionManager) OnConnectionClose(event 
*events.SocketCloseEvent) *C
        }
 }
 
-func (c *ConnectionManager) savingTheAddress(hostAddress string, port uint16, 
localPid bool, pid uint32) {
-       localAddrInfo := &addressInfo{
-               pid: pid,
-       }
-       c.addressWithPid.Set(fmt.Sprintf("%s_%d_%t", hostAddress, port, 
localPid), localAddrInfo, localAddressPairCacheTime)
-       localStr := strRemote
-       if localPid {
-               localStr = strLocal
-       }
-       log.Debugf("saving the %s address with pid cache, address: %s:%d, pid: 
%d", localStr, hostAddress, port, pid)
-}
-
-func (c *ConnectionManager) getAddressPid(hostAddress string, port uint16, 
localPid bool) *addressInfo {
-       addrInfo, ok := c.addressWithPid.Get(fmt.Sprintf("%s_%d_%t", 
hostAddress, port, localPid))
-       if ok && addrInfo != nil {
-               return addrInfo.(*addressInfo)
-       }
-       return nil
-}
-
-func (c *ConnectionManager) OnConnectEvent(event *events.SocketConnectEvent, 
pair *ip.SocketPair) {
-       // only adding the local ip port when remote is local address
-       switch c.isLocalTarget(pair) {
-       case addressProcessTypeUnknown:
-               log.Debugf("the target address is not local, so no needs to 
save the cache. "+
-                       "address: %s:%d, pid: %d", pair.DestIP, pair.DestPort, 
event.PID)
-       case addressProcessTypeLocal:
-               switch pair.Role {
-               case enums.ConnectionRoleClient:
-                       // if current is client, so the local port should be 
unique
-                       c.savingTheAddress(pair.SrcIP, pair.SrcPort, true, 
event.PID)
-               case enums.ConnectionRoleServer:
-                       // if current is server, so the remote port should be 
unique
-                       c.savingTheAddress(pair.DestIP, pair.DestPort, false, 
event.PID)
-               case enums.ConnectionRoleUnknown:
-                       log.Debugf("the target address local but unknown role, 
so no needs to save the cache. socket: [%s], pid: %d",
-                               pair, event.PID)
-               }
-       case addressProcessTypeKubernetes:
-               switch pair.Role {
-               case enums.ConnectionRoleClient:
-                       // if current is client, so the local port should be 
unique
-                       c.savingTheAddress(pair.SrcIP, pair.SrcPort, true, 
event.PID)
-               case enums.ConnectionRoleServer:
-                       // if current is server, so the remote port should be 
unique
-                       c.savingTheAddress(pair.DestIP, pair.DestPort, false, 
event.PID)
-               case enums.ConnectionRoleUnknown:
-                       log.Debugf("the target address kubernetes but unknown 
role, so no needs to save the cache. socket: [%s], pid: %d",
-                               pair, event.PID)
-               }
-       }
-}
-
-func (c *ConnectionManager) isLocalTarget(pair *ip.SocketPair) 
addressProcessType {
-       if tools.IsLocalHostAddress(pair.DestIP) {
-               return addressProcessTypeLocal
-       }
-       // if the remote connection is need to use conntrack, then update the 
real peer address
-       if pair.NeedConnTrack {
-               if err := c.ConnectTracker.UpdateRealPeerAddress(pair, false); 
err != nil {
-                       log.Debugf("cannot update the real peer address, %v", 
err)
-               }
-       }
-       if _, exist := c.localIPWithPid[pair.DestIP]; exist {
-               return addressProcessTypeKubernetes
-       }
-       return addressProcessTypeUnknown
-}
-
 func (c *ConnectionManager) AddNewProcess(pid int32, entities 
[]api.ProcessInterface) {
        // filtering the namespace
        monitorProcesses := c.shouldMonitorProcesses(entities)
diff --git a/pkg/tools/ip/conntrack.go b/pkg/tools/ip/conntrack.go
index 16f4cdc..410b11c 100644
--- a/pkg/tools/ip/conntrack.go
+++ b/pkg/tools/ip/conntrack.go
@@ -79,23 +79,13 @@ func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) 
error {
                if res := c.filterValidateReply(session, tuple); res != nil {
                        addr.DestIP = res.Src.String()
                        addr.NeedConnTrack = false
-                       log.Debugf("update real peer address from conntrack: 
%s:%d", addr.DestIP, addr.DestPort)
+                       log.Infof("update real peer address from conntrack: 
%s:%d", addr.DestIP, addr.DestPort)
                        return nil
                }
        }
        return nil
 }
 
-type conntrackExpireKey struct {
-       sourceIP, destIP     string
-       sourcePort, destPort uint16
-}
-
-type conntrackExpireValue struct {
-       realIP   string
-       realPort uint16
-}
-
 func (c *ConnTrack) parseSocketToTuple(addr *SocketPair) *conntrack.IPTuple {
        tcp := uint8(syscall.IPPROTO_TCP)
        srcIP := net.ParseIP(addr.SrcIP)

Reply via email to