This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/reduce-handle-connect-time by this push:
new 2e9d6f9 disable conntrack monitoring
2e9d6f9 is described below
commit 2e9d6f9ae1a7d2481789f86281f32cbf2fcd0c8e
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 28 23:27:47 2024 +0800
disable conntrack monitoring
---
pkg/accesslog/collector/connection.go | 1 -
pkg/accesslog/common/connection.go | 120 +++++-----------------------------
pkg/tools/ip/conntrack.go | 12 +---
3 files changed, 16 insertions(+), 117 deletions(-)
diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go
index d23c111..5cc2176 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -190,7 +190,6 @@ func (c *ConnectionPartitionContext) OnConnectionSocketFinished(event *events.So
}
connectionLogger.Debugf("build socket pair success, connection ID: %d,
randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
event.ConID, event.RandomID, socketPair.Role, socketPair.SrcIP,
socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
- c.context.ConnectionMgr.OnConnectEvent(event, socketPair)
forwarder.SendConnectEvent(c.context, event, socketPair)
}
diff --git a/pkg/accesslog/common/connection.go b/pkg/accesslog/common/connection.go
index 63ad32a..7fc2550 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -42,8 +42,6 @@ import (
cmap "github.com/orcaman/concurrent-map"
- "k8s.io/apimachinery/pkg/util/cache"
-
v32 "skywalking.apache.org/repo/goapi/collect/common/v3"
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
)
@@ -103,10 +101,6 @@ type ConnectionManager struct {
moduleMgr *module.Manager
processOP process.Operator
connections cmap.ConcurrentMap
- // addressWithPid cache all local ip+port and pid mapping for match the
process on the same host
- // such as service mesh(process with envoy)
- addressWithPid *cache.Expiring
- localPortWithPid *cache.Expiring // in some case, we can only get the
127.0.0.1 from server side, so we only cache the port for this
// localIPWithPid cache all local monitoring process bind IP address
// for checking the remote address is local or not
localIPWithPid map[string]int32
@@ -166,8 +160,6 @@ func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *
moduleMgr: moduleMgr,
processOP:
moduleMgr.FindModule(process.ModuleName).(process.Operator),
connections: cmap.New(),
- addressWithPid: cache.NewExpiring(),
- localPortWithPid: cache.NewExpiring(),
localIPWithPid: make(map[string]int32),
monitoringProcesses: make(map[int32][]api.ProcessInterface),
processMonitorMap: bpfLoader.ProcessMonitorControl,
@@ -280,39 +272,26 @@ func (c *ConnectionManager) Find(event events.Event) *ConnectionInfo {
}
func (c *ConnectionManager) buildRemoteAddress(e *events.SocketConnectEvent,
socket *ip.SocketPair) *v3.ConnectionAddress {
- tp := c.isLocalTarget(socket)
- if tp == addressProcessTypeUnknown {
- log.Debugf("building the remote address to unknown, connection:
%d-%d, role: %s, local: %s:%d, remote: %s:%d",
- e.GetConnectionID(), e.GetRandomID(), socket.Role,
socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
- return c.buildAddressFromRemote(socket.DestIP, socket.DestPort)
+ // if the remote address is local, then no needs to build the
address(access log no need to send by communicate with self)
+ if tools.IsLocalHostAddress(socket.DestIP) {
+ return nil
}
- var addrInfo *addressInfo
- var fromType string
- switch socket.Role {
- case enums.ConnectionRoleClient:
- addrInfo = c.getAddressPid(socket.SrcIP, socket.SrcPort, false)
- fromType = strLocal
- case enums.ConnectionRoleServer:
- addrInfo = c.getAddressPid(socket.DestIP, socket.DestPort, true)
- fromType = strRemote
- }
-
- if addrInfo != nil {
- log.Debugf("building the remote address from %s process, pid:
%d, connection: %d-%d, role: %s, local: %s:%d, remote: %s:%d",
- fromType, addrInfo.pid, e.GetConnectionID(),
e.GetRandomID(), socket.Role, socket.SrcIP, socket.SrcPort, socket.DestIP,
socket.DestPort)
- return c.buildLocalAddress(addrInfo.pid, socket.DestPort,
socket)
- } else if tp == addressProcessTypeKubernetes {
- if p := c.localIPWithPid[socket.DestIP]; p != 0 {
- log.Debugf("building the remote address from kubernetes
process, connection: %d-%d, role: %s, pid: %d, local: %s:%d, remote: %s:%d",
- e.GetConnectionID(), e.GetRandomID(),
socket.Role, p, socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
- return c.buildLocalAddress(uint32(p), socket.DestPort,
socket)
+ // if the remote connection is need to use conntrack, then update the
real peer address
+ if socket.NeedConnTrack {
+ if err := c.ConnectTracker.UpdateRealPeerAddress(socket); err
!= nil {
+ log.Debugf("cannot update the real peer address, %v",
err)
}
}
- log.Debugf("cannot found the peer pid for the connection: %d-%d, remote
type: %v, role: %s, local: %s:%d, remote: %s:%d",
- e.GetConnectionID(), e.GetRandomID(), tp, socket.Role,
socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
- return nil
+ // found local address with pid
+ if pid, exist := c.localIPWithPid[socket.DestIP]; exist && pid != 0 {
+ return c.buildLocalAddress(uint32(pid), socket.DestPort, socket)
+ }
+
+ log.Debugf("building the remote address to unknown, connection: %d-%d,
role: %s, local: %s:%d, remote: %s:%d",
+ e.GetConnectionID(), e.GetRandomID(), socket.Role,
socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort)
+ return c.buildAddressFromRemote(socket.DestIP, socket.DestPort)
}
func (c *ConnectionManager) connectionPostHandle(connection *ConnectionInfo,
event events.Event) {
@@ -451,75 +430,6 @@ func (c *ConnectionManager) OnConnectionClose(event *events.SocketCloseEvent) *C
}
}
-func (c *ConnectionManager) savingTheAddress(hostAddress string, port uint16,
localPid bool, pid uint32) {
- localAddrInfo := &addressInfo{
- pid: pid,
- }
- c.addressWithPid.Set(fmt.Sprintf("%s_%d_%t", hostAddress, port,
localPid), localAddrInfo, localAddressPairCacheTime)
- localStr := strRemote
- if localPid {
- localStr = strLocal
- }
- log.Debugf("saving the %s address with pid cache, address: %s:%d, pid:
%d", localStr, hostAddress, port, pid)
-}
-
-func (c *ConnectionManager) getAddressPid(hostAddress string, port uint16,
localPid bool) *addressInfo {
- addrInfo, ok := c.addressWithPid.Get(fmt.Sprintf("%s_%d_%t",
hostAddress, port, localPid))
- if ok && addrInfo != nil {
- return addrInfo.(*addressInfo)
- }
- return nil
-}
-
-func (c *ConnectionManager) OnConnectEvent(event *events.SocketConnectEvent,
pair *ip.SocketPair) {
- // only adding the local ip port when remote is local address
- switch c.isLocalTarget(pair) {
- case addressProcessTypeUnknown:
- log.Debugf("the target address is not local, so no needs to
save the cache. "+
- "address: %s:%d, pid: %d", pair.DestIP, pair.DestPort,
event.PID)
- case addressProcessTypeLocal:
- switch pair.Role {
- case enums.ConnectionRoleClient:
- // if current is client, so the local port should be
unique
- c.savingTheAddress(pair.SrcIP, pair.SrcPort, true,
event.PID)
- case enums.ConnectionRoleServer:
- // if current is server, so the remote port should be
unique
- c.savingTheAddress(pair.DestIP, pair.DestPort, false,
event.PID)
- case enums.ConnectionRoleUnknown:
- log.Debugf("the target address local but unknown role,
so no needs to save the cache. socket: [%s], pid: %d",
- pair, event.PID)
- }
- case addressProcessTypeKubernetes:
- switch pair.Role {
- case enums.ConnectionRoleClient:
- // if current is client, so the local port should be
unique
- c.savingTheAddress(pair.SrcIP, pair.SrcPort, true,
event.PID)
- case enums.ConnectionRoleServer:
- // if current is server, so the remote port should be
unique
- c.savingTheAddress(pair.DestIP, pair.DestPort, false,
event.PID)
- case enums.ConnectionRoleUnknown:
- log.Debugf("the target address kubernetes but unknown
role, so no needs to save the cache. socket: [%s], pid: %d",
- pair, event.PID)
- }
- }
-}
-
-func (c *ConnectionManager) isLocalTarget(pair *ip.SocketPair)
addressProcessType {
- if tools.IsLocalHostAddress(pair.DestIP) {
- return addressProcessTypeLocal
- }
- // if the remote connection is need to use conntrack, then update the
real peer address
- if pair.NeedConnTrack {
- if err := c.ConnectTracker.UpdateRealPeerAddress(pair, false);
err != nil {
- log.Debugf("cannot update the real peer address, %v",
err)
- }
- }
- if _, exist := c.localIPWithPid[pair.DestIP]; exist {
- return addressProcessTypeKubernetes
- }
- return addressProcessTypeUnknown
-}
-
func (c *ConnectionManager) AddNewProcess(pid int32, entities
[]api.ProcessInterface) {
// filtering the namespace
monitorProcesses := c.shouldMonitorProcesses(entities)
diff --git a/pkg/tools/ip/conntrack.go b/pkg/tools/ip/conntrack.go
index 16f4cdc..410b11c 100644
--- a/pkg/tools/ip/conntrack.go
+++ b/pkg/tools/ip/conntrack.go
@@ -79,23 +79,13 @@ func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) error {
if res := c.filterValidateReply(session, tuple); res != nil {
addr.DestIP = res.Src.String()
addr.NeedConnTrack = false
- log.Debugf("update real peer address from conntrack:
%s:%d", addr.DestIP, addr.DestPort)
+ log.Infof("update real peer address from conntrack:
%s:%d", addr.DestIP, addr.DestPort)
return nil
}
}
return nil
}
-type conntrackExpireKey struct {
- sourceIP, destIP string
- sourcePort, destPort uint16
-}
-
-type conntrackExpireValue struct {
- realIP string
- realPort uint16
-}
-
func (c *ConnTrack) parseSocketToTuple(addr *SocketPair) *conntrack.IPTuple {
tcp := uint8(syscall.IPPROTO_TCP)
srcIP := net.ParseIP(addr.SrcIP)