This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch reduce-handle-connect-time
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/reduce-handle-connect-time by
this push:
new 45bb01a disable conntrack monitoring
45bb01a is described below
commit 45bb01a13edb46bbedd0601541e53b8514afc6d7
Author: mrproliu <[email protected]>
AuthorDate: Sat Dec 28 23:14:12 2024 +0800
disable conntrack monitoring
---
pkg/accesslog/collector/connection.go | 136 ++++++++--------------------------
pkg/accesslog/common/connection.go | 7 --
pkg/tools/ip/conntrack.go | 114 +---------------------------
3 files changed, 36 insertions(+), 221 deletions(-)
diff --git a/pkg/accesslog/collector/connection.go
b/pkg/accesslog/collector/connection.go
index 7954e2d..d23c111 100644
--- a/pkg/accesslog/collector/connection.go
+++ b/pkg/accesslog/collector/connection.go
@@ -18,13 +18,11 @@
package collector
import (
- "container/list"
"context"
"encoding/binary"
"fmt"
"net"
"os"
- "sync"
"time"
"github.com/docker/go-units"
@@ -132,67 +130,19 @@ func (c *ConnectCollector) Stop() {
}
type ConnectionPartitionContext struct {
- context *common.AccessLogContext
- k8sOperator process.K8sOperator
- retryableEvents *list.List
- retryableMutex sync.Mutex
+ context *common.AccessLogContext
+ k8sOperator process.K8sOperator
}
func NewConnectionPartitionContext(ctx *common.AccessLogContext,
k8sOperator process.K8sOperator) *ConnectionPartitionContext {
return &ConnectionPartitionContext{
- context: ctx,
- k8sOperator: k8sOperator,
- retryableEvents: list.New(),
+ context: ctx,
+ k8sOperator: k8sOperator,
}
}
func (c *ConnectionPartitionContext) Start(ctx context.Context) {
- go func() {
- ticker := time.NewTicker(connectionAnalyzeRetryTime)
- for {
- select {
- case <-ctx.Done():
- return
- case <-ticker.C:
- c.analyzeFailureEvent()
- }
- }
- }()
-}
-
-func (c *ConnectionPartitionContext) analyzeFailureEvent() {
- for {
- event := c.PopEventToRetry()
- if event == nil {
- break
- }
- socketPair, err := c.BuildSocketFromConnectEvent(event)
- if err != nil {
- connectionLogger.Debugf("retry to analyze the connect
event failure, connection ID: %d, randomID: %d, error: %v",
- event.ConID, event.RandomID, err)
- continue
- }
- c.OnConnectionSocketFinished(event, socketPair)
- }
-}
-
-func (c *ConnectionPartitionContext) AddEventToRetry(e
*events.SocketConnectEvent) {
- c.retryableMutex.Lock()
- defer c.retryableMutex.Unlock()
- c.retryableEvents.PushBack(e)
-}
-
-func (c *ConnectionPartitionContext) PopEventToRetry()
*events.SocketConnectEvent {
- c.retryableMutex.Lock()
- defer c.retryableMutex.Unlock()
- if c.retryableEvents.Len() == 0 {
- return nil
- }
- element := c.retryableEvents.Front()
- event := element.Value.(*events.SocketConnectEvent)
- c.retryableEvents.Remove(element)
- return event
}
func (c *ConnectionPartitionContext) Consume(data interface{}) {
@@ -202,11 +152,7 @@ func (c *ConnectionPartitionContext) Consume(data
interface{}) {
"pid: %d, fd: %d, role: %s: func: %s, family: %d,
success: %d, conntrack exist: %t",
event.ConID, event.RandomID, event.PID, event.SocketFD,
enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
event.SocketFamily, event.ConnectSuccess,
event.ConnTrackUpstreamPort != 0)
- socketPair, err := c.BuildSocketFromConnectEvent(event)
- if err != nil {
- c.AddEventToRetry(event)
- return
- }
+ socketPair := c.BuildSocketFromConnectEvent(event)
c.OnConnectionSocketFinished(event, socketPair)
case *events.SocketCloseEvent:
connectionLogger.Debugf("receive close event, connection ID:
%d, randomID: %d, pid: %d, fd: %d",
@@ -248,40 +194,36 @@ func (c *ConnectionPartitionContext)
OnConnectionSocketFinished(event *events.So
forwarder.SendConnectEvent(c.context, event, socketPair)
}
-func (c *ConnectionPartitionContext) BuildSocketFromConnectEvent(event
*events.SocketConnectEvent) (*ip.SocketPair, error) {
+func (c *ConnectionPartitionContext) BuildSocketFromConnectEvent(event
*events.SocketConnectEvent) *ip.SocketPair {
if event.SocketFamily != unix.AF_INET && event.SocketFamily !=
unix.AF_INET6 && event.SocketFamily != enums.SocketFamilyUnknown {
// if not ipv4, ipv6 or unknown, ignore
- return nil, nil
- }
- pair, err := c.BuildSocketPair(event)
- if err != nil {
- return nil, err
+ return nil
}
+ pair := c.BuildSocketPair(event)
if pair != nil && pair.IsValid() {
connectionLogger.Debugf("found the connection from the connect
event is valid, connection ID: %d, randomID: %d",
event.ConID, event.RandomID)
- return pair, nil
+ return pair
}
// if only the local port not success, maybe the upstream port is not
open, so it could be continued
if c.IsOnlyLocalPortEmpty(pair) {
event.ConnectSuccess = 0
connectionLogger.Debugf("the connection from the connect event
is only the local port is empty, connection ID: %d, randomID: %d",
event.ConID, event.RandomID)
- return pair, nil
+ return pair
}
- pair, err = ip.ParseSocket(event.PID, event.SocketFD)
+ pair, err := ip.ParseSocket(event.PID, event.SocketFD)
if err != nil {
log.Debugf("cannot found the socket, pid: %d, socket FD: %d,
error: %v", event.PID, event.SocketFD, err)
- // because if the socket is not found, the connection is not
valid, should not return the error for retry
- return nil, nil
+ return nil
}
connectionLogger.Debugf("found the connection from the socket,
connection ID: %d, randomID: %d",
event.ConID, event.RandomID)
pair.Role = enums.ConnectionRole(event.Role)
c.FixSocketFamilyIfNeed(event, pair)
- c.TryToUpdateSocketFromConntrack(event, pair)
- return pair, nil
+ c.CheckNeedConntrack(event, pair)
+ return pair
}
func (c *ConnectionPartitionContext) IsOnlyLocalPortEmpty(socketPair
*ip.SocketPair) bool {
@@ -296,7 +238,7 @@ func (c *ConnectionPartitionContext)
IsOnlyLocalPortEmpty(socketPair *ip.SocketP
return socketPair.IsValid()
}
-func (c *ConnectionPartitionContext) BuildSocketPair(event
*events.SocketConnectEvent) (*ip.SocketPair, error) {
+func (c *ConnectionPartitionContext) BuildSocketPair(event
*events.SocketConnectEvent) *ip.SocketPair {
var result *ip.SocketPair
haveConnTrack := false
if event.SocketFamily == unix.AF_INET {
@@ -348,46 +290,32 @@ func (c *ConnectionPartitionContext)
BuildSocketPair(event *events.SocketConnect
}
if haveConnTrack {
- return result, nil
+ return result
}
c.FixSocketFamilyIfNeed(event, result)
- // support retry to update the socket from conntrack
- err := c.TryToUpdateSocketFromConntrack(event, result)
- return result, err
+ c.CheckNeedConntrack(event, result)
+ return result
}
-func (c *ConnectionPartitionContext) TryToUpdateSocketFromConntrack(event
*events.SocketConnectEvent, socket *ip.SocketPair) error {
+func (c *ConnectionPartitionContext) CheckNeedConntrack(event
*events.SocketConnectEvent, socket *ip.SocketPair) error {
if socket == nil || !socket.IsValid() ||
tools.IsLocalHostAddress(socket.DestIP) ||
- event.FuncName == enums.SocketFunctionNameAccept { // accept
event don't need to update the remote address
+ event.FuncName == enums.SocketFunctionNameAccept || // accept
event don't need to update the remote address
+ !c.context.ConnectionMgr.ProcessIsDetectBy(event.PID,
api.Kubernetes) { // only the k8s process need to update the remote address
from conntrack
return nil
}
- if c.context.ConnectionMgr.ProcessIsDetectBy(event.PID, api.Kubernetes)
{
- isPodIP, err := c.k8sOperator.IsPodIP(socket.DestIP)
- if err != nil {
- connectionLogger.Warnf("cannot found the pod IP,
connection ID: %d, randomID: %d, error: %v",
- event.ConID, event.RandomID, err)
- }
- if isPodIP {
- connectionLogger.Debugf("detect the remote IP is pod
IP, connection ID: %d, randomID: %d, remote: %s",
- event.ConID, event.RandomID, socket.DestIP)
- return nil
- }
+
+ isPodIP, err := c.k8sOperator.IsPodIP(socket.DestIP)
+ if err != nil {
+ connectionLogger.Warnf("cannot found the pod IP, connection ID:
%d, randomID: %d, error: %v",
+ event.ConID, event.RandomID, err)
}
- connectionLogger.Debugf("try to update the remote address from
conntrack, connection ID: %d, randomID: %d, func: %s, local: %s:%d, remote:
%s:%d",
- event.ConID, event.RandomID,
enums.SocketFunctionName(event.FuncName), socket.SrcIP, socket.SrcPort,
socket.DestIP, socket.DestPort)
- if c.context.ConnectionMgr.ConnectTracker != nil {
- // if no contract and socket data is valid, then trying to get
the remote address from the socket
- // to encase the remote address is not the real remote address
- originalIP := socket.DestIP
- originalPort := socket.DestPort
- err :=
c.context.ConnectionMgr.ConnectTracker.UpdateRealPeerAddress(socket, true)
- if err != nil {
- return fmt.Errorf("update the socket address from
conntrack failure, error: %v", err)
- }
- connectionLogger.Debugf("update the socket address from
conntrack success, "+
- "connection ID: %d, randomID: %d, original remote:
%s:%d, new remote: %s:%d",
- event.ConID, event.RandomID, originalIP, originalPort,
socket.DestIP, socket.DestPort)
+ if isPodIP {
+ connectionLogger.Debugf("detect the remote IP is pod IP,
connection ID: %d, randomID: %d, remote: %s",
+ event.ConID, event.RandomID, socket.DestIP)
+ return nil
}
+ // flag the socket as needing its remote address updated from conntrack
+ socket.NeedConnTrack = true
return nil
}
diff --git a/pkg/accesslog/common/connection.go
b/pkg/accesslog/common/connection.go
index ca185ff..63ad32a 100644
--- a/pkg/accesslog/common/connection.go
+++ b/pkg/accesslog/common/connection.go
@@ -182,13 +182,6 @@ func NewConnectionManager(config *Config, moduleMgr
*module.Manager, bpfLoader *
func (c *ConnectionManager) Start(ctx context.Context, accessLogContext
*AccessLogContext) {
c.processOP.AddListener(c)
- if c.ConnectTracker != nil {
- err := c.ConnectTracker.StartMonitoring(ctx)
- if err != nil {
- log.Warnf("cannot start the connection tracker
monitoring, %v", err)
- }
- }
-
// starting to clean up the un-active connection in BPF
go func() {
ticker := time.NewTicker(cleanActiveConnectionInterval)
diff --git a/pkg/tools/ip/conntrack.go b/pkg/tools/ip/conntrack.go
index fe18d79..16f4cdc 100644
--- a/pkg/tools/ip/conntrack.go
+++ b/pkg/tools/ip/conntrack.go
@@ -18,17 +18,13 @@
package ip
import (
- "context"
"fmt"
"github.com/florianl/go-conntrack"
- "github.com/mdlayher/netlink"
"golang.org/x/sys/unix"
"net"
"syscall"
"time"
- "k8s.io/apimachinery/pkg/util/cache"
-
"github.com/apache/skywalking-rover/pkg/logger"
)
@@ -48,12 +44,7 @@ var numberStrategies = []struct {
}}
type ConnTrack struct {
- queryClient *conntrack.Nfct
- monitorClient *conntrack.Nfct
- monitorExpire *cache.Expiring
-
- ctx context.Context
- cancel context.CancelFunc
+ nfct *conntrack.Nfct
}
func NewConnTrack() (*ConnTrack, error) {
@@ -63,108 +54,11 @@ func NewConnTrack() (*ConnTrack, error) {
}
return &ConnTrack{
- queryClient: nfct,
- monitorExpire: cache.NewExpiring(),
+ nfct: nfct,
}, nil
}
-func (c *ConnTrack) StartMonitoring(ctx context.Context) error {
- c.ctx, c.cancel = context.WithCancel(ctx)
- errors := make(chan error, 2)
- errChain, err := c.monitor0(c.ctx)
- if err != nil {
- return err
- }
-
- go func() {
- e := <-errChain
- errors <- e
-
- for {
- select {
- case <-ctx.Done():
- return
- case monitoringError := <-errors:
- log.Warnf("monitoring conntrack failure, will
re-try monitoring after 5 second. error: %v", monitoringError)
- time.Sleep(time.Second * 5)
-
- errChain, monitoringError = c.monitor0(ctx)
- if monitoringError != nil {
- errors <- monitoringError
- continue
- }
- mError := <-errChain
- errors <- mError
- }
- }
- }()
- return nil
-}
-
-func (c *ConnTrack) monitor0(ctx context.Context) (<-chan error, error) {
- if c.monitorClient != nil {
- if e := c.monitorClient.Close(); e != nil {
- log.Warnf("close the conntack monitor client error:
%v", e)
- }
- }
-
- nfct, err := conntrack.Open(&conntrack.Config{})
- if err != nil {
- return nil, err
- }
- c.monitorClient = nfct
- c.monitorClient.Con.SetReadBuffer(26214400)
- c.monitorClient.Con.SetOption(netlink.ListenAllNSID, true)
- errChan := c.monitorClient.AttachErrChan()
- err = c.monitorClient.RegisterFiltered(ctx, conntrack.Conntrack,
conntrack.NetlinkCtNew,
- []conntrack.ConnAttr{
- {Type: conntrack.AttrOrigPortDst, Data: []byte{0x0,
0x35}, Mask: []byte{0xff, 0xff}, Negate: true}, // DstPort != 53
- }, func(con conntrack.Con) int {
- if con.Origin == nil || con.Reply == nil ||
con.Origin.Proto == nil || con.Reply.Proto == nil ||
- con.Origin.Src == nil || con.Origin.Dst == nil
|| con.Origin.Proto.SrcPort == nil || con.Origin.Proto.DstPort == nil ||
- con.Reply.Src == nil || con.Reply.Dst == nil ||
con.Reply.Proto.SrcPort == nil || con.Reply.Proto.DstPort == nil {
- return 0
- }
-
- c.monitorExpire.Set(conntrackExpireKey{
- sourceIP: con.Origin.Src.String(),
- destIP: con.Origin.Dst.String(),
- sourcePort: *con.Origin.Proto.SrcPort,
- destPort: *con.Origin.Proto.DstPort,
- }, conntrackExpireValue{
- realIP: con.Reply.Src.String(),
- realPort: *con.Reply.Proto.SrcPort,
- }, monitorExpireTime)
- return 0
- })
- if err != nil {
- return nil, err
- }
- return errChan, nil
-}
-
-func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair, fromCacheOnly
bool) error {
- key := conntrackExpireKey{
- sourceIP: addr.SrcIP,
- destIP: addr.DestIP,
- sourcePort: addr.SrcPort,
- destPort: addr.DestPort,
- }
- val, exist := c.monitorExpire.Get(key)
- if exist {
- v := val.(conntrackExpireValue)
- addr.DestIP = v.realIP
- addr.DestPort = v.realPort
- addr.NeedConnTrack = false
- log.Debugf("update real peer address from cache: %s:%d",
addr.DestIP, addr.DestPort)
- c.monitorExpire.Delete(key)
- return nil
- }
- addr.NeedConnTrack = true
- if fromCacheOnly {
- return nil
- }
-
+func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair) error {
family := conntrack.IPv4
if addr.Family == unix.AF_INET6 {
family = conntrack.IPv6
@@ -175,7 +69,7 @@ func (c *ConnTrack) UpdateRealPeerAddress(addr *SocketPair,
fromCacheOnly bool)
tuple.Proto.Number = &(info.proto)
// using get to query protocol
- session, e := c.queryClient.Get(conntrack.Conntrack, family,
conntrack.Con{Origin: tuple})
+ session, e := c.nfct.Get(conntrack.Conntrack, family,
conntrack.Con{Origin: tuple})
if e != nil {
// try to get the reply session, if the info not exists
or from accept events, have error is normal
return fmt.Errorf("cannot get the conntrack session,
type: %s, family: %d, origin src: %s:%d, origin dest: %s:%d, error: %v",
info.name,